Files
lambdaiot-core/internal/devicecheck/devicecheck.go

228 lines
5.3 KiB
Go

package devicecheck
import (
"context"
"database/sql"
"encoding/json"
"log"
"strings"
"sync"
"time"
"git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt"
)
// Checker periodically pings devices via MQTT and updates their status
// based on responses received on the subscribed topic.
//
// Responses are not read by the Checker itself in the code shown here: the
// caller is expected to hand incoming payloads to HandleMessage.
// Status IDs written to the devices table follow the comments in this file:
// 1 = ok, 2 = pending, 3 = lost (4 = disabled is only read, never written).
type Checker struct {
// DB is used to list the devices to check and to persist status changes.
DB *sql.DB
// MQTT is the client used to publish check requests.
MQTT *mqtt.Client
// Topic is the MQTT topic check requests are published to.
Topic string
// Interval is the period between check rounds; New defaults it to one minute.
Interval time.Duration
// Timeout is the configured per-check timeout; New defaults it to ten seconds.
Timeout time.Duration
// mu guards waiters.
mu sync.Mutex
// waiters maps a lower-cased device ID to the channel that HandleMessage
// signals when a message for that device arrives during a check.
waiters map[string]chan struct{}
}
// New constructs a Checker, substituting sane defaults for missing values.
//
// A zero or negative interval becomes one minute and a zero or negative
// timeout becomes ten seconds; a non-positive interval would otherwise make
// time.NewTicker panic once the check loop starts. An empty topic becomes
// "lambdaiot". db and mq are stored as given and may be nil, in which case
// the check methods turn into no-ops.
func New(db *sql.DB, mq *mqtt.Client, topic string, interval, timeout time.Duration) *Checker {
	if interval <= 0 {
		interval = time.Minute
	}
	if timeout <= 0 {
		timeout = 10 * time.Second
	}
	if topic == "" {
		topic = "lambdaiot"
	}
	return &Checker{
		DB:       db,
		MQTT:     mq,
		Topic:    topic,
		Interval: interval,
		Timeout:  timeout,
		waiters:  make(map[string]chan struct{}),
	}
}
// Start logs the effective configuration and launches the periodic check
// loop in a background goroutine. The loop stops once ctx is canceled.
//
// Calling Start on a nil *Checker is a no-op: spawning the loop for a nil
// receiver would dereference it inside the goroutine and crash the process.
func (c *Checker) Start(ctx context.Context) {
	if c == nil {
		return
	}
	log.Printf("devicecheck: starting (topic=%s interval=%s timeout=%s)", c.Topic, c.Interval, c.Timeout)
	go c.loop(ctx)
}
// HandleMessage decodes one raw message payload and, if it counts as a sign
// of life from a device, wakes any in-flight check for that device and marks
// the device as ok. Expected JSON payload shape:
//
//	{
//	  "type": "device_check_response" | "device_state",
//	  "device_id": "uuid",
//	  "status": "ok" | "lost" | "pending"
//	}
//
// Payloads that are not valid JSON or carry no device ID are dropped. A
// message is accepted when its type is one of the two listed above (whatever
// its status), or when its status equals "ok" case-insensitively.
func (c *Checker) HandleMessage(payload []byte) {
	if c == nil {
		return
	}

	var incoming struct {
		Type     string `json:"type"`
		DeviceID string `json:"device_id"`
		Status   string `json:"status"`
	}
	if json.Unmarshal(payload, &incoming) != nil {
		return
	}

	// Waiters are keyed by lower-cased ID, so normalise before the lookup.
	deviceID := strings.ToLower(strings.TrimSpace(incoming.DeviceID))
	if deviceID == "" {
		return
	}

	switch strings.ToLower(strings.TrimSpace(incoming.Type)) {
	case "device_check_response", "device_state":
		// Known message types are accepted regardless of the status field.
	default:
		if !strings.EqualFold(incoming.Status, "ok") {
			return
		}
	}

	c.mu.Lock()
	waiter := c.waiters[deviceID]
	c.mu.Unlock()

	if waiter != nil {
		// Non-blocking send: a signal that is already queued is enough.
		select {
		case waiter <- struct{}{}:
		default:
		}
	}

	// 1 = ok. The error is intentionally dropped: updateStatus logs failures
	// itself and there is no caller to report them to.
	_ = c.updateStatus(deviceID, 1)
}
// loop runs one check round immediately and then one per Interval tick,
// returning when ctx is canceled. Each round runs synchronously, so the
// cancellation is only observed between rounds.
func (c *Checker) loop(ctx context.Context) {
	tick := time.NewTicker(c.Interval)
	defer tick.Stop()

	done := ctx.Done()

	// First round happens right away instead of waiting for the first tick.
	c.runOnce()
	for {
		select {
		case <-done:
			return
		case <-tick.C:
		}
		c.runOnce()
	}
}
// runOnce performs a single check round: it lists every device whose status
// is neither lost (3) nor disabled (4), checks all of them concurrently and
// returns once every check has finished. It is a no-op when the receiver,
// the database handle or the MQTT client is nil. Query, scan and iteration
// errors are logged and abort the round.
func (c *Checker) runOnce() {
	if c == nil || c.DB == nil || c.MQTT == nil {
		return
	}

	// Lost and disabled devices are skipped.
	const listQuery = "SELECT BIN_TO_UUID(id) FROM devices WHERE status_id NOT IN (3, 4)"

	result, queryErr := c.DB.Query(listQuery)
	if queryErr != nil {
		log.Printf("devicecheck: query devices failed: %v", queryErr)
		return
	}
	defer result.Close()

	var targets []string
	for result.Next() {
		var raw string
		if scanErr := result.Scan(&raw); scanErr != nil {
			log.Printf("devicecheck: scan id failed: %v", scanErr)
			return
		}
		// Lower-case so the ID matches the key HandleMessage looks up.
		targets = append(targets, strings.ToLower(raw))
	}
	if iterErr := result.Err(); iterErr != nil {
		log.Printf("devicecheck: rows error: %v", iterErr)
		return
	}

	if len(targets) == 0 {
		log.Printf("devicecheck: no devices found to check")
		return
	}

	// One goroutine per device; the round ends when all of them are done.
	var inFlight sync.WaitGroup
	inFlight.Add(len(targets))
	for _, target := range targets {
		go func(deviceID string) {
			defer inFlight.Done()
			c.checkDevice(deviceID)
		}(target)
	}
	inFlight.Wait()
}
// checkDevice runs one check for a single device: it registers a waiter,
// marks the device pending (2), publishes a device_check_request on the
// configured topic and then waits for HandleMessage to signal a reply.
// A reply within Timeout marks the device ok (1); otherwise it is marked
// lost (3). It is a no-op when the receiver, the database handle or the MQTT
// client is nil.
//
// deviceID is expected in the lower-cased form used as the waiter key.
func (c *Checker) checkDevice(deviceID string) {
	if c == nil || c.DB == nil || c.MQTT == nil {
		return
	}

	// Register the waiter before publishing so a fast reply cannot be missed.
	// The channel is buffered so HandleMessage never blocks on the signal.
	ch := make(chan struct{}, 1)
	c.mu.Lock()
	if c.waiters == nil {
		// A Checker built as a struct literal has no map yet; writing to a
		// nil map would panic.
		c.waiters = make(map[string]chan struct{})
	}
	c.waiters[deviceID] = ch
	c.mu.Unlock()
	defer func() {
		c.mu.Lock()
		delete(c.waiters, deviceID)
		c.mu.Unlock()
	}()

	// Mark pending before the request goes out, so a reply that arrives
	// immediately is not overwritten by a late "pending" write.
	if err := c.updateStatus(deviceID, 2); err != nil { // 2 = pending
		log.Printf("devicecheck: set pending failed for %s: %v", deviceID, err)
	}

	payload := struct {
		Type        string `json:"type"`
		DeviceID    string `json:"device_id"`
		RequestedAt string `json:"requested_at"`
	}{
		Type:        "device_check_request",
		DeviceID:    deviceID,
		RequestedAt: time.Now().UTC().Format(time.RFC3339),
	}
	body, err := json.Marshal(payload)
	if err != nil {
		log.Printf("devicecheck: encode request failed for %s: %v", deviceID, err)
	} else {
		log.Printf("devicecheck: publish request for %s", deviceID)
		if err := c.MQTT.Publish(c.Topic, body); err != nil {
			log.Printf("devicecheck: publish failed for %s: %v", deviceID, err)
		}
	}

	// Wait for the reply for at most Timeout (not Interval: waiting a whole
	// interval would make every round overrun the next tick). Fall back to
	// the same default New applies when no timeout was configured.
	timeout := c.Timeout
	if timeout <= 0 {
		timeout = 10 * time.Second
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-ch:
		if err := c.updateStatus(deviceID, 1); err != nil { // 1 = ok
			log.Printf("devicecheck: set ok failed for %s: %v", deviceID, err)
		}
	case <-timer.C:
		if err := c.updateStatus(deviceID, 3); err != nil { // 3 = lost
			log.Printf("devicecheck: set lost failed for %s: %v", deviceID, err)
		}
	}
}
// updateStatus writes statusID to the devices row identified by deviceID and
// refreshes its updated_at timestamp. Every attempt and every failure is
// logged here, so callers may ignore the returned error if they have nothing
// to add. It returns nil without doing anything when the receiver or the
// database handle is nil, and also when the statement matched no row (which
// is only logged).
func (c *Checker) updateStatus(deviceID string, statusID int) error {
	if c == nil || c.DB == nil {
		return nil
	}
	log.Printf("devicecheck: update status request %s -> %d", deviceID, statusID)

	const stmt = "UPDATE devices SET status_id = ?, updated_at = NOW(6) WHERE id = UUID_TO_BIN(?)"

	result, execErr := c.DB.Exec(stmt, statusID, deviceID)
	if execErr != nil {
		log.Printf("devicecheck: update status failed for %s -> %d: %v", deviceID, statusID, execErr)
		return execErr
	}

	affected, countErr := result.RowsAffected()
	switch {
	case countErr != nil:
		log.Printf("devicecheck: rows affected failed for %s -> %d: %v", deviceID, statusID, countErr)
		return countErr
	case affected == 0:
		// Not an error: the device may have been removed in the meantime.
		log.Printf("devicecheck: no rows updated for %s -> %d", deviceID, statusID)
	}
	return nil
}