308 lines
8.8 KiB
Go
308 lines
8.8 KiB
Go
package handler
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
)
|
|
|
|
// DiscoveryMessage represents the JSON payload from a device discovery message
|
|
type DiscoveryMessage struct {
|
|
MACAddress string `json:"mac_address"`
|
|
Device DiscoveryDevicePayload `json:"device"`
|
|
Sensors []DiscoverySensorPayload `json:"sensors"`
|
|
Actors []DiscoveryActorPayload `json:"actors"`
|
|
}
|
|
|
|
type DiscoveryDevicePayload struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
Description string `json:"description"`
|
|
Location string `json:"location"`
|
|
StatusID int `json:"status_id"`
|
|
}
|
|
|
|
type DiscoverySensorPayload struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
Type string `json:"type"`
|
|
DataTypeID int `json:"data_type_id"`
|
|
}
|
|
|
|
type DiscoveryActorPayload struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
Type string `json:"type"`
|
|
DataTypeID int `json:"data_type_id"`
|
|
}
|
|
|
|
// ProcessDiscoveryMessage handles incoming device discovery messages
|
|
func (h *Handler) ProcessDiscoveryMessage(payload []byte) error {
|
|
var msg DiscoveryMessage
|
|
if err := json.Unmarshal(payload, &msg); err != nil {
|
|
return fmt.Errorf("failed to parse discovery message: %w", err)
|
|
}
|
|
|
|
if msg.Device.ID == "" {
|
|
return fmt.Errorf("discovery message missing device id")
|
|
}
|
|
|
|
// Check if device already exists
|
|
var existingDeviceID string
|
|
var existingStatusID int
|
|
err := h.DB.QueryRow(
|
|
"SELECT BIN_TO_UUID(id), status_id FROM devices WHERE id = UUID_TO_BIN(?)",
|
|
msg.Device.ID,
|
|
).Scan(&existingDeviceID, &existingStatusID)
|
|
|
|
if err == sql.ErrNoRows {
|
|
// New device - create it
|
|
return h.createNewDevice(msg)
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("failed to query device: %w", err)
|
|
}
|
|
|
|
// Device exists - update if needed
|
|
return h.updateExistingDevice(msg, existingStatusID)
|
|
}
|
|
|
|
// createNewDevice adds a new device, sensors, and actors to the database
|
|
func (h *Handler) createNewDevice(msg DiscoveryMessage) error {
|
|
tx, err := h.DB.Begin()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to start transaction: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
// Insert device
|
|
_, err = tx.Exec(
|
|
"INSERT INTO devices (id, name, description, location, status_id) VALUES (UUID_TO_BIN(?), ?, ?, ?, ?)",
|
|
msg.Device.ID, msg.Device.Name, msg.Device.Description, msg.Device.Location, 1,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to insert device: %w", err)
|
|
}
|
|
|
|
// Insert sensors
|
|
for _, sensor := range msg.Sensors {
|
|
_, err = tx.Exec(
|
|
"INSERT INTO sensors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)",
|
|
sensor.ID, msg.Device.ID, sensor.Name, sensor.Type, sensor.DataTypeID,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to insert sensor: %w", err)
|
|
}
|
|
}
|
|
|
|
// Insert actors
|
|
for _, actor := range msg.Actors {
|
|
_, err = tx.Exec(
|
|
"INSERT INTO actors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)",
|
|
actor.ID, msg.Device.ID, actor.Name, actor.Type, actor.DataTypeID,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to insert actor: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return fmt.Errorf("failed to commit transaction: %w", err)
|
|
}
|
|
|
|
log.Printf("discovered new device: %s (%s)", msg.Device.Name, msg.Device.ID)
|
|
return nil
|
|
}
|
|
|
|
// updateExistingDevice updates an existing device and its sensors/actors
|
|
func (h *Handler) updateExistingDevice(msg DiscoveryMessage, previousStatusID int) error {
|
|
tx, err := h.DB.Begin()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to start transaction: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
// Update device (name, description, location always; status to ok if coming from device)
|
|
_, err = tx.Exec(
|
|
"UPDATE devices SET name = ?, description = ?, location = ?, status_id = 1 WHERE id = UUID_TO_BIN(?)",
|
|
msg.Device.Name, msg.Device.Description, msg.Device.Location, msg.Device.ID,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update device: %w", err)
|
|
}
|
|
|
|
// Get existing sensors for this device
|
|
existingSensors, err := getDeviceSensors(tx, msg.Device.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get existing sensors: %w", err)
|
|
}
|
|
|
|
// Sync sensors
|
|
if err := syncSensors(tx, msg.Device.ID, msg.Sensors, existingSensors); err != nil {
|
|
return fmt.Errorf("failed to sync sensors: %w", err)
|
|
}
|
|
|
|
// Get existing actors for this device
|
|
existingActors, err := getDeviceActors(tx, msg.Device.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get existing actors: %w", err)
|
|
}
|
|
|
|
// Sync actors
|
|
if err := syncActors(tx, msg.Device.ID, msg.Actors, existingActors); err != nil {
|
|
return fmt.Errorf("failed to sync actors: %w", err)
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return fmt.Errorf("failed to commit transaction: %w", err)
|
|
}
|
|
|
|
log.Printf("updated device: %s (%s)", msg.Device.Name, msg.Device.ID)
|
|
return nil
|
|
}
|
|
|
|
// getDeviceSensors retrieves existing sensors for a device<
|
|
func getDeviceSensors(tx *sql.Tx, deviceID string) (map[string]DiscoverySensorPayload, error) {
|
|
rows, err := tx.Query(
|
|
"SELECT BIN_TO_UUID(id), name, type, data_type_id FROM sensors WHERE device_id = UUID_TO_BIN(?) ORDER BY id",
|
|
deviceID,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
sensors := make(map[string]DiscoverySensorPayload)
|
|
for rows.Next() {
|
|
var id, name, typ string
|
|
var dataTypeID int
|
|
if err := rows.Scan(&id, &name, &typ, &dataTypeID); err != nil {
|
|
return nil, err
|
|
}
|
|
sensors[id] = DiscoverySensorPayload{
|
|
ID: id,
|
|
Name: name,
|
|
Type: typ,
|
|
DataTypeID: dataTypeID,
|
|
}
|
|
}
|
|
return sensors, rows.Err()
|
|
}
|
|
|
|
// syncSensors creates, updates, or removes sensors as needed
|
|
func syncSensors(tx *sql.Tx, deviceID string, newSensors []DiscoverySensorPayload, existingSensors map[string]DiscoverySensorPayload) error {
|
|
// Track which sensors are still present
|
|
seenIDs := make(map[string]bool)
|
|
|
|
// Insert or update sensors
|
|
for _, sensor := range newSensors {
|
|
seenIDs[sensor.ID] = true
|
|
|
|
existing, exists := existingSensors[sensor.ID]
|
|
if !exists {
|
|
// New sensor
|
|
_, err := tx.Exec(
|
|
"INSERT INTO sensors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)",
|
|
sensor.ID, deviceID, sensor.Name, sensor.Type, sensor.DataTypeID,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else if existing.Name != sensor.Name || existing.Type != sensor.Type || existing.DataTypeID != sensor.DataTypeID {
|
|
// Sensor exists but differs
|
|
_, err := tx.Exec(
|
|
"UPDATE sensors SET name = ?, type = ?, data_type_id = ? WHERE id = UUID_TO_BIN(?)",
|
|
sensor.Name, sensor.Type, sensor.DataTypeID, sensor.ID,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delete sensors that are no longer present
|
|
for sensorID := range existingSensors {
|
|
if !seenIDs[sensorID] {
|
|
_, err := tx.Exec("DELETE FROM sensors WHERE id = UUID_TO_BIN(?)", sensorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// getDeviceActors retrieves existing actors for a device
|
|
func getDeviceActors(tx *sql.Tx, deviceID string) (map[string]DiscoveryActorPayload, error) {
|
|
rows, err := tx.Query(
|
|
"SELECT BIN_TO_UUID(id), name, type, data_type_id FROM actors WHERE device_id = UUID_TO_BIN(?) ORDER BY id",
|
|
deviceID,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
actors := make(map[string]DiscoveryActorPayload)
|
|
for rows.Next() {
|
|
var id, name, typ string
|
|
var dataTypeID int
|
|
if err := rows.Scan(&id, &name, &typ, &dataTypeID); err != nil {
|
|
return nil, err
|
|
}
|
|
actors[id] = DiscoveryActorPayload{
|
|
ID: id,
|
|
Name: name,
|
|
Type: typ,
|
|
DataTypeID: dataTypeID,
|
|
}
|
|
}
|
|
return actors, rows.Err()
|
|
}
|
|
|
|
// syncActors creates, updates, or removes actors as needed
|
|
func syncActors(tx *sql.Tx, deviceID string, newActors []DiscoveryActorPayload, existingActors map[string]DiscoveryActorPayload) error {
|
|
// Track which actors are still present
|
|
seenIDs := make(map[string]bool)
|
|
|
|
// Insert or update actors
|
|
for _, actor := range newActors {
|
|
seenIDs[actor.ID] = true
|
|
|
|
existing, exists := existingActors[actor.ID]
|
|
if !exists {
|
|
// New actor
|
|
_, err := tx.Exec(
|
|
"INSERT INTO actors (id, device_id, name, type, data_type_id) VALUES (UUID_TO_BIN(?), UUID_TO_BIN(?), ?, ?, ?)",
|
|
actor.ID, deviceID, actor.Name, actor.Type, actor.DataTypeID,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else if existing.Name != actor.Name || existing.Type != actor.Type || existing.DataTypeID != actor.DataTypeID {
|
|
// Actor exists but differs
|
|
_, err := tx.Exec(
|
|
"UPDATE actors SET name = ?, type = ?, data_type_id = ? WHERE id = UUID_TO_BIN(?)",
|
|
actor.Name, actor.Type, actor.DataTypeID, actor.ID,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delete actors that are no longer present
|
|
for actorID := range existingActors {
|
|
if !seenIDs[actorID] {
|
|
_, err := tx.Exec("DELETE FROM actors WHERE id = UUID_TO_BIN(?)", actorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|