From b706a9ce14d6508ce599ec5ef3533569045b95fc Mon Sep 17 00:00:00 2001
From: Kristjan Komlosi
Date: Mon, 12 Jan 2026 11:19:58 +0100
Subject: [PATCH] feat: device state checker

---
Review notes (not part of the commit message):

 * main.go: checkCtx was assigned without being declared, which does not
   compile. It is now a local declared with := and the cancel func is
   stored in checkCancel.
 * main.go: the checker is created before subscribing but started only
   after a successful Subscribe, so the first round cannot publish before
   replies can be received. With an empty topic or a failed subscription
   it is not started at all, since every device would be marked lost.
 * main.go: the checker is canceled before the MQTT client is closed.
 * devicecheck: HandleMessage ignores device_check_request payloads. The
   checker publishes them on the subscribed topic, so its own echo used
   to satisfy the waiter and mark every device ok.
 * devicecheck: rows.Err() is checked and the rows are closed before the
   round waits for replies; in-flight waits stop when ctx is canceled.
 * The commit id and post-image blob ids in this mail are stale, and the
   indentation of the main.go context lines was reconstructed from a
   whitespace-collapsed copy. Regenerate with git format-patch, or apply
   with "git apply --ignore-whitespace".

 cmd/server/main.go                  |  23 ++-
 internal/devicecheck/devicecheck.go | 262 ++++++++++++++++++++++++++++
 2 files changed, 283 insertions(+), 2 deletions(-)
 create mode 100644 internal/devicecheck/devicecheck.go

diff --git a/cmd/server/main.go b/cmd/server/main.go
index 5894921..e611190 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -16,6 +16,7 @@ import (
 	_ "github.com/go-sql-driver/mysql"
 
 	"git.piskot.si/SeminarM2/lambdaiot-core/internal/config"
+	"git.piskot.si/SeminarM2/lambdaiot-core/internal/devicecheck"
 	"git.piskot.si/SeminarM2/lambdaiot-core/internal/handler"
 	"git.piskot.si/SeminarM2/lambdaiot-core/internal/middleware"
 	mqttclient "git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt"
@@ -54,6 +55,8 @@ func main() {
 
 	// connect to MQTT broker (best-effort)
 	var mq *mqttclient.Client
+	var checker *devicecheck.Checker
+	var checkCancel context.CancelFunc
 	// initialize sqlite for state messages
 	var stateDB *storage.DB
 	{
@@ -77,17 +80,29 @@ func main() {
 			mq = mqc
 			mqttclient.SetDefault(mqc)
 
+			// build the device state checker up front so the MQTT handler can feed it
+			checker = devicecheck.New(sqlDB, mq, cfg.MQTT.Topic, 0, 0)
+
 			// subscribe to the configured topic and log incoming messages
 			if cfg.MQTT.Topic != "" {
-				if err := mq.Subscribe(cfg.MQTT.Topic, func(t string, p []byte) {
+				handlerFn := func(t string, p []byte) {
 					log.Printf("mqtt recv on %s: %s", t, string(p))
+					if checker != nil {
+						checker.HandleMessage(p)
+					}
 					// if topic starts with state: -> store in sqlite
 					if strings.HasPrefix(t, "state:") {
 						if storage.Default != nil {
 							_ = storage.Default.InsertMessage(string(p), time.Now())
 						}
 					}
-				}); err != nil {
+				}
+				if err := mq.Subscribe(cfg.MQTT.Topic, handlerFn); err != nil {
 					log.Printf("warning: mqtt subscribe failed: %v", err)
+				} else {
+					// start checking only once replies can actually be received
+					checkCtx, cancel := context.WithCancel(context.Background())
+					checkCancel = cancel
+					checker.Start(checkCtx)
 				}
 			}
@@ -173,6 +188,10 @@ func main() {
+	// stop the device checker before closing the MQTT client it publishes on
+	if checkCancel != nil {
+		checkCancel()
+	}
 	if mq != nil {
 		mq.Close()
 	}
 	if stateDB != nil {
 		stateDB.Close()
 	}
diff --git a/internal/devicecheck/devicecheck.go b/internal/devicecheck/devicecheck.go
new file mode 100644
index 0000000..6cf1ede
--- /dev/null
+++ b/internal/devicecheck/devicecheck.go
@@ -0,0 +1,262 @@
+package devicecheck
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"fmt"
+	"log"
+	"strings"
+	"sync"
+	"time"
+
+	"git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt"
+)
+
+// Values written to (and compared against) devices.status_id.
+const (
+	statusOK       = 1
+	statusPending  = 2
+	statusLost     = 3
+	statusDisabled = 4 // disabled devices are skipped by the checker
+)
+
+// Message types carried in the "type" field of the JSON payloads.
+const (
+	typeCheckRequest  = "device_check_request"
+	typeCheckResponse = "device_check_response"
+	typeDeviceState   = "device_state"
+)
+
+// Checker periodically pings devices via MQTT and updates their status
+// based on responses received on the subscribed topic.
+type Checker struct {
+	DB       *sql.DB
+	MQTT     *mqtt.Client
+	Topic    string
+	Interval time.Duration
+	Timeout  time.Duration
+
+	mu      sync.Mutex
+	waiters map[string]chan struct{} // device id -> signal channel of the in-flight check
+}
+
+// New constructs a Checker. A non-positive interval defaults to one minute,
+// a non-positive timeout to ten seconds and an empty topic to "lambdaiot".
+func New(db *sql.DB, mq *mqtt.Client, topic string, interval, timeout time.Duration) *Checker {
+	// time.NewTicker panics on a non-positive duration, so treat negatives like zero.
+	if interval <= 0 {
+		interval = time.Minute
+	}
+	if timeout <= 0 {
+		timeout = 10 * time.Second
+	}
+	if topic == "" {
+		topic = "lambdaiot"
+	}
+	return &Checker{
+		DB:       db,
+		MQTT:     mq,
+		Topic:    topic,
+		Interval: interval,
+		Timeout:  timeout,
+		waiters:  make(map[string]chan struct{}),
+	}
+}
+
+// Start launches the periodic checker in its own goroutine; it runs until
+// ctx is canceled. Calling Start on a nil Checker is a no-op.
+func (c *Checker) Start(ctx context.Context) {
+	if c == nil {
+		return
+	}
+	go c.loop(ctx)
+}
+
+// HandleMessage attempts to parse a device state/check response message and
+// signals any waiter for the given device. Payloads that are not valid JSON,
+// carry no device_id, or are check requests are ignored.
+// Expected JSON payload shape:
+//
+//	{
+//	  "type": "device_check_response" | "device_state",
+//	  "device_id": "uuid",
+//	  "status": "ok" | "lost" | "pending"
+//	}
+func (c *Checker) HandleMessage(payload []byte) {
+	if c == nil {
+		return
+	}
+	var msg struct {
+		Type     string `json:"type"`
+		DeviceID string `json:"device_id"`
+		Status   string `json:"status"`
+	}
+	if err := json.Unmarshal(payload, &msg); err != nil {
+		return
+	}
+	// checkDevice publishes its requests on the same topic the server
+	// subscribes to, so they can come straight back here. They carry a
+	// device_id but prove nothing about the device, so drop them.
+	if msg.Type == typeCheckRequest {
+		return
+	}
+	id := strings.ToLower(strings.TrimSpace(msg.DeviceID))
+	if id == "" {
+		return
+	}
+
+	// Notify waiter if present. The channel has capacity 1 and the send is
+	// non-blocking, so duplicate responses never stall the MQTT callback.
+	c.mu.Lock()
+	ch := c.waiters[id]
+	c.mu.Unlock()
+	if ch != nil {
+		select {
+		case ch <- struct{}{}:
+		default:
+		}
+	}
+
+	// Update status to ok on explicit ok or check response
+	if msg.Type == typeCheckResponse || msg.Type == typeDeviceState || strings.EqualFold(msg.Status, "ok") {
+		if err := c.updateStatus(id, statusOK); err != nil {
+			log.Printf("devicecheck: set ok failed for %s: %v", id, err)
+		}
+	}
+}
+
+// loop runs one check round immediately and then one per Interval tick
+// until ctx is canceled.
+func (c *Checker) loop(ctx context.Context) {
+	ticker := time.NewTicker(c.Interval)
+	defer ticker.Stop()
+
+	// Run immediately
+	c.runOnce(ctx)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			c.runOnce(ctx)
+		}
+	}
+}
+
+// runOnce checks every non-disabled device concurrently and returns once
+// all checks have finished.
+func (c *Checker) runOnce(ctx context.Context) {
+	if c == nil || c.DB == nil || c.MQTT == nil {
+		return
+	}
+
+	ids, err := c.deviceIDs(ctx)
+	if err != nil {
+		log.Printf("devicecheck: %v", err)
+		return
+	}
+
+	var wg sync.WaitGroup
+	for _, id := range ids {
+		wg.Add(1)
+		go func(deviceID string) {
+			defer wg.Done()
+			c.checkDevice(ctx, deviceID)
+		}(id)
+	}
+	wg.Wait()
+}
+
+// deviceIDs returns the lower-cased UUIDs of all devices that are not
+// disabled. The rows are closed before returning so the result set is not
+// held open while the caller waits for device responses.
+func (c *Checker) deviceIDs(ctx context.Context) ([]string, error) {
+	rows, err := c.DB.QueryContext(ctx, "SELECT BIN_TO_UUID(id) FROM devices WHERE status_id != ?", statusDisabled)
+	if err != nil {
+		return nil, fmt.Errorf("query devices failed: %w", err)
+	}
+	defer rows.Close()
+
+	var ids []string
+	for rows.Next() {
+		var id string
+		if err := rows.Scan(&id); err != nil {
+			return nil, fmt.Errorf("scan id failed: %w", err)
+		}
+		ids = append(ids, strings.ToLower(id))
+	}
+	// Next returns false on both exhaustion and error; only Err tells them apart.
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterate devices failed: %w", err)
+	}
+	return ids, nil
+}
+
+// checkDevice marks the device pending, publishes a check request and waits
+// up to Timeout for HandleMessage to signal a reply, then records the device
+// as ok or lost. If ctx is canceled first the status is left untouched.
+func (c *Checker) checkDevice(ctx context.Context, deviceID string) {
+	if c == nil || c.DB == nil || c.MQTT == nil {
+		return
+	}
+
+	if err := c.updateStatus(deviceID, statusPending); err != nil {
+		log.Printf("devicecheck: set pending failed for %s: %v", deviceID, err)
+	}
+
+	// Register the waiter before publishing so a fast reply cannot be missed.
+	ch := make(chan struct{}, 1)
+	c.mu.Lock()
+	c.waiters[deviceID] = ch
+	c.mu.Unlock()
+	defer func() {
+		c.mu.Lock()
+		delete(c.waiters, deviceID)
+		c.mu.Unlock()
+	}()
+
+	payload := struct {
+		Type        string `json:"type"`
+		DeviceID    string `json:"device_id"`
+		RequestedAt string `json:"requested_at"`
+	}{
+		Type:        typeCheckRequest,
+		DeviceID:    deviceID,
+		RequestedAt: time.Now().UTC().Format(time.RFC3339),
+	}
+
+	body, err := json.Marshal(payload)
+	if err != nil {
+		log.Printf("devicecheck: encode request failed for %s: %v", deviceID, err)
+	} else if err := c.MQTT.Publish(c.Topic, body); err != nil {
+		log.Printf("devicecheck: publish failed for %s: %v", deviceID, err)
+	}
+
+	timer := time.NewTimer(c.Timeout)
+	defer timer.Stop()
+
+	select {
+	case <-ch:
+		if err := c.updateStatus(deviceID, statusOK); err != nil {
+			log.Printf("devicecheck: set ok failed for %s: %v", deviceID, err)
+		}
+	case <-timer.C:
+		if err := c.updateStatus(deviceID, statusLost); err != nil {
+			log.Printf("devicecheck: set lost failed for %s: %v", deviceID, err)
+		}
+	case <-ctx.Done():
+		// Shutting down: an unanswered request says nothing about the device.
+	}
+}
+
+// updateStatus stores statusID for the device and bumps updated_at.
+// It is a no-op returning nil when no database is configured.
+func (c *Checker) updateStatus(deviceID string, statusID int) error {
+	if c == nil || c.DB == nil {
+		return nil
+	}
+	_, err := c.DB.Exec("UPDATE devices SET status_id = ?, updated_at = NOW(6) WHERE id = UUID_TO_BIN(?)", statusID, deviceID)
+	return err
+}