Files
lambdaiot-core/internal/handler/sensor_readings_mqtt.go

99 lines
2.3 KiB
Go

package handler
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
)
// ProcessSensorReadingMessage parses a sensor reading MQTT message and stores it.
//
// The payload is decoded as a JSON object with the fields "type", "sensor_id",
// "value" and "value_at". Surrounding whitespace in "type", "sensor_id" and
// "value_at" is ignored.
//
// The returned bool reports whether the payload was recognized as a sensor
// reading. It is false (with a nil error) when the handler or its DB is nil,
// when the payload is not valid JSON, when "type" is set to something other
// than "sensor_reading" or "sensor_value" (case-insensitive), or when
// "sensor_id" or "value" is missing. Once the payload is recognized the bool is
// true, and the error is non-nil if the value or timestamp is invalid, the
// sensor does not exist, or a database statement fails.
func (h *Handler) ProcessSensorReadingMessage(payload []byte) (bool, error) {
	if h == nil || h.DB == nil {
		return false, nil
	}

	var msg struct {
		Type     string          `json:"type"`
		SensorID string          `json:"sensor_id"`
		Value    json.RawMessage `json:"value"`
		ValueAt  string          `json:"value_at"`
	}
	if err := json.Unmarshal(payload, &msg); err != nil {
		// Not JSON at all: leave the message for other handlers.
		return false, nil
	}

	// An absent type is tolerated; an explicit type must name a sensor reading.
	msgType := strings.ToLower(strings.TrimSpace(msg.Type))
	if msgType != "" && msgType != "sensor_reading" && msgType != "sensor_value" {
		return false, nil
	}

	// Trim once and use the trimmed values everywhere below, so the checks and
	// the values actually parsed / sent to the database cannot disagree.
	sensorID := strings.TrimSpace(msg.SensorID)
	rawValueAt := strings.TrimSpace(msg.ValueAt)

	if sensorID == "" || len(msg.Value) == 0 {
		return false, nil
	}

	value, err := parseSensorValue(msg.Value)
	if err != nil {
		return true, fmt.Errorf("invalid sensor value: %w", err)
	}

	// Default to the receive time when the message carries no timestamp.
	valueAt := time.Now().UTC()
	if rawValueAt != "" {
		parsed, err := time.Parse(time.RFC3339, rawValueAt)
		if err != nil {
			return true, fmt.Errorf("invalid value_at: %w", err)
		}
		// Same instant, expressed in UTC like the default above.
		valueAt = parsed.UTC()
	}

	// The id is passed through UUID_TO_BIN, so sensor_id must be a textual UUID.
	// NOTE(review): the existence check and the insert are two separate
	// statements, not one transaction.
	var exists int
	if err := h.DB.QueryRow("SELECT COUNT(1) FROM sensors WHERE id = UUID_TO_BIN(?)", sensorID).Scan(&exists); err != nil {
		return true, fmt.Errorf("failed to check sensor: %w", err)
	}
	if exists == 0 {
		return true, fmt.Errorf("sensor not found: %s", sensorID)
	}

	if _, err := h.DB.Exec(
		"INSERT INTO sensor_readings (sensor_id, value, value_at) VALUES (UUID_TO_BIN(?), ?, ?)",
		sensorID,
		value,
		valueAt,
	); err != nil {
		return true, fmt.Errorf("failed to insert sensor reading: %w", err)
	}
	return true, nil
}
// parseSensorValue converts the raw JSON "value" of a sensor reading to a float64.
//
// Accepted encodings:
//   - JSON number: returned as is.
//   - JSON boolean: true becomes 1, false becomes 0.
//   - JSON string: "true"/"false" (case-insensitive) become 1/0; any other
//     string is parsed with strconv.ParseFloat. Surrounding whitespace in the
//     string is ignored.
//
// An error is returned for malformed JSON, for any other JSON type (null,
// array, object), for strings that are not numbers, and for strings that
// parse to a non-finite value (NaN or ±Inf).
func parseSensorValue(raw json.RawMessage) (float64, error) {
	var v any
	if err := json.Unmarshal(raw, &v); err != nil {
		return 0, err
	}

	switch t := v.(type) {
	case float64:
		return t, nil
	case bool:
		if t {
			return 1, nil
		}
		return 0, nil
	case string:
		s := strings.TrimSpace(t)
		switch {
		case strings.EqualFold(s, "true"):
			return 1, nil
		case strings.EqualFold(s, "false"):
			return 0, nil
		}
		f, err := strconv.ParseFloat(s, 64)
		if err != nil {
			return 0, err
		}
		// ParseFloat accepts "NaN", "Inf" and "Infinity" without an error.
		// Multiplying by zero gives 0 for every finite float and NaN for
		// NaN or ±Inf, so this rejects exactly the non-finite results.
		if f*0 != 0 {
			return 0, fmt.Errorf("non-finite value %q", s)
		}
		return f, nil
	default:
		return 0, fmt.Errorf("unsupported value type")
	}
}