From 2f9bd6316b0d6972e71c283311393a9f74f2038f Mon Sep 17 00:00:00 2001 From: Kristjan Komlosi Date: Wed, 14 Jan 2026 14:06:42 +0100 Subject: [PATCH] feat: add rudimentary autodiscovery --- DEVICE_DISCOVERY_SPEC.md | 241 ++++++++++++++++++++++++++ cmd/server/main.go | 13 ++ internal/handler/discovery.go | 307 ++++++++++++++++++++++++++++++++++ 3 files changed, 561 insertions(+) create mode 100644 DEVICE_DISCOVERY_SPEC.md create mode 100644 internal/handler/discovery.go diff --git a/DEVICE_DISCOVERY_SPEC.md b/DEVICE_DISCOVERY_SPEC.md new file mode 100644 index 0000000..d47dd99 --- /dev/null +++ b/DEVICE_DISCOVERY_SPEC.md @@ -0,0 +1,241 @@ +# Device Discovery Specification + +This specification defines the format and protocol that microcontroller-based IoT devices must follow to register themselves with the Lambda IoT Core system. + +## Overview + +At boot, each device must submit a single JSON message containing: +- Device information (name, description, location) +- List of sensors +- List of actors (actuators) + +All UUIDs are **deterministically generated** based on the device's MAC address to ensure consistency across reboots. + +## UUID Generation + +### Device UUID +Generate a UUID v5 using: +- **Namespace**: `6ba7b810-9dad-11d1-80b4-00c04fd430c8` (DNS namespace) +- **Name**: Device MAC address in lowercase, no separators (e.g., `aabbccddeeff`) + +### Sensor UUID +Generate a UUID v5 using: +- **Namespace**: Device UUID +- **Name**: `sensor-{N}` where N is the 0-based sensor index + +### Actor UUID +Generate a UUID v5 using: +- **Namespace**: Device UUID +- **Name**: `actor-{N}` where N is the 0-based actor index + +## Discovery Message Format + +### MQTT Topic +Devices must publish to: `{MQTT_TOPIC}/discovery` + +Where `{MQTT_TOPIC}` is the configured MQTT topic (e.g., `lambdaiot`, resulting in `lambdaiot/discovery`) + +### Message Structure + +```json +{ + "mac_address": "aa:bb:cc:dd:ee:ff", + "device": { + "id": "550e8400-e29b-41d4-a716-446655440000", + "name": "Living Room Sensor", + "description": "Multi-sensor device for home monitoring", + "location": "Living Room", + "status_id": 1 + }, + "sensors": [ + { + "id": "550e8400-e29b-41d4-a716-446655440001", + "name": "Temperature", + "type": "DHT22", + "data_type_id": 2 + }, + { + "id": "550e8400-e29b-41d4-a716-446655440002", + "name": "Humidity", + "type": "DHT22", + "data_type_id": 2 + }, + { + "id": "550e8400-e29b-41d4-a716-446655440003", + "name": "Motion", + "type": "PIR", + "data_type_id": 1 + } + ], + "actors": [ + { + "id": "550e8400-e29b-41d4-a716-446655440004", + "name": "LED", + "type": "RGB_LED", + "data_type_id": 1 + }, + { + "id": "550e8400-e29b-41d4-a716-446655440005", + "name": "Fan", + "type": "PWM_FAN", + "data_type_id": 2 + } + ] +} +``` + +## Field Definitions + +### Device Object + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `id` | UUID string | Yes | Deterministic UUID v5 based on MAC | +| `name` | String | Yes | Human-readable device name | +| `description` | String | No | Device description | +| `location` | String | No | Physical location (e.g., "Living Room") | +| `status_id` | Integer | Yes | Status code: 1=ok, 2=pending, 3=lost, 4=disabled | + +### Sensor/Actor Object + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `id` | UUID string | Yes | Deterministic UUID v5 (device UUID + "sensor-N" or "actor-N") | +| `name` | String | Yes | Human-readable sensor/actor name | +| `type` | String | Yes | Hardware type (e.g., "DHT22", "PIR", "RGB_LED") | +| `data_type_id` | Integer | Yes | Data type: 1=bool, 2=float | + +### Root Object + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `mac_address` | String | Yes | Device MAC address (lowercase with colons) | +| `device` | Object | Yes | Device information | +| `sensors` | Array | Yes | Array of sensor objects (can be empty) | +| `actors` | Array | Yes | Array of actor objects (can be empty) | + +## Behavior + +### On First Boot +1. Device generates deterministic UUIDs based on its MAC address +2. Device publishes discovery message to `{MQTT_TOPIC}/discovery` +3. Backend receives message, creates device record with status_id=1 (ok) +4. Sensors and actors are created + +### On Subsequent Boots +1. Device uses same deterministic UUIDs (based on MAC) +2. Device publishes discovery message again +3. Backend detects existing device by UUID +4. Device details (name, description, location) are updated +5. Sensors/actors are synchronized: + - New sensors/actors are added + - Existing sensors/actors with same ID but different properties are updated + - Sensors/actors no longer in the message are deleted +6. Device status is set to ok (1) + +### Conflict Resolution +- If a device UUID already exists, it is updated rather than creating a duplicate +- Sensors and actors are matched by UUID, not by position +- Removing a sensor/actor from the discovery message will delete it from the database + +### UUID Collision Handling +Devices with different MACs will generate different UUIDs. If two devices somehow share the same MAC (misconfiguration), they will overwrite each other's records. This is acceptable as MAC duplication is a network configuration error. + +## Data Type Codes + +| Code | Type | Description | +|------|------|-------------| +| 1 | bool | Boolean value (true/false or 0/1) | +| 2 | float | Floating-point numeric value | + +## Status Codes + +| Code | Status | Description | +|------|--------|-------------| +| 1 | ok | Device is functioning normally | +| 2 | pending | Device is initializing | +| 3 | lost | Device has not checked in recently | +| 4 | disabled | Device has been administratively disabled | + +## Example Device Implementations + +### Minimal Device (1 sensor, no actors) +```json +{ + "mac_address": "aa:bb:cc:dd:ee:ff", + "device": { + "id": "550e8400-e29b-41d4-a716-446655440000", + "name": "Temperature Sensor", + "description": "", + "location": "", + "status_id": 1 + }, + "sensors": [ + { + "id": "550e8400-e29b-41d4-a716-446655440001", + "name": "Temp", + "type": "DHT22", + "data_type_id": 2 + } + ], + "actors": [] +} +``` + +### Complex Device (3 sensors, 2 actors) +```json +{ + "mac_address": "11:22:33:44:55:66", + "device": { + "id": "6ba7b811-9dad-11d1-80b4-00c04fd430c8", + "name": "Greenhouse Controller", + "description": "Automated greenhouse monitoring and control", + "location": "Greenhouse A", + "status_id": 1 + }, + "sensors": [ + { + "id": "6ba7b812-9dad-11d1-80b4-00c04fd430c8", + "name": "Temperature", + "type": "DHT22", + "data_type_id": 2 + }, + { + "id": "6ba7b813-9dad-11d1-80b4-00c04fd430c8", + "name": "Humidity", + "type": "DHT22", + "data_type_id": 2 + }, + { + "id": "6ba7b814-9dad-11d1-80b4-00c04fd430c8", + "name": "Soil Moisture", + "type": "Capacitive", + "data_type_id": 2 + } + ], + "actors": [ + { + "id": "6ba7b815-9dad-11d1-80b4-00c04fd430c8", + "name": "Irrigation Pump", + "type": "Relay", + "data_type_id": 1 + }, + { + "id": "6ba7b816-9dad-11d1-80b4-00c04fd430c8", + "name": "Ventilation Fan", + "type": "PWM", + "data_type_id": 2 + } + ] +} +``` + +## Notes for Device Developers + +- Always use the same UUID generation algorithm to ensure consistency +- Preserve sensor/actor order to maintain deterministic IDs +- If hardware changes (sensors added/removed), rearrange the array to maintain backward compatibility, or accept that indices will shift +- Include sufficient description and location information for users to identify devices +- Set status_id to 1 (ok) on successful boot +- Consider publishing discovery message periodically (e.g., every 5 minutes) as a heartbeat +- Use MQTT QoS 1 (at-least-once) for discovery messages diff --git a/cmd/server/main.go b/cmd/server/main.go index 98e761f..c87692b 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -103,6 +103,19 @@ func main() { log.Printf("warning: mqtt subscribe failed: %v", err) } } + + // subscribe to discovery topic + discoveryTopic := cfg.MQTT.Topic + "/discovery" + discoveryHandler := func(t string, p []byte) { + log.Printf("device discovery on %s", t) + if err := h.ProcessDiscoveryMessage(p); err != nil { + log.Printf("error processing discovery message: %v", err) + } + } + if err := mq.Subscribe(discoveryTopic, discoveryHandler); err != nil { + log.Printf("warning: mqtt subscribe to discovery failed: %v", err) + } + // publish a startup message (non-blocking) go func() { msg := fmt.Sprintf("lambdaiot-core started on %s", addr) diff --git a/internal/handler/discovery.go b/internal/handler/discovery.go new file mode 100644 index 0000000..a2da935 --- /dev/null +++ b/internal/handler/discovery.go @@ -0,0 +1,307 @@ +package handler + +import ( + "database/sql" + "encoding/json" + "fmt" + "log" +) + +// DiscoveryMessage represents the JSON payload from a device discovery message +type DiscoveryMessage struct { + MACAddress string `json:"mac_address"` + Device DiscoveryDevicePayload `json:"device"` + Sensors []DiscoverySensorPayload `json:"sensors"` + Actors []DiscoveryActorPayload `json:"actors"` +} + +type DiscoveryDevicePayload struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + Location string `json:"location"` + StatusID int `json:"status_id"` +} + +type DiscoverySensorPayload struct { + ID string `json:"id"` + Name string `json:"name"` + Type string `json:"type"` + DataTypeID int `json:"data_type_id"` +} + +type DiscoveryActorPayload struct { + ID string `json:"id"` + Name string `json:"name"` + Type string `json:"type"` + DataTypeID int `json:"data_type_id"` +} + +// ProcessDiscoveryMessage handles incoming device discovery messages +func (h *Handler) ProcessDiscoveryMessage(payload []byte) error { + var msg DiscoveryMessage + if err := json.Unmarshal(payload, &msg); err != nil { + return fmt.Errorf("failed to parse discovery message: %w", err) + } + + if msg.Device.ID == "" { + return fmt.Errorf("discovery message missing device id") + } + + // Check if device already exists + var existingDeviceID string + var existingStatusID int + err := h.DB.QueryRow( + "SELECT BIN_TO_UUID(id), status_id FROM devices WHERE id = UUID_TO_BIN(?)", + msg.Device.ID, + ).Scan(&existingDeviceID, &existingStatusID) + + if err == sql.ErrNoRows { + // New device - create it + return h.createNewDevice(msg) + } + if err != nil { + return fmt.Errorf("failed to query device: %w", err) + } + + // Device exists - update if needed + return h.updateExistingDevice(msg, existingStatusID) +} + +// createNewDevice adds a new device, sensors, and actors to the database +func (h *Handler) createNewDevice(msg DiscoveryMessage) error { + tx, err := h.DB.Begin() + if err != nil { + return fmt.Errorf("failed to start transaction: %w", err) + } + defer tx.Rollback() + + // Insert device + _, err = tx.Exec( + "INSERT INTO devices (id, name, description, location, status_id) VALUES (UUID_TO_BIN(?), ?, ?, ?, ?)", + msg.Device.ID, msg.Device.Name, msg.Device.Description, msg.Device.Location, 1, + ) + if err != nil { + return fmt.Errorf("failed to insert device: %w", err) + } + + // Insert sensors + for _, sensor := range msg.Sensors { + _, err = tx.Exec( + "INSERT INTO sensors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)", + sensor.ID, msg.Device.ID, sensor.Name, sensor.Type, sensor.DataTypeID, + ) + if err != nil { + return fmt.Errorf("failed to insert sensor: %w", err) + } + } + + // Insert actors + for _, actor := range msg.Actors { + _, err = tx.Exec( + "INSERT INTO actors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)", + actor.ID, msg.Device.ID, actor.Name, actor.Type, actor.DataTypeID, + ) + if err != nil { + return fmt.Errorf("failed to insert actor: %w", err) + } + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + log.Printf("discovered new device: %s (%s)", msg.Device.Name, msg.Device.ID) + return nil +} + +// updateExistingDevice updates an existing device and its sensors/actors +func (h *Handler) updateExistingDevice(msg DiscoveryMessage, previousStatusID int) error { + tx, err := h.DB.Begin() + if err != nil { + return fmt.Errorf("failed to start transaction: %w", err) + } + defer tx.Rollback() + + // Update device (name, description, location always; status to ok if coming from device) + _, err = tx.Exec( + "UPDATE devices SET name = ?, description = ?, location = ?, status_id = 1 WHERE id = UUID_TO_BIN(?)", + msg.Device.Name, msg.Device.Description, msg.Device.Location, msg.Device.ID, + ) + if err != nil { + return fmt.Errorf("failed to update device: %w", err) + } + + // Get existing sensors for this device + existingSensors, err := getDeviceSensors(tx, msg.Device.ID) + if err != nil { + return fmt.Errorf("failed to get existing sensors: %w", err) + } + + // Sync sensors + if err := syncSensors(tx, msg.Device.ID, msg.Sensors, existingSensors); err != nil { + return fmt.Errorf("failed to sync sensors: %w", err) + } + + // Get existing actors for this device + existingActors, err := getDeviceActors(tx, msg.Device.ID) + if err != nil { + return fmt.Errorf("failed to get existing actors: %w", err) + } + + // Sync actors + if err := syncActors(tx, msg.Device.ID, msg.Actors, existingActors); err != nil { + return fmt.Errorf("failed to sync actors: %w", err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + log.Printf("updated device: %s (%s)", msg.Device.Name, msg.Device.ID) + return nil +} + +// getDeviceSensors retrieves existing sensors for a device +func getDeviceSensors(tx *sql.Tx, deviceID string) (map[string]DiscoverySensorPayload, error) { + rows, err := tx.Query( + "SELECT BIN_TO_UUID(id), name, type, data_type_id FROM sensors WHERE device_id = UUID_TO_BIN(?) ORDER BY id", + deviceID, + ) + if err != nil { + return nil, err + } + defer rows.Close() + + sensors := make(map[string]DiscoverySensorPayload) + for rows.Next() { + var id, name, typ string + var dataTypeID int + if err := rows.Scan(&id, &name, &typ, &dataTypeID); err != nil { + return nil, err + } + sensors[id] = DiscoverySensorPayload{ + ID: id, + Name: name, + Type: typ, + DataTypeID: dataTypeID, + } + } + return sensors, rows.Err() +} + +// syncSensors creates, updates, or removes sensors as needed +func syncSensors(tx *sql.Tx, deviceID string, newSensors []DiscoverySensorPayload, existingSensors map[string]DiscoverySensorPayload) error { + // Track which sensors are still present + seenIDs := make(map[string]bool) + + // Insert or update sensors + for _, sensor := range newSensors { + seenIDs[sensor.ID] = true + + existing, exists := existingSensors[sensor.ID] + if !exists { + // New sensor + _, err := tx.Exec( + "INSERT INTO sensors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)", + sensor.ID, deviceID, sensor.Name, sensor.Type, sensor.DataTypeID, + ) + if err != nil { + return err + } + } else if existing.Name != sensor.Name || existing.Type != sensor.Type || existing.DataTypeID != sensor.DataTypeID { + // Sensor exists but differs + _, err := tx.Exec( + "UPDATE sensors SET name = ?, type = ?, data_type_id = ? WHERE id = UUID_TO_BIN(?)", + sensor.Name, sensor.Type, sensor.DataTypeID, sensor.ID, + ) + if err != nil { + return err + } + } + } + + // Delete sensors that are no longer present + for sensorID := range existingSensors { + if !seenIDs[sensorID] { + _, err := tx.Exec("DELETE FROM sensors WHERE id = UUID_TO_BIN(?)", sensorID) + if err != nil { + return err + } + } + } + + return nil +} + +// getDeviceActors retrieves existing actors for a device +func getDeviceActors(tx *sql.Tx, deviceID string) (map[string]DiscoveryActorPayload, error) { + rows, err := tx.Query( + "SELECT BIN_TO_UUID(id), name, type, data_type_id FROM actors WHERE device_id = UUID_TO_BIN(?) ORDER BY id", + deviceID, + ) + if err != nil { + return nil, err + } + defer rows.Close() + + actors := make(map[string]DiscoveryActorPayload) + for rows.Next() { + var id, name, typ string + var dataTypeID int + if err := rows.Scan(&id, &name, &typ, &dataTypeID); err != nil { + return nil, err + } + actors[id] = DiscoveryActorPayload{ + ID: id, + Name: name, + Type: typ, + DataTypeID: dataTypeID, + } + } + return actors, rows.Err() +} + +// syncActors creates, updates, or removes actors as needed +func syncActors(tx *sql.Tx, deviceID string, newActors []DiscoveryActorPayload, existingActors map[string]DiscoveryActorPayload) error { + // Track which actors are still present + seenIDs := make(map[string]bool) + + // Insert or update actors + for _, actor := range newActors { + seenIDs[actor.ID] = true + + existing, exists := existingActors[actor.ID] + if !exists { + // New actor + _, err := tx.Exec( + "INSERT INTO actors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)", + actor.ID, deviceID, actor.Name, actor.Type, actor.DataTypeID, + ) + if err != nil { + return err + } + } else if existing.Name != actor.Name || existing.Type != actor.Type || existing.DataTypeID != actor.DataTypeID { + // Actor exists but differs + _, err := tx.Exec( + "UPDATE actors SET name = ?, type = ?, data_type_id = ? WHERE id = UUID_TO_BIN(?)", + actor.Name, actor.Type, actor.DataTypeID, actor.ID, + ) + if err != nil { + return err + } + } + } + + // Delete actors that are no longer present + for actorID := range existingActors { + if !seenIDs[actorID] { + _, err := tx.Exec("DELETE FROM actors WHERE id = UUID_TO_BIN(?)", actorID) + if err != nil { + return err + } + } + } + + return nil +}