feat: device state checker
This commit is contained in:
@@ -16,6 +16,7 @@ import (
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
|
||||
"git.piskot.si/SeminarM2/lambdaiot-core/internal/config"
|
||||
"git.piskot.si/SeminarM2/lambdaiot-core/internal/devicecheck"
|
||||
"git.piskot.si/SeminarM2/lambdaiot-core/internal/handler"
|
||||
"git.piskot.si/SeminarM2/lambdaiot-core/internal/middleware"
|
||||
mqttclient "git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt"
|
||||
@@ -54,6 +55,8 @@ func main() {
|
||||
|
||||
// connect to MQTT broker (best-effort)
|
||||
var mq *mqttclient.Client
|
||||
var checker *devicecheck.Checker
|
||||
var checkCancel context.CancelFunc
|
||||
// initialize sqlite for state messages
|
||||
var stateDB *storage.DB
|
||||
{
|
||||
@@ -77,17 +80,26 @@ func main() {
|
||||
mq = mqc
|
||||
mqttclient.SetDefault(mqc)
|
||||
|
||||
// start device state checker when MQTT is available
|
||||
checkCtx, checkCancel = context.WithCancel(context.Background())
|
||||
checker = devicecheck.New(sqlDB, mq, cfg.MQTT.Topic, 0, 0)
|
||||
checker.Start(checkCtx)
|
||||
|
||||
// subscribe to the configured topic and log incoming messages
|
||||
if cfg.MQTT.Topic != "" {
|
||||
if err := mq.Subscribe(cfg.MQTT.Topic, func(t string, p []byte) {
|
||||
handlerFn := func(t string, p []byte) {
|
||||
log.Printf("mqtt recv on %s: %s", t, string(p))
|
||||
if checker != nil {
|
||||
checker.HandleMessage(p)
|
||||
}
|
||||
// if topic starts with state: -> store in sqlite
|
||||
if strings.HasPrefix(t, "state:") {
|
||||
if storage.Default != nil {
|
||||
_ = storage.Default.InsertMessage(string(p), time.Now())
|
||||
}
|
||||
}
|
||||
}); err != nil {
|
||||
}
|
||||
if err := mq.Subscribe(cfg.MQTT.Topic, handlerFn); err != nil {
|
||||
log.Printf("warning: mqtt subscribe failed: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -173,6 +185,9 @@ func main() {
|
||||
if mq != nil {
|
||||
mq.Close()
|
||||
}
|
||||
if checkCancel != nil {
|
||||
checkCancel()
|
||||
}
|
||||
if stateDB != nil {
|
||||
stateDB.Close()
|
||||
}
|
||||
|
||||
199
internal/devicecheck/devicecheck.go
Normal file
199
internal/devicecheck/devicecheck.go
Normal file
@@ -0,0 +1,199 @@
|
||||
package devicecheck
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt"
|
||||
)
|
||||
|
||||
// Checker periodically pings devices via MQTT and updates their status
|
||||
// based on responses received on the subscribed topic.
|
||||
type Checker struct {
|
||||
DB *sql.DB
|
||||
MQTT *mqtt.Client
|
||||
Topic string
|
||||
Interval time.Duration
|
||||
Timeout time.Duration
|
||||
|
||||
mu sync.Mutex
|
||||
waiters map[string]chan struct{}
|
||||
}
|
||||
|
||||
// New constructs a Checker with sane defaults for interval and timeout when zero.
|
||||
func New(db *sql.DB, mq *mqtt.Client, topic string, interval, timeout time.Duration) *Checker {
|
||||
if interval == 0 {
|
||||
interval = time.Minute
|
||||
}
|
||||
if timeout == 0 {
|
||||
timeout = 10 * time.Second
|
||||
}
|
||||
if topic == "" {
|
||||
topic = "lambdaiot"
|
||||
}
|
||||
return &Checker{
|
||||
DB: db,
|
||||
MQTT: mq,
|
||||
Topic: topic,
|
||||
Interval: interval,
|
||||
Timeout: timeout,
|
||||
waiters: make(map[string]chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Start launches the periodic checker until the context is canceled.
|
||||
func (c *Checker) Start(ctx context.Context) {
|
||||
go c.loop(ctx)
|
||||
}
|
||||
|
||||
// HandleMessage attempts to parse a device state/check response message and
|
||||
// signals any waiter for the given device. Expected JSON payload shape:
|
||||
//
|
||||
// {
|
||||
// "type": "device_check_response" | "device_state",
|
||||
// "device_id": "uuid",
|
||||
// "status": "ok" | "lost" | "pending"
|
||||
// }
|
||||
func (c *Checker) HandleMessage(payload []byte) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
var msg struct {
|
||||
Type string `json:"type"`
|
||||
DeviceID string `json:"device_id"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
if err := json.Unmarshal(payload, &msg); err != nil {
|
||||
return
|
||||
}
|
||||
id := strings.ToLower(strings.TrimSpace(msg.DeviceID))
|
||||
if id == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// Notify waiter if present
|
||||
c.mu.Lock()
|
||||
ch := c.waiters[id]
|
||||
c.mu.Unlock()
|
||||
if ch != nil {
|
||||
select {
|
||||
case ch <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Update status to ok on explicit ok or check response
|
||||
if msg.Type == "device_check_response" || msg.Type == "device_state" || strings.EqualFold(msg.Status, "ok") {
|
||||
_ = c.updateStatus(id, 1) // 1 = ok
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Checker) loop(ctx context.Context) {
|
||||
ticker := time.NewTicker(c.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Run immediately
|
||||
c.runOnce()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
c.runOnce()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Checker) runOnce() {
|
||||
if c == nil || c.DB == nil || c.MQTT == nil {
|
||||
return
|
||||
}
|
||||
|
||||
rows, err := c.DB.Query("SELECT BIN_TO_UUID(id) FROM devices WHERE status_id != 4") // skip disabled
|
||||
if err != nil {
|
||||
log.Printf("devicecheck: query devices failed: %v", err)
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var ids []string
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if err := rows.Scan(&id); err != nil {
|
||||
log.Printf("devicecheck: scan id failed: %v", err)
|
||||
return
|
||||
}
|
||||
ids = append(ids, strings.ToLower(id))
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, id := range ids {
|
||||
wg.Add(1)
|
||||
go func(deviceID string) {
|
||||
defer wg.Done()
|
||||
c.checkDevice(deviceID)
|
||||
}(id)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (c *Checker) checkDevice(deviceID string) {
|
||||
if c == nil || c.DB == nil || c.MQTT == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err := c.updateStatus(deviceID, 2); err != nil { // 2 = pending
|
||||
log.Printf("devicecheck: set pending failed for %s: %v", deviceID, err)
|
||||
}
|
||||
|
||||
ch := make(chan struct{}, 1)
|
||||
c.mu.Lock()
|
||||
c.waiters[deviceID] = ch
|
||||
c.mu.Unlock()
|
||||
|
||||
payload := struct {
|
||||
Type string `json:"type"`
|
||||
DeviceID string `json:"device_id"`
|
||||
RequestedAt string `json:"requested_at"`
|
||||
}{
|
||||
Type: "device_check_request",
|
||||
DeviceID: deviceID,
|
||||
RequestedAt: time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
body, err := json.Marshal(payload)
|
||||
if err == nil {
|
||||
if err := c.MQTT.Publish(c.Topic, body); err != nil {
|
||||
log.Printf("devicecheck: publish failed for %s: %v", deviceID, err)
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
if err := c.updateStatus(deviceID, 1); err != nil { // ok
|
||||
log.Printf("devicecheck: set ok failed for %s: %v", deviceID, err)
|
||||
}
|
||||
case <-time.After(c.Timeout):
|
||||
if err := c.updateStatus(deviceID, 3); err != nil { // lost
|
||||
log.Printf("devicecheck: set lost failed for %s: %v", deviceID, err)
|
||||
}
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
delete(c.waiters, deviceID)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *Checker) updateStatus(deviceID string, statusID int) error {
|
||||
if c == nil || c.DB == nil {
|
||||
return nil
|
||||
}
|
||||
_, err := c.DB.Exec("UPDATE devices SET status_id = ?, updated_at = NOW(6) WHERE id = UUID_TO_BIN(?)", statusID, deviceID)
|
||||
return err
|
||||
}
|
||||
Reference in New Issue
Block a user