99 lines
2.3 KiB
Go
99 lines
2.3 KiB
Go
package handler
|
|
|
|
import (
	"encoding/json"
	"fmt"
	"math"
	"strconv"
	"strings"
	"time"
)
|
|
|
|
// ProcessSensorReadingMessage parses a sensor reading MQTT message and stores it.
|
|
// Returns handled=false when the payload does not look like a sensor reading.
|
|
func (h *Handler) ProcessSensorReadingMessage(payload []byte) (bool, error) {
|
|
if h == nil || h.DB == nil {
|
|
return false, nil
|
|
}
|
|
|
|
var msg struct {
|
|
Type string `json:"type"`
|
|
SensorID string `json:"sensor_id"`
|
|
Value json.RawMessage `json:"value"`
|
|
ValueAt string `json:"value_at"`
|
|
}
|
|
if err := json.Unmarshal(payload, &msg); err != nil {
|
|
return false, nil
|
|
}
|
|
|
|
msgType := strings.ToLower(strings.TrimSpace(msg.Type))
|
|
if msgType != "" && msgType != "sensor_reading" && msgType != "sensor_value" {
|
|
return false, nil
|
|
}
|
|
if strings.TrimSpace(msg.SensorID) == "" || len(msg.Value) == 0 {
|
|
return false, nil
|
|
}
|
|
|
|
value, err := parseSensorValue(msg.Value)
|
|
if err != nil {
|
|
return true, fmt.Errorf("invalid sensor value: %w", err)
|
|
}
|
|
|
|
valueAt := time.Now().UTC()
|
|
if strings.TrimSpace(msg.ValueAt) != "" {
|
|
parsed, err := time.Parse(time.RFC3339, msg.ValueAt)
|
|
if err != nil {
|
|
return true, fmt.Errorf("invalid value_at: %w", err)
|
|
}
|
|
valueAt = parsed
|
|
}
|
|
|
|
var exists int
|
|
if err := h.DB.QueryRow("SELECT COUNT(1) FROM sensors WHERE id = UUID_TO_BIN(?)", msg.SensorID).Scan(&exists); err != nil {
|
|
return true, fmt.Errorf("failed to check sensor: %w", err)
|
|
}
|
|
if exists == 0 {
|
|
return true, fmt.Errorf("sensor not found: %s", msg.SensorID)
|
|
}
|
|
|
|
if _, err := h.DB.Exec(
|
|
"INSERT INTO sensor_readings (sensor_id, value, value_at) VALUES (UUID_TO_BIN(?), ?, ?)",
|
|
msg.SensorID,
|
|
value,
|
|
valueAt,
|
|
); err != nil {
|
|
return true, fmt.Errorf("failed to insert sensor reading: %w", err)
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// parseSensorValue converts a raw JSON sensor value into a float64.
//
// Accepted encodings:
//   - a JSON number, returned as-is;
//   - a JSON boolean, mapped to 1 (true) or 0 (false);
//   - a JSON string holding "true" / "false" (case-insensitive) or a number
//     accepted by strconv.ParseFloat; surrounding whitespace is ignored.
//
// An error is returned for invalid JSON, for any other JSON type (null,
// array, object), for strings that are not numeric, and for strings that
// parse to NaN or an infinity, since those are not usable readings.
func parseSensorValue(raw json.RawMessage) (float64, error) {
	var v any
	if err := json.Unmarshal(raw, &v); err != nil {
		return 0, err
	}

	switch t := v.(type) {
	case float64:
		return t, nil
	case bool:
		if t {
			return 1, nil
		}
		return 0, nil
	case string:
		s := strings.TrimSpace(t)
		switch {
		case strings.EqualFold(s, "true"):
			return 1, nil
		case strings.EqualFold(s, "false"):
			return 0, nil
		}
		f, err := strconv.ParseFloat(s, 64)
		if err != nil {
			return 0, err
		}
		// ParseFloat accepts "NaN", "Inf" and "Infinity" without error;
		// reject them here so they never reach storage.
		if math.IsNaN(f) || math.IsInf(f, 0) {
			return 0, fmt.Errorf("non-finite sensor value %q", s)
		}
		return f, nil
	default:
		return 0, fmt.Errorf("unsupported value type")
	}
}
|