feat: add endpoints for triggering sensor reads and actor writes
This commit is contained in:
+3
-1
@@ -108,7 +108,7 @@ func main() {
|
||||
r.Use(gin.Recovery())
|
||||
r.Use(middleware.GinLogger())
|
||||
|
||||
h := &handler.Handler{DB: sqlDB, JWTSecret: cfg.Server.JWTSecret}
|
||||
h := &handler.Handler{DB: sqlDB, JWTSecret: cfg.Server.JWTSecret, MQTTTopic: cfg.MQTT.Topic}
|
||||
|
||||
// Public routes
|
||||
r.GET("/health", h.Health)
|
||||
@@ -126,12 +126,14 @@ func main() {
|
||||
auth.GET("/messages", handler.GetMessages)
|
||||
auth.GET("/sensors", h.GetSensors)
|
||||
auth.GET("/sensors/:id", h.GetSensor)
|
||||
auth.POST("/sensors/:id/trigger", h.TriggerSensor)
|
||||
auth.POST("/sensors", h.CreateSensor)
|
||||
auth.PUT("/sensors/:id", h.UpdateSensor)
|
||||
auth.DELETE("/sensors/:id", h.DeleteSensor)
|
||||
|
||||
auth.GET("/actors", h.GetActors)
|
||||
auth.GET("/actors/:id", h.GetActor)
|
||||
auth.POST("/actors/:id/write", h.WriteActor)
|
||||
auth.POST("/actors", h.CreateActor)
|
||||
auth.PUT("/actors/:id", h.UpdateActor)
|
||||
auth.DELETE("/actors/:id", h.DeleteActor)
|
||||
|
||||
@@ -2,11 +2,14 @@ package handler
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt"
|
||||
"git.piskot.si/SeminarM2/lambdaiot-core/internal/storage"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/golang-jwt/jwt/v5"
|
||||
@@ -16,6 +19,7 @@ import (
|
||||
type Handler struct {
|
||||
DB *sql.DB
|
||||
JWTSecret string
|
||||
MQTTTopic string
|
||||
}
|
||||
|
||||
type Device struct {
|
||||
@@ -55,6 +59,30 @@ type SensorReading struct {
|
||||
ValueAt time.Time `json:"value_at" db:"value_at"`
|
||||
}
|
||||
|
||||
func (h *Handler) mqttTopic() string {
|
||||
if h != nil && h.MQTTTopic != "" {
|
||||
return h.MQTTTopic
|
||||
}
|
||||
if v := os.Getenv("MQTT_TOPIC"); v != "" {
|
||||
return v
|
||||
}
|
||||
return "lambdaiot"
|
||||
}
|
||||
|
||||
func (h *Handler) publishMQTT(c *gin.Context, payload any) bool {
|
||||
topic := h.mqttTopic()
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to marshal payload"})
|
||||
return false
|
||||
}
|
||||
if err := mqtt.PublishDefault(topic, body); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Health returns basic service health
|
||||
func (h *Handler) Health(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, gin.H{"status": "ok"})
|
||||
@@ -237,6 +265,58 @@ func (h *Handler) DeleteSensor(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, gin.H{"message": "sensor deleted"})
|
||||
}
|
||||
|
||||
// TriggerSensor sends an MQTT message requesting a new reading from the sensor.
|
||||
// The message payload contains the sensor ID and the requested action (default "read").
|
||||
func (h *Handler) TriggerSensor(c *gin.Context) {
|
||||
sensorID, err := uuid.Parse(c.Param("id"))
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid sensor id"})
|
||||
return
|
||||
}
|
||||
|
||||
var exists int
|
||||
if err := h.DB.QueryRow("SELECT COUNT(1) FROM sensors WHERE id = UUID_TO_BIN(?)", sensorID.String()).Scan(&exists); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to check sensor"})
|
||||
return
|
||||
}
|
||||
if exists == 0 {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "sensor not found"})
|
||||
return
|
||||
}
|
||||
|
||||
var req struct {
|
||||
Action string `json:"action"`
|
||||
}
|
||||
if c.Request.ContentLength > 0 {
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
}
|
||||
action := req.Action
|
||||
if action == "" {
|
||||
action = "read"
|
||||
}
|
||||
|
||||
payload := struct {
|
||||
Type string `json:"type"`
|
||||
SensorID string `json:"sensor_id"`
|
||||
Action string `json:"action"`
|
||||
RequestedAt string `json:"requested_at"`
|
||||
}{
|
||||
Type: "sensor_trigger",
|
||||
SensorID: sensorID.String(),
|
||||
Action: action,
|
||||
RequestedAt: time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
if !h.publishMQTT(c, payload) {
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusAccepted, gin.H{"status": "published", "topic": h.mqttTopic()})
|
||||
}
|
||||
|
||||
// CreateDevice creates a new device
|
||||
func (h *Handler) CreateDevice(c *gin.Context) {
|
||||
var req struct {
|
||||
@@ -555,6 +635,54 @@ func (h *Handler) DeleteActor(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, gin.H{"message": "actor deleted"})
|
||||
}
|
||||
|
||||
// WriteActor publishes a desired action/value for the actor to the MQTT topic.
|
||||
func (h *Handler) WriteActor(c *gin.Context) {
|
||||
actorID, err := uuid.Parse(c.Param("id"))
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid actor id"})
|
||||
return
|
||||
}
|
||||
|
||||
var exists int
|
||||
if err := h.DB.QueryRow("SELECT COUNT(1) FROM actors WHERE id = UUID_TO_BIN(?)", actorID.String()).Scan(&exists); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to check actor"})
|
||||
return
|
||||
}
|
||||
if exists == 0 {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "actor not found"})
|
||||
return
|
||||
}
|
||||
|
||||
var req struct {
|
||||
Action string `json:"action" binding:"required"`
|
||||
Value json.RawMessage `json:"value"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
payload := struct {
|
||||
Type string `json:"type"`
|
||||
ActorID string `json:"actor_id"`
|
||||
Action string `json:"action"`
|
||||
Value json.RawMessage `json:"value,omitempty"`
|
||||
RequestedAt string `json:"requested_at"`
|
||||
}{
|
||||
Type: "actor_command",
|
||||
ActorID: actorID.String(),
|
||||
Action: req.Action,
|
||||
Value: req.Value,
|
||||
RequestedAt: time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
if !h.publishMQTT(c, payload) {
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusAccepted, gin.H{"status": "published", "topic": h.mqttTopic()})
|
||||
}
|
||||
|
||||
// Sensor readings CRUD
|
||||
func (h *Handler) CreateSensorReading(c *gin.Context) {
|
||||
var req struct {
|
||||
|
||||
@@ -2,23 +2,16 @@ package handler
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// MQTTPing publishes the current timestamp to the configured MQTT topic
|
||||
func (h *Handler) MQTTPing(c *gin.Context) {
|
||||
ts := time.Now().Format(time.RFC3339)
|
||||
topic := os.Getenv("MQTT_TOPIC")
|
||||
if topic == "" {
|
||||
topic = "lambdaiot"
|
||||
}
|
||||
if err := mqtt.PublishDefault(topic, []byte(ts)); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
ts := time.Now().UTC().Format(time.RFC3339)
|
||||
if !h.publishMQTT(c, gin.H{"type": "ping", "timestamp": ts}) {
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"timestamp": ts})
|
||||
c.JSON(http.StatusOK, gin.H{"timestamp": ts, "topic": h.mqttTopic()})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user