package handler

import (
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log"
)

// DiscoveryMessage represents the JSON payload from a device discovery message.
type DiscoveryMessage struct {
	MACAddress string                   `json:"mac_address"`
	Device     DiscoveryDevicePayload   `json:"device"`
	Sensors    []DiscoverySensorPayload `json:"sensors"`
	Actors     []DiscoveryActorPayload  `json:"actors"`
}

// DiscoveryDevicePayload is the "device" object of a discovery message.
type DiscoveryDevicePayload struct {
	ID          string `json:"id"`
	Name        string `json:"name"`
	Description string `json:"description"`
	Location    string `json:"location"`
	StatusID    int    `json:"status_id"`
}

// DiscoverySensorPayload is one entry of the "sensors" array of a discovery
// message. It is also used as the in-memory form of a row read back from the
// sensors table.
type DiscoverySensorPayload struct {
	ID         string `json:"id"`
	Name       string `json:"name"`
	Type       string `json:"type"`
	DataTypeID int    `json:"data_type_id"`
}

// DiscoveryActorPayload is one entry of the "actors" array of a discovery
// message. It is also used as the in-memory form of a row read back from the
// actors table.
type DiscoveryActorPayload struct {
	ID         string `json:"id"`
	Name       string `json:"name"`
	Type       string `json:"type"`
	DataTypeID int    `json:"data_type_id"`
}

// ProcessDiscoveryMessage handles incoming device discovery messages.
//
// payload is decoded as a JSON DiscoveryMessage; a message with an empty
// device id is rejected. If the devices table has no row for the device id,
// the device is created together with its sensors and actors. Otherwise the
// stored device, sensors and actors are synchronized with the message.
//
// NOTE(review): the existence check runs outside the transaction that performs
// the writes, so two concurrent messages for the same unknown device could
// both take the "create" path — confirm whether callers serialize messages.
func (h *Handler) ProcessDiscoveryMessage(payload []byte) error {
	var msg DiscoveryMessage
	if err := json.Unmarshal(payload, &msg); err != nil {
		return fmt.Errorf("failed to parse discovery message: %w", err)
	}

	if msg.Device.ID == "" {
		return errors.New("discovery message missing device id")
	}

	// Check if device already exists.
	var existingDeviceID string
	var existingStatusID int
	err := h.DB.QueryRow(
		"SELECT BIN_TO_UUID(id), status_id FROM devices WHERE id = UUID_TO_BIN(?)",
		msg.Device.ID,
	).Scan(&existingDeviceID, &existingStatusID)

	switch {
	case errors.Is(err, sql.ErrNoRows):
		// New device - create it.
		return h.createNewDevice(msg)
	case err != nil:
		return fmt.Errorf("failed to query device: %w", err)
	}

	// Device exists - update if needed.
	return h.updateExistingDevice(msg, existingStatusID)
}

// createNewDevice adds a new device, its sensors, and its actors to the
// database inside a single transaction. Any failure rolls back every insert.
func (h *Handler) createNewDevice(msg DiscoveryMessage) error {
	tx, err := h.DB.Begin()
	if err != nil {
		return fmt.Errorf("failed to start transaction: %w", err)
	}
	// Rollback after a successful Commit only returns sql.ErrTxDone, so the
	// result is intentionally ignored.
	defer tx.Rollback()

	// Insert device. The status is always written as 1, not msg.Device.StatusID.
	_, err = tx.Exec(
		"INSERT INTO devices (id, name, description, location, status_id) VALUES (UUID_TO_BIN(?), ?, ?, ?, ?)",
		msg.Device.ID, msg.Device.Name, msg.Device.Description, msg.Device.Location, 1,
	)
	if err != nil {
		return fmt.Errorf("failed to insert device: %w", err)
	}

	// Insert sensors.
	for _, sensor := range msg.Sensors {
		_, err = tx.Exec(
			"INSERT INTO sensors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)",
			sensor.ID, msg.Device.ID, sensor.Name, sensor.Type, sensor.DataTypeID,
		)
		if err != nil {
			return fmt.Errorf("failed to insert sensor: %w", err)
		}
	}

	// Insert actors.
	for _, actor := range msg.Actors {
		_, err = tx.Exec(
			"INSERT INTO actors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)",
			actor.ID, msg.Device.ID, actor.Name, actor.Type, actor.DataTypeID,
		)
		if err != nil {
			return fmt.Errorf("failed to insert actor: %w", err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}

	log.Printf("discovered new device: %s (%s)", msg.Device.Name, msg.Device.ID)
	return nil
}

// updateExistingDevice updates an existing device and synchronizes its
// sensors and actors with the message, all inside a single transaction.
//
// previousStatusID is the status_id read by the caller before the update; it
// is not consulted here at present.
func (h *Handler) updateExistingDevice(msg DiscoveryMessage, previousStatusID int) error {
	tx, err := h.DB.Begin()
	if err != nil {
		return fmt.Errorf("failed to start transaction: %w", err)
	}
	// Rollback after a successful Commit only returns sql.ErrTxDone, so the
	// result is intentionally ignored.
	defer tx.Rollback()

	// Update device: name, description and location always; status_id is reset
	// to 1 because the message came from the device itself.
	_, err = tx.Exec(
		"UPDATE devices SET name = ?, description = ?, location = ?, status_id = 1 WHERE id = UUID_TO_BIN(?)",
		msg.Device.Name, msg.Device.Description, msg.Device.Location, msg.Device.ID,
	)
	if err != nil {
		return fmt.Errorf("failed to update device: %w", err)
	}

	// Get existing sensors for this device.
	existingSensors, err := getDeviceSensors(tx, msg.Device.ID)
	if err != nil {
		return fmt.Errorf("failed to get existing sensors: %w", err)
	}

	// Sync sensors.
	if err := syncSensors(tx, msg.Device.ID, msg.Sensors, existingSensors); err != nil {
		return fmt.Errorf("failed to sync sensors: %w", err)
	}

	// Get existing actors for this device.
	existingActors, err := getDeviceActors(tx, msg.Device.ID)
	if err != nil {
		return fmt.Errorf("failed to get existing actors: %w", err)
	}

	// Sync actors.
	if err := syncActors(tx, msg.Device.ID, msg.Actors, existingActors); err != nil {
		return fmt.Errorf("failed to sync actors: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}

	log.Printf("updated device: %s (%s)", msg.Device.Name, msg.Device.ID)
	return nil
}

// getDeviceSensors retrieves existing sensors for a device.
//
// It returns the rows of the sensors table whose device_id matches deviceID,
// keyed by the sensor id as rendered by BIN_TO_UUID.
func getDeviceSensors(tx *sql.Tx, deviceID string) (map[string]DiscoverySensorPayload, error) {
	rows, err := tx.Query(
		"SELECT BIN_TO_UUID(id), name, type, data_type_id FROM sensors WHERE device_id = UUID_TO_BIN(?) ORDER BY id",
		deviceID,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	sensors := make(map[string]DiscoverySensorPayload)
	for rows.Next() {
		var id, name, typ string
		var dataTypeID int
		if err := rows.Scan(&id, &name, &typ, &dataTypeID); err != nil {
			return nil, err
		}
		sensors[id] = DiscoverySensorPayload{
			ID:         id,
			Name:       name,
			Type:       typ,
			DataTypeID: dataTypeID,
		}
	}
	return sensors, rows.Err()
}

// syncSensors creates, updates, or removes sensors as needed.
//
// Each entry of newSensors that is absent from existingSensors is inserted
// for deviceID; an entry whose name, type or data type differs from the stored
// one is updated; stored sensors not mentioned in newSensors are deleted.
// The first failing statement aborts the sync and its error is returned.
func syncSensors(tx *sql.Tx, deviceID string, newSensors []DiscoverySensorPayload, existingSensors map[string]DiscoverySensorPayload) error {
	// Track which sensors are still present.
	seenIDs := make(map[string]bool, len(newSensors))

	// Insert or update sensors.
	for _, sensor := range newSensors {
		seenIDs[sensor.ID] = true

		existing, exists := existingSensors[sensor.ID]
		switch {
		case !exists:
			// New sensor.
			_, err := tx.Exec(
				"INSERT INTO sensors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)",
				sensor.ID, deviceID, sensor.Name, sensor.Type, sensor.DataTypeID,
			)
			if err != nil {
				return err
			}
		case existing.Name != sensor.Name || existing.Type != sensor.Type || existing.DataTypeID != sensor.DataTypeID:
			// Sensor exists but differs.
			_, err := tx.Exec(
				"UPDATE sensors SET name = ?, type = ?, data_type_id = ? WHERE id = UUID_TO_BIN(?)",
				sensor.Name, sensor.Type, sensor.DataTypeID, sensor.ID,
			)
			if err != nil {
				return err
			}
		}
	}

	// Delete sensors that are no longer present.
	for sensorID := range existingSensors {
		if seenIDs[sensorID] {
			continue
		}
		if _, err := tx.Exec("DELETE FROM sensors WHERE id = UUID_TO_BIN(?)", sensorID); err != nil {
			return err
		}
	}

	return nil
}

// getDeviceActors retrieves existing actors for a device.
//
// It returns the rows of the actors table whose device_id matches deviceID,
// keyed by the actor id as rendered by BIN_TO_UUID.
func getDeviceActors(tx *sql.Tx, deviceID string) (map[string]DiscoveryActorPayload, error) {
	rows, err := tx.Query(
		"SELECT BIN_TO_UUID(id), name, type, data_type_id FROM actors WHERE device_id = UUID_TO_BIN(?) ORDER BY id",
		deviceID,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	actors := make(map[string]DiscoveryActorPayload)
	for rows.Next() {
		var id, name, typ string
		var dataTypeID int
		if err := rows.Scan(&id, &name, &typ, &dataTypeID); err != nil {
			return nil, err
		}
		actors[id] = DiscoveryActorPayload{
			ID:         id,
			Name:       name,
			Type:       typ,
			DataTypeID: dataTypeID,
		}
	}
	return actors, rows.Err()
}

// syncActors creates, updates, or removes actors as needed.
//
// Each entry of newActors that is absent from existingActors is inserted for
// deviceID; an entry whose name, type or data type differs from the stored one
// is updated; stored actors not mentioned in newActors are deleted.
// The first failing statement aborts the sync and its error is returned.
func syncActors(tx *sql.Tx, deviceID string, newActors []DiscoveryActorPayload, existingActors map[string]DiscoveryActorPayload) error {
	// Track which actors are still present.
	seenIDs := make(map[string]bool, len(newActors))

	// Insert or update actors.
	for _, actor := range newActors {
		seenIDs[actor.ID] = true

		existing, exists := existingActors[actor.ID]
		switch {
		case !exists:
			// New actor.
			_, err := tx.Exec(
				"INSERT INTO actors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)",
				actor.ID, deviceID, actor.Name, actor.Type, actor.DataTypeID,
			)
			if err != nil {
				return err
			}
		case existing.Name != actor.Name || existing.Type != actor.Type || existing.DataTypeID != actor.DataTypeID:
			// Actor exists but differs.
			_, err := tx.Exec(
				"UPDATE actors SET name = ?, type = ?, data_type_id = ? WHERE id = UUID_TO_BIN(?)",
				actor.Name, actor.Type, actor.DataTypeID, actor.ID,
			)
			if err != nil {
				return err
			}
		}
	}

	// Delete actors that are no longer present.
	for actorID := range existingActors {
		if seenIDs[actorID] {
			continue
		}
		if _, err := tx.Exec("DELETE FROM actors WHERE id = UUID_TO_BIN(?)", actorID); err != nil {
			return err
		}
	}

	return nil
}