diff --git a/cmd/server/main.go b/cmd/server/main.go index 797fc9e..5894921 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -108,7 +108,7 @@ func main() { r.Use(gin.Recovery()) r.Use(middleware.GinLogger()) - h := &handler.Handler{DB: sqlDB, JWTSecret: cfg.Server.JWTSecret} + h := &handler.Handler{DB: sqlDB, JWTSecret: cfg.Server.JWTSecret, MQTTTopic: cfg.MQTT.Topic} // Public routes r.GET("/health", h.Health) @@ -126,12 +126,14 @@ func main() { auth.GET("/messages", handler.GetMessages) auth.GET("/sensors", h.GetSensors) auth.GET("/sensors/:id", h.GetSensor) + auth.POST("/sensors/:id/trigger", h.TriggerSensor) auth.POST("/sensors", h.CreateSensor) auth.PUT("/sensors/:id", h.UpdateSensor) auth.DELETE("/sensors/:id", h.DeleteSensor) auth.GET("/actors", h.GetActors) auth.GET("/actors/:id", h.GetActor) + auth.POST("/actors/:id/write", h.WriteActor) auth.POST("/actors", h.CreateActor) auth.PUT("/actors/:id", h.UpdateActor) auth.DELETE("/actors/:id", h.DeleteActor) diff --git a/internal/handler/handlers.go b/internal/handler/handlers.go index 9fca862..6bb77d1 100644 --- a/internal/handler/handlers.go +++ b/internal/handler/handlers.go @@ -2,11 +2,14 @@ package handler import ( "database/sql" + "encoding/json" "net/http" + "os" "strconv" "strings" "time" + "git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt" "git.piskot.si/SeminarM2/lambdaiot-core/internal/storage" "github.com/gin-gonic/gin" "github.com/golang-jwt/jwt/v5" @@ -16,6 +19,7 @@ import ( type Handler struct { DB *sql.DB JWTSecret string + MQTTTopic string } type Device struct { @@ -55,6 +59,30 @@ type SensorReading struct { ValueAt time.Time `json:"value_at" db:"value_at"` } +func (h *Handler) mqttTopic() string { + if h != nil && h.MQTTTopic != "" { + return h.MQTTTopic + } + if v := os.Getenv("MQTT_TOPIC"); v != "" { + return v + } + return "lambdaiot" +} + +func (h *Handler) publishMQTT(c *gin.Context, payload any) bool { + topic := h.mqttTopic() + body, err := json.Marshal(payload) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to marshal payload"}) + return false + } + if err := mqtt.PublishDefault(topic, body); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return false + } + return true +} + // Health returns basic service health func (h *Handler) Health(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "ok"}) @@ -237,6 +265,58 @@ func (h *Handler) DeleteSensor(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"message": "sensor deleted"}) } +// TriggerSensor sends an MQTT message requesting a new reading from the sensor. +// The message payload contains the sensor ID and the requested action (default "read"). +func (h *Handler) TriggerSensor(c *gin.Context) { + sensorID, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid sensor id"}) + return + } + + var exists int + if err := h.DB.QueryRow("SELECT COUNT(1) FROM sensors WHERE id = UUID_TO_BIN(?)", sensorID.String()).Scan(&exists); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to check sensor"}) + return + } + if exists == 0 { + c.JSON(http.StatusNotFound, gin.H{"error": "sensor not found"}) + return + } + + var req struct { + Action string `json:"action"` + } + if c.Request.ContentLength > 0 { + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + } + action := req.Action + if action == "" { + action = "read" + } + + payload := struct { + Type string `json:"type"` + SensorID string `json:"sensor_id"` + Action string `json:"action"` + RequestedAt string `json:"requested_at"` + }{ + Type: "sensor_trigger", + SensorID: sensorID.String(), + Action: action, + RequestedAt: time.Now().UTC().Format(time.RFC3339), + } + + if !h.publishMQTT(c, payload) { + return + } + + c.JSON(http.StatusAccepted, gin.H{"status": "published", "topic": h.mqttTopic()}) +} + // CreateDevice creates a new device func (h *Handler) CreateDevice(c *gin.Context) { var req struct { @@ -555,6 +635,54 @@ func (h *Handler) DeleteActor(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"message": "actor deleted"}) } +// WriteActor publishes a desired action/value for the actor to the MQTT topic. +func (h *Handler) WriteActor(c *gin.Context) { + actorID, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid actor id"}) + return + } + + var exists int + if err := h.DB.QueryRow("SELECT COUNT(1) FROM actors WHERE id = UUID_TO_BIN(?)", actorID.String()).Scan(&exists); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to check actor"}) + return + } + if exists == 0 { + c.JSON(http.StatusNotFound, gin.H{"error": "actor not found"}) + return + } + + var req struct { + Action string `json:"action" binding:"required"` + Value json.RawMessage `json:"value"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + payload := struct { + Type string `json:"type"` + ActorID string `json:"actor_id"` + Action string `json:"action"` + Value json.RawMessage `json:"value,omitempty"` + RequestedAt string `json:"requested_at"` + }{ + Type: "actor_command", + ActorID: actorID.String(), + Action: req.Action, + Value: req.Value, + RequestedAt: time.Now().UTC().Format(time.RFC3339), + } + + if !h.publishMQTT(c, payload) { + return + } + + c.JSON(http.StatusAccepted, gin.H{"status": "published", "topic": h.mqttTopic()}) +} + // Sensor readings CRUD func (h *Handler) CreateSensorReading(c *gin.Context) { var req struct { diff --git a/internal/handler/mqttping.go b/internal/handler/mqttping.go index 198f009..4baa6f0 100644 --- a/internal/handler/mqttping.go +++ b/internal/handler/mqttping.go @@ -2,23 +2,16 @@ package handler import ( "net/http" - "os" "time" - "git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt" "github.com/gin-gonic/gin" ) // MQTTPing publishes the current timestamp to the configured MQTT topic func (h *Handler) MQTTPing(c *gin.Context) { - ts := time.Now().Format(time.RFC3339) - topic := os.Getenv("MQTT_TOPIC") - if topic == "" { - topic = "lambdaiot" - } - if err := mqtt.PublishDefault(topic, []byte(ts)); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + ts := time.Now().UTC().Format(time.RFC3339) + if !h.publishMQTT(c, gin.H{"type": "ping", "timestamp": ts}) { return } - c.JSON(http.StatusOK, gin.H{"timestamp": ts}) + c.JSON(http.StatusOK, gin.H{"timestamp": ts, "topic": h.mqttTopic()}) }