feat: add rudimentary autodiscovery
This commit is contained in:
@@ -0,0 +1,307 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
)
|
||||
|
||||
// DiscoveryMessage represents the JSON payload from a device discovery message
|
||||
type DiscoveryMessage struct {
|
||||
MACAddress string `json:"mac_address"`
|
||||
Device DiscoveryDevicePayload `json:"device"`
|
||||
Sensors []DiscoverySensorPayload `json:"sensors"`
|
||||
Actors []DiscoveryActorPayload `json:"actors"`
|
||||
}
|
||||
|
||||
type DiscoveryDevicePayload struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description"`
|
||||
Location string `json:"location"`
|
||||
StatusID int `json:"status_id"`
|
||||
}
|
||||
|
||||
type DiscoverySensorPayload struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
DataTypeID int `json:"data_type_id"`
|
||||
}
|
||||
|
||||
type DiscoveryActorPayload struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
DataTypeID int `json:"data_type_id"`
|
||||
}
|
||||
|
||||
// ProcessDiscoveryMessage handles incoming device discovery messages
|
||||
func (h *Handler) ProcessDiscoveryMessage(payload []byte) error {
|
||||
var msg DiscoveryMessage
|
||||
if err := json.Unmarshal(payload, &msg); err != nil {
|
||||
return fmt.Errorf("failed to parse discovery message: %w", err)
|
||||
}
|
||||
|
||||
if msg.Device.ID == "" {
|
||||
return fmt.Errorf("discovery message missing device id")
|
||||
}
|
||||
|
||||
// Check if device already exists
|
||||
var existingDeviceID string
|
||||
var existingStatusID int
|
||||
err := h.DB.QueryRow(
|
||||
"SELECT BIN_TO_UUID(id), status_id FROM devices WHERE id = UUID_TO_BIN(?)",
|
||||
msg.Device.ID,
|
||||
).Scan(&existingDeviceID, &existingStatusID)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
// New device - create it
|
||||
return h.createNewDevice(msg)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query device: %w", err)
|
||||
}
|
||||
|
||||
// Device exists - update if needed
|
||||
return h.updateExistingDevice(msg, existingStatusID)
|
||||
}
|
||||
|
||||
// createNewDevice adds a new device, sensors, and actors to the database
|
||||
func (h *Handler) createNewDevice(msg DiscoveryMessage) error {
|
||||
tx, err := h.DB.Begin()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start transaction: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Insert device
|
||||
_, err = tx.Exec(
|
||||
"INSERT INTO devices (id, name, description, location, status_id) VALUES (UUID_TO_BIN(?), ?, ?, ?, ?)",
|
||||
msg.Device.ID, msg.Device.Name, msg.Device.Description, msg.Device.Location, 1,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert device: %w", err)
|
||||
}
|
||||
|
||||
// Insert sensors
|
||||
for _, sensor := range msg.Sensors {
|
||||
_, err = tx.Exec(
|
||||
"INSERT INTO sensors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)",
|
||||
sensor.ID, msg.Device.ID, sensor.Name, sensor.Type, sensor.DataTypeID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert sensor: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Insert actors
|
||||
for _, actor := range msg.Actors {
|
||||
_, err = tx.Exec(
|
||||
"INSERT INTO actors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)",
|
||||
actor.ID, msg.Device.ID, actor.Name, actor.Type, actor.DataTypeID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert actor: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return fmt.Errorf("failed to commit transaction: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("discovered new device: %s (%s)", msg.Device.Name, msg.Device.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateExistingDevice updates an existing device and its sensors/actors
|
||||
func (h *Handler) updateExistingDevice(msg DiscoveryMessage, previousStatusID int) error {
|
||||
tx, err := h.DB.Begin()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start transaction: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Update device (name, description, location always; status to ok if coming from device)
|
||||
_, err = tx.Exec(
|
||||
"UPDATE devices SET name = ?, description = ?, location = ?, status_id = 1 WHERE id = UUID_TO_BIN(?)",
|
||||
msg.Device.Name, msg.Device.Description, msg.Device.Location, msg.Device.ID,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update device: %w", err)
|
||||
}
|
||||
|
||||
// Get existing sensors for this device
|
||||
existingSensors, err := getDeviceSensors(tx, msg.Device.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get existing sensors: %w", err)
|
||||
}
|
||||
|
||||
// Sync sensors
|
||||
if err := syncSensors(tx, msg.Device.ID, msg.Sensors, existingSensors); err != nil {
|
||||
return fmt.Errorf("failed to sync sensors: %w", err)
|
||||
}
|
||||
|
||||
// Get existing actors for this device
|
||||
existingActors, err := getDeviceActors(tx, msg.Device.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get existing actors: %w", err)
|
||||
}
|
||||
|
||||
// Sync actors
|
||||
if err := syncActors(tx, msg.Device.ID, msg.Actors, existingActors); err != nil {
|
||||
return fmt.Errorf("failed to sync actors: %w", err)
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return fmt.Errorf("failed to commit transaction: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("updated device: %s (%s)", msg.Device.Name, msg.Device.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// getDeviceSensors retrieves existing sensors for a device
|
||||
func getDeviceSensors(tx *sql.Tx, deviceID string) (map[string]DiscoverySensorPayload, error) {
|
||||
rows, err := tx.Query(
|
||||
"SELECT BIN_TO_UUID(id), name, type, data_type_id FROM sensors WHERE device_id = UUID_TO_BIN(?) ORDER BY id",
|
||||
deviceID,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
sensors := make(map[string]DiscoverySensorPayload)
|
||||
for rows.Next() {
|
||||
var id, name, typ string
|
||||
var dataTypeID int
|
||||
if err := rows.Scan(&id, &name, &typ, &dataTypeID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sensors[id] = DiscoverySensorPayload{
|
||||
ID: id,
|
||||
Name: name,
|
||||
Type: typ,
|
||||
DataTypeID: dataTypeID,
|
||||
}
|
||||
}
|
||||
return sensors, rows.Err()
|
||||
}
|
||||
|
||||
// syncSensors creates, updates, or removes sensors as needed
|
||||
func syncSensors(tx *sql.Tx, deviceID string, newSensors []DiscoverySensorPayload, existingSensors map[string]DiscoverySensorPayload) error {
|
||||
// Track which sensors are still present
|
||||
seenIDs := make(map[string]bool)
|
||||
|
||||
// Insert or update sensors
|
||||
for _, sensor := range newSensors {
|
||||
seenIDs[sensor.ID] = true
|
||||
|
||||
existing, exists := existingSensors[sensor.ID]
|
||||
if !exists {
|
||||
// New sensor
|
||||
_, err := tx.Exec(
|
||||
"INSERT INTO sensors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)",
|
||||
sensor.ID, deviceID, sensor.Name, sensor.Type, sensor.DataTypeID,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else if existing.Name != sensor.Name || existing.Type != sensor.Type || existing.DataTypeID != sensor.DataTypeID {
|
||||
// Sensor exists but differs
|
||||
_, err := tx.Exec(
|
||||
"UPDATE sensors SET name = ?, type = ?, data_type_id = ? WHERE id = UUID_TO_BIN(?)",
|
||||
sensor.Name, sensor.Type, sensor.DataTypeID, sensor.ID,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete sensors that are no longer present
|
||||
for sensorID := range existingSensors {
|
||||
if !seenIDs[sensorID] {
|
||||
_, err := tx.Exec("DELETE FROM sensors WHERE id = UUID_TO_BIN(?)", sensorID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getDeviceActors retrieves existing actors for a device
|
||||
func getDeviceActors(tx *sql.Tx, deviceID string) (map[string]DiscoveryActorPayload, error) {
|
||||
rows, err := tx.Query(
|
||||
"SELECT BIN_TO_UUID(id), name, type, data_type_id FROM actors WHERE device_id = UUID_TO_BIN(?) ORDER BY id",
|
||||
deviceID,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
actors := make(map[string]DiscoveryActorPayload)
|
||||
for rows.Next() {
|
||||
var id, name, typ string
|
||||
var dataTypeID int
|
||||
if err := rows.Scan(&id, &name, &typ, &dataTypeID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
actors[id] = DiscoveryActorPayload{
|
||||
ID: id,
|
||||
Name: name,
|
||||
Type: typ,
|
||||
DataTypeID: dataTypeID,
|
||||
}
|
||||
}
|
||||
return actors, rows.Err()
|
||||
}
|
||||
|
||||
// syncActors creates, updates, or removes actors as needed
|
||||
func syncActors(tx *sql.Tx, deviceID string, newActors []DiscoveryActorPayload, existingActors map[string]DiscoveryActorPayload) error {
|
||||
// Track which actors are still present
|
||||
seenIDs := make(map[string]bool)
|
||||
|
||||
// Insert or update actors
|
||||
for _, actor := range newActors {
|
||||
seenIDs[actor.ID] = true
|
||||
|
||||
existing, exists := existingActors[actor.ID]
|
||||
if !exists {
|
||||
// New actor
|
||||
_, err := tx.Exec(
|
||||
"INSERT INTO actors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)",
|
||||
actor.ID, deviceID, actor.Name, actor.Type, actor.DataTypeID,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else if existing.Name != actor.Name || existing.Type != actor.Type || existing.DataTypeID != actor.DataTypeID {
|
||||
// Actor exists but differs
|
||||
_, err := tx.Exec(
|
||||
"UPDATE actors SET name = ?, type = ?, data_type_id = ? WHERE id = UUID_TO_BIN(?)",
|
||||
actor.Name, actor.Type, actor.DataTypeID, actor.ID,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete actors that are no longer present
|
||||
for actorID := range existingActors {
|
||||
if !seenIDs[actorID] {
|
||||
_, err := tx.Exec("DELETE FROM actors WHERE id = UUID_TO_BIN(?)", actorID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user