fix: devicecheck biznis logika
This commit is contained in:
@@ -140,10 +140,10 @@ Retrieve all devices in the system (public read access).
|
|||||||
```
|
```
|
||||||
|
|
||||||
**Status IDs**:
|
**Status IDs**:
|
||||||
- `1`: OK/Online
|
- `1`: ok
|
||||||
- `2`: Warning
|
- `2`: pending
|
||||||
- `3`: Error
|
- `3`: lost
|
||||||
- `4`: Offline
|
- `4`: disabled
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -840,6 +840,57 @@ Devices can announce themselves by publishing to the discovery topic:
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
### Device Check (State Polling)
|
||||||
|
|
||||||
|
The server periodically publishes device check requests to the main MQTT topic and
|
||||||
|
expects devices to respond on the same topic.
|
||||||
|
|
||||||
|
**Topic**: `{configured_mqtt_topic}`
|
||||||
|
|
||||||
|
**Request Message Published**:
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"type": "device_check_request",
|
||||||
|
"device_id": "uuid",
|
||||||
|
"requested_at": "2026-01-14T10:00:00Z"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Expected Device Response**:
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"type": "device_check_response",
|
||||||
|
"device_id": "uuid",
|
||||||
|
"status": "ok" | "lost" | "pending"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Sensor Readings via MQTT
|
||||||
|
|
||||||
|
Devices can publish sensor readings directly to the main MQTT topic. The server
|
||||||
|
stores readings only if the `sensor_id` exists in the database.
|
||||||
|
|
||||||
|
**Topic**: `{configured_mqtt_topic}`
|
||||||
|
|
||||||
|
**Message Format**:
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"type": "sensor_reading",
|
||||||
|
"sensor_id": "uuid",
|
||||||
|
"value": 23.7,
|
||||||
|
"value_at": "2026-01-14T10:00:00Z"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Notes**:
|
||||||
|
- `type` may also be `sensor_value`.
|
||||||
|
- `value` may be a number, boolean, or numeric string (e.g., "1", "0", "23.7").
|
||||||
|
- `value_at` is optional; if omitted, the server uses the current time.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
## Error Responses
|
## Error Responses
|
||||||
|
|
||||||
All endpoints may return the following standard error responses:
|
All endpoints may return the following standard error responses:
|
||||||
|
|||||||
@@ -78,6 +78,9 @@ func main() {
|
|||||||
if checker != nil {
|
if checker != nil {
|
||||||
checker.HandleMessage(p)
|
checker.HandleMessage(p)
|
||||||
}
|
}
|
||||||
|
if handled, err := h.ProcessSensorReadingMessage(p); handled && err != nil {
|
||||||
|
log.Printf("sensor reading message error: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err := mq.Subscribe(cfg.MQTT.Topic, handlerFn); err != nil {
|
if err := mq.Subscribe(cfg.MQTT.Topic, handlerFn); err != nil {
|
||||||
log.Printf("warning: mqtt subscribe failed: %v", err)
|
log.Printf("warning: mqtt subscribe failed: %v", err)
|
||||||
|
|||||||
@@ -48,6 +48,9 @@ func New(db *sql.DB, mq *mqtt.Client, topic string, interval, timeout time.Durat
|
|||||||
|
|
||||||
// Start launches the periodic checker until the context is canceled.
|
// Start launches the periodic checker until the context is canceled.
|
||||||
func (c *Checker) Start(ctx context.Context) {
|
func (c *Checker) Start(ctx context.Context) {
|
||||||
|
if c != nil {
|
||||||
|
log.Printf("devicecheck: starting (topic=%s interval=%s timeout=%s)", c.Topic, c.Interval, c.Timeout)
|
||||||
|
}
|
||||||
go c.loop(ctx)
|
go c.loop(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -76,7 +79,12 @@ func (c *Checker) HandleMessage(payload []byte) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify waiter if present
|
msgType := strings.ToLower(strings.TrimSpace(msg.Type))
|
||||||
|
if msgType != "device_check_response" && msgType != "device_state" && !strings.EqualFold(msg.Status, "ok") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify waiter if present for valid response/state
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
ch := c.waiters[id]
|
ch := c.waiters[id]
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
@@ -87,10 +95,8 @@ func (c *Checker) HandleMessage(payload []byte) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update status to ok on explicit ok or check response
|
// Update status to ok on explicit ok or check response/state
|
||||||
if msg.Type == "device_check_response" || msg.Type == "device_state" || strings.EqualFold(msg.Status, "ok") {
|
_ = c.updateStatus(id, 1) // 1 = ok
|
||||||
_ = c.updateStatus(id, 1) // 1 = ok
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Checker) loop(ctx context.Context) {
|
func (c *Checker) loop(ctx context.Context) {
|
||||||
@@ -115,7 +121,7 @@ func (c *Checker) runOnce() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := c.DB.Query("SELECT BIN_TO_UUID(id) FROM devices WHERE status_id != 4") // skip disabled
|
rows, err := c.DB.Query("SELECT BIN_TO_UUID(id) FROM devices WHERE status_id NOT IN (3, 4)") // skip lost/disabled
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("devicecheck: query devices failed: %v", err)
|
log.Printf("devicecheck: query devices failed: %v", err)
|
||||||
return
|
return
|
||||||
@@ -131,6 +137,14 @@ func (c *Checker) runOnce() {
|
|||||||
}
|
}
|
||||||
ids = append(ids, strings.ToLower(id))
|
ids = append(ids, strings.ToLower(id))
|
||||||
}
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
log.Printf("devicecheck: rows error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(ids) == 0 {
|
||||||
|
log.Printf("devicecheck: no devices found to check")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
@@ -148,10 +162,6 @@ func (c *Checker) checkDevice(deviceID string) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.updateStatus(deviceID, 2); err != nil { // 2 = pending
|
|
||||||
log.Printf("devicecheck: set pending failed for %s: %v", deviceID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ch := make(chan struct{}, 1)
|
ch := make(chan struct{}, 1)
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
c.waiters[deviceID] = ch
|
c.waiters[deviceID] = ch
|
||||||
@@ -169,17 +179,22 @@ func (c *Checker) checkDevice(deviceID string) {
|
|||||||
|
|
||||||
body, err := json.Marshal(payload)
|
body, err := json.Marshal(payload)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
log.Printf("devicecheck: publish request for %s", deviceID)
|
||||||
if err := c.MQTT.Publish(c.Topic, body); err != nil {
|
if err := c.MQTT.Publish(c.Topic, body); err != nil {
|
||||||
log.Printf("devicecheck: publish failed for %s: %v", deviceID, err)
|
log.Printf("devicecheck: publish failed for %s: %v", deviceID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := c.updateStatus(deviceID, 2); err != nil { // 2 = pending
|
||||||
|
log.Printf("devicecheck: set pending failed for %s: %v", deviceID, err)
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ch:
|
case <-ch:
|
||||||
if err := c.updateStatus(deviceID, 1); err != nil { // ok
|
if err := c.updateStatus(deviceID, 1); err != nil { // ok
|
||||||
log.Printf("devicecheck: set ok failed for %s: %v", deviceID, err)
|
log.Printf("devicecheck: set ok failed for %s: %v", deviceID, err)
|
||||||
}
|
}
|
||||||
case <-time.After(c.Timeout):
|
case <-time.After(c.Interval):
|
||||||
if err := c.updateStatus(deviceID, 3); err != nil { // lost
|
if err := c.updateStatus(deviceID, 3); err != nil { // lost
|
||||||
log.Printf("devicecheck: set lost failed for %s: %v", deviceID, err)
|
log.Printf("devicecheck: set lost failed for %s: %v", deviceID, err)
|
||||||
}
|
}
|
||||||
@@ -194,6 +209,19 @@ func (c *Checker) updateStatus(deviceID string, statusID int) error {
|
|||||||
if c == nil || c.DB == nil {
|
if c == nil || c.DB == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_, err := c.DB.Exec("UPDATE devices SET status_id = ?, updated_at = NOW(6) WHERE id = UUID_TO_BIN(?)", statusID, deviceID)
|
log.Printf("devicecheck: update status request %s -> %d", deviceID, statusID)
|
||||||
return err
|
res, err := c.DB.Exec("UPDATE devices SET status_id = ?, updated_at = NOW(6) WHERE id = UUID_TO_BIN(?)", statusID, deviceID)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("devicecheck: update status failed for %s -> %d: %v", deviceID, statusID, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
rows, err := res.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("devicecheck: rows affected failed for %s -> %d: %v", deviceID, statusID, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if rows == 0 {
|
||||||
|
log.Printf("devicecheck: no rows updated for %s -> %d", deviceID, statusID)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,98 @@
|
|||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ProcessSensorReadingMessage parses a sensor reading MQTT message and stores it.
|
||||||
|
// Returns handled=false when the payload does not look like a sensor reading.
|
||||||
|
func (h *Handler) ProcessSensorReadingMessage(payload []byte) (bool, error) {
|
||||||
|
if h == nil || h.DB == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var msg struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
SensorID string `json:"sensor_id"`
|
||||||
|
Value json.RawMessage `json:"value"`
|
||||||
|
ValueAt string `json:"value_at"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(payload, &msg); err != nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
msgType := strings.ToLower(strings.TrimSpace(msg.Type))
|
||||||
|
if msgType != "" && msgType != "sensor_reading" && msgType != "sensor_value" {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(msg.SensorID) == "" || len(msg.Value) == 0 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
value, err := parseSensorValue(msg.Value)
|
||||||
|
if err != nil {
|
||||||
|
return true, fmt.Errorf("invalid sensor value: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
valueAt := time.Now().UTC()
|
||||||
|
if strings.TrimSpace(msg.ValueAt) != "" {
|
||||||
|
parsed, err := time.Parse(time.RFC3339, msg.ValueAt)
|
||||||
|
if err != nil {
|
||||||
|
return true, fmt.Errorf("invalid value_at: %w", err)
|
||||||
|
}
|
||||||
|
valueAt = parsed
|
||||||
|
}
|
||||||
|
|
||||||
|
var exists int
|
||||||
|
if err := h.DB.QueryRow("SELECT COUNT(1) FROM sensors WHERE id = UUID_TO_BIN(?)", msg.SensorID).Scan(&exists); err != nil {
|
||||||
|
return true, fmt.Errorf("failed to check sensor: %w", err)
|
||||||
|
}
|
||||||
|
if exists == 0 {
|
||||||
|
return true, fmt.Errorf("sensor not found: %s", msg.SensorID)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := h.DB.Exec(
|
||||||
|
"INSERT INTO sensor_readings (sensor_id, value, value_at) VALUES (UUID_TO_BIN(?), ?, ?)",
|
||||||
|
msg.SensorID,
|
||||||
|
value,
|
||||||
|
valueAt,
|
||||||
|
); err != nil {
|
||||||
|
return true, fmt.Errorf("failed to insert sensor reading: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseSensorValue(raw json.RawMessage) (float64, error) {
|
||||||
|
var v any
|
||||||
|
if err := json.Unmarshal(raw, &v); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
switch t := v.(type) {
|
||||||
|
case float64:
|
||||||
|
return t, nil
|
||||||
|
case bool:
|
||||||
|
if t {
|
||||||
|
return 1, nil
|
||||||
|
}
|
||||||
|
return 0, nil
|
||||||
|
case string:
|
||||||
|
if strings.EqualFold(t, "true") {
|
||||||
|
return 1, nil
|
||||||
|
}
|
||||||
|
if strings.EqualFold(t, "false") {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
f, err := strconv.ParseFloat(t, 64)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return f, nil
|
||||||
|
default:
|
||||||
|
return 0, fmt.Errorf("unsupported value type")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user