diff --git a/API.md b/API.md index 54c0d40..9ba4d46 100644 --- a/API.md +++ b/API.md @@ -140,10 +140,10 @@ Retrieve all devices in the system (public read access). ``` **Status IDs**: -- `1`: OK/Online -- `2`: Warning -- `3`: Error -- `4`: Offline +- `1`: ok +- `2`: pending +- `3`: lost +- `4`: disabled --- @@ -840,6 +840,57 @@ Devices can announce themselves by publishing to the discovery topic: --- +### Device Check (State Polling) + +The server periodically publishes device check requests to the main MQTT topic and +expects devices to respond on the same topic. + +**Topic**: `{configured_mqtt_topic}` + +**Request Message Published**: +```json +{ + "type": "device_check_request", + "device_id": "uuid", + "requested_at": "2026-01-14T10:00:00Z" +} +``` + +**Expected Device Response**: +```json +{ + "type": "device_check_response", + "device_id": "uuid", + "status": "ok" | "lost" | "pending" +} +``` + +--- + +### Sensor Readings via MQTT + +Devices can publish sensor readings directly to the main MQTT topic. The server +stores readings only if the `sensor_id` exists in the database. + +**Topic**: `{configured_mqtt_topic}` + +**Message Format**: +```json +{ + "type": "sensor_reading", + "sensor_id": "uuid", + "value": 23.7, + "value_at": "2026-01-14T10:00:00Z" +} +``` + +**Notes**: +- `type` may also be `sensor_value`. +- `value` may be a number, boolean, or numeric string (e.g., "1", "0", "23.7"). +- `value_at` is optional; if omitted, the server uses the current time. + +--- + ## Error Responses All endpoints may return the following standard error responses: diff --git a/cmd/server/main.go b/cmd/server/main.go index 1388d46..7b9aefe 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -78,6 +78,9 @@ func main() { if checker != nil { checker.HandleMessage(p) } + if handled, err := h.ProcessSensorReadingMessage(p); handled && err != nil { + log.Printf("sensor reading message error: %v", err) + } } if err := mq.Subscribe(cfg.MQTT.Topic, handlerFn); err != nil { log.Printf("warning: mqtt subscribe failed: %v", err) diff --git a/internal/devicecheck/devicecheck.go b/internal/devicecheck/devicecheck.go index 6cf1ede..bddc2cc 100644 --- a/internal/devicecheck/devicecheck.go +++ b/internal/devicecheck/devicecheck.go @@ -48,6 +48,9 @@ func New(db *sql.DB, mq *mqtt.Client, topic string, interval, timeout time.Durat // Start launches the periodic checker until the context is canceled. func (c *Checker) Start(ctx context.Context) { + if c != nil { + log.Printf("devicecheck: starting (topic=%s interval=%s timeout=%s)", c.Topic, c.Interval, c.Timeout) + } go c.loop(ctx) } @@ -76,7 +79,12 @@ func (c *Checker) HandleMessage(payload []byte) { return } - // Notify waiter if present + msgType := strings.ToLower(strings.TrimSpace(msg.Type)) + if msgType != "device_check_response" && msgType != "device_state" && !strings.EqualFold(msg.Status, "ok") { + return + } + + // Notify waiter if present for valid response/state c.mu.Lock() ch := c.waiters[id] c.mu.Unlock() @@ -87,10 +95,8 @@ func (c *Checker) HandleMessage(payload []byte) { } } - // Update status to ok on explicit ok or check response - if msg.Type == "device_check_response" || msg.Type == "device_state" || strings.EqualFold(msg.Status, "ok") { - _ = c.updateStatus(id, 1) // 1 = ok - } + // Update status to ok on explicit ok or check response/state + _ = c.updateStatus(id, 1) // 1 = ok } func (c *Checker) loop(ctx context.Context) { @@ -115,7 +121,7 @@ func (c *Checker) runOnce() { return } - rows, err := c.DB.Query("SELECT BIN_TO_UUID(id) FROM devices WHERE status_id != 4") // skip disabled + rows, err := c.DB.Query("SELECT BIN_TO_UUID(id) FROM devices WHERE status_id NOT IN (3, 4)") // skip lost/disabled if err != nil { log.Printf("devicecheck: query devices failed: %v", err) return @@ -131,6 +137,14 @@ func (c *Checker) runOnce() { } ids = append(ids, strings.ToLower(id)) } + if err := rows.Err(); err != nil { + log.Printf("devicecheck: rows error: %v", err) + return + } + if len(ids) == 0 { + log.Printf("devicecheck: no devices found to check") + return + } var wg sync.WaitGroup for _, id := range ids { @@ -148,10 +162,6 @@ func (c *Checker) checkDevice(deviceID string) { return } - if err := c.updateStatus(deviceID, 2); err != nil { // 2 = pending - log.Printf("devicecheck: set pending failed for %s: %v", deviceID, err) - } - ch := make(chan struct{}, 1) c.mu.Lock() c.waiters[deviceID] = ch @@ -169,17 +179,22 @@ func (c *Checker) checkDevice(deviceID string) { body, err := json.Marshal(payload) if err == nil { + log.Printf("devicecheck: publish request for %s", deviceID) if err := c.MQTT.Publish(c.Topic, body); err != nil { log.Printf("devicecheck: publish failed for %s: %v", deviceID, err) } } + if err := c.updateStatus(deviceID, 2); err != nil { // 2 = pending + log.Printf("devicecheck: set pending failed for %s: %v", deviceID, err) + } + select { case <-ch: if err := c.updateStatus(deviceID, 1); err != nil { // ok log.Printf("devicecheck: set ok failed for %s: %v", deviceID, err) } - case <-time.After(c.Timeout): + case <-time.After(c.Interval): if err := c.updateStatus(deviceID, 3); err != nil { // lost log.Printf("devicecheck: set lost failed for %s: %v", deviceID, err) } @@ -194,6 +209,19 @@ func (c *Checker) updateStatus(deviceID string, statusID int) error { if c == nil || c.DB == nil { return nil } - _, err := c.DB.Exec("UPDATE devices SET status_id = ?, updated_at = NOW(6) WHERE id = UUID_TO_BIN(?)", statusID, deviceID) - return err + log.Printf("devicecheck: update status request %s -> %d", deviceID, statusID) + res, err := c.DB.Exec("UPDATE devices SET status_id = ?, updated_at = NOW(6) WHERE id = UUID_TO_BIN(?)", statusID, deviceID) + if err != nil { + log.Printf("devicecheck: update status failed for %s -> %d: %v", deviceID, statusID, err) + return err + } + rows, err := res.RowsAffected() + if err != nil { + log.Printf("devicecheck: rows affected failed for %s -> %d: %v", deviceID, statusID, err) + return err + } + if rows == 0 { + log.Printf("devicecheck: no rows updated for %s -> %d", deviceID, statusID) + } + return nil } diff --git a/internal/handler/sensor_readings_mqtt.go b/internal/handler/sensor_readings_mqtt.go new file mode 100644 index 0000000..6bb456a --- /dev/null +++ b/internal/handler/sensor_readings_mqtt.go @@ -0,0 +1,98 @@ +package handler + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + "time" +) + +// ProcessSensorReadingMessage parses a sensor reading MQTT message and stores it. +// Returns handled=false when the payload does not look like a sensor reading. +func (h *Handler) ProcessSensorReadingMessage(payload []byte) (bool, error) { + if h == nil || h.DB == nil { + return false, nil + } + + var msg struct { + Type string `json:"type"` + SensorID string `json:"sensor_id"` + Value json.RawMessage `json:"value"` + ValueAt string `json:"value_at"` + } + if err := json.Unmarshal(payload, &msg); err != nil { + return false, nil + } + + msgType := strings.ToLower(strings.TrimSpace(msg.Type)) + if msgType != "" && msgType != "sensor_reading" && msgType != "sensor_value" { + return false, nil + } + if strings.TrimSpace(msg.SensorID) == "" || len(msg.Value) == 0 { + return false, nil + } + + value, err := parseSensorValue(msg.Value) + if err != nil { + return true, fmt.Errorf("invalid sensor value: %w", err) + } + + valueAt := time.Now().UTC() + if strings.TrimSpace(msg.ValueAt) != "" { + parsed, err := time.Parse(time.RFC3339, msg.ValueAt) + if err != nil { + return true, fmt.Errorf("invalid value_at: %w", err) + } + valueAt = parsed + } + + var exists int + if err := h.DB.QueryRow("SELECT COUNT(1) FROM sensors WHERE id = UUID_TO_BIN(?)", msg.SensorID).Scan(&exists); err != nil { + return true, fmt.Errorf("failed to check sensor: %w", err) + } + if exists == 0 { + return true, fmt.Errorf("sensor not found: %s", msg.SensorID) + } + + if _, err := h.DB.Exec( + "INSERT INTO sensor_readings (sensor_id, value, value_at) VALUES (UUID_TO_BIN(?), ?, ?)", + msg.SensorID, + value, + valueAt, + ); err != nil { + return true, fmt.Errorf("failed to insert sensor reading: %w", err) + } + + return true, nil +} + +func parseSensorValue(raw json.RawMessage) (float64, error) { + var v any + if err := json.Unmarshal(raw, &v); err != nil { + return 0, err + } + switch t := v.(type) { + case float64: + return t, nil + case bool: + if t { + return 1, nil + } + return 0, nil + case string: + if strings.EqualFold(t, "true") { + return 1, nil + } + if strings.EqualFold(t, "false") { + return 0, nil + } + f, err := strconv.ParseFloat(t, 64) + if err != nil { + return 0, err + } + return f, nil + default: + return 0, fmt.Errorf("unsupported value type") + } +}