190 lines
5.3 KiB
Go
190 lines
5.3 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
_ "github.com/go-sql-driver/mysql"
|
|
|
|
"git.piskot.si/SeminarM2/lambdaiot-core/internal/config"
|
|
"git.piskot.si/SeminarM2/lambdaiot-core/internal/devicecheck"
|
|
"git.piskot.si/SeminarM2/lambdaiot-core/internal/handler"
|
|
"git.piskot.si/SeminarM2/lambdaiot-core/internal/middleware"
|
|
mqttclient "git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt"
|
|
)
|
|
|
|
func main() {
|
|
// load configuration (look for ./config.toml by default)
|
|
cfg, err := config.Load("")
|
|
if err != nil {
|
|
log.Fatalf("failed to load config: %v", err)
|
|
}
|
|
|
|
// connect to MySQL database
|
|
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", cfg.Database.User, cfg.Database.Password, cfg.Database.Host, cfg.Database.Port, cfg.Database.Name)
|
|
sqlDB, err := sql.Open("mysql", dsn)
|
|
if err != nil {
|
|
log.Fatalf("failed to connect to database: %v", err)
|
|
}
|
|
defer sqlDB.Close()
|
|
|
|
// test connection
|
|
if err := sqlDB.Ping(); err != nil {
|
|
log.Fatalf("failed to ping database: %v", err)
|
|
}
|
|
log.Println("connected to database")
|
|
|
|
// determine address
|
|
addr := cfg.Server.Address
|
|
if addr == "" {
|
|
addr = ":8080"
|
|
}
|
|
if cfg.Server.Port != 0 {
|
|
addr = fmt.Sprintf(":%d", cfg.Server.Port)
|
|
}
|
|
|
|
h := &handler.Handler{DB: sqlDB, JWTSecret: cfg.Server.JWTSecret, MQTTTopic: cfg.MQTT.Topic}
|
|
|
|
// connect to MQTT broker (best-effort)
|
|
var mq *mqttclient.Client
|
|
var checker *devicecheck.Checker
|
|
var checkCtx context.Context
|
|
var checkCancel context.CancelFunc
|
|
if cfg.MQTT.Broker != "" {
|
|
mqc, err := mqttclient.Connect(cfg.MQTT)
|
|
if err != nil {
|
|
log.Printf("warning: mqtt connect failed: %v", err)
|
|
} else {
|
|
mq = mqc
|
|
mqttclient.SetDefault(mqc)
|
|
|
|
// start device state checker when MQTT is available
|
|
checkCtx, checkCancel = context.WithCancel(context.Background())
|
|
checker = devicecheck.New(sqlDB, mq, cfg.MQTT.Topic, 0, 0)
|
|
checker.Start(checkCtx)
|
|
|
|
// subscribe to the configured topic and log incoming messages
|
|
if cfg.MQTT.Topic != "" {
|
|
handlerFn := func(t string, p []byte) {
|
|
log.Printf("mqtt recv on %s: %s", t, string(p))
|
|
if checker != nil {
|
|
checker.HandleMessage(p)
|
|
}
|
|
if handled, err := h.ProcessSensorReadingMessage(p); handled && err != nil {
|
|
log.Printf("sensor reading message error: %v", err)
|
|
}
|
|
}
|
|
if err := mq.Subscribe(cfg.MQTT.Topic, handlerFn); err != nil {
|
|
log.Printf("warning: mqtt subscribe failed: %v", err)
|
|
}
|
|
}
|
|
|
|
// subscribe to discovery topic
|
|
discoveryTopic := cfg.MQTT.Topic + "/discovery"
|
|
discoveryHandler := func(t string, p []byte) {
|
|
log.Printf("device discovery on %s", t)
|
|
if err := h.ProcessDiscoveryMessage(p); err != nil {
|
|
log.Printf("error processing discovery message: %v", err)
|
|
}
|
|
}
|
|
if err := mq.Subscribe(discoveryTopic, discoveryHandler); err != nil {
|
|
log.Printf("warning: mqtt subscribe to discovery failed: %v", err)
|
|
}
|
|
|
|
// publish a startup message (non-blocking)
|
|
go func() {
|
|
msg := fmt.Sprintf("lambdaiot-core started on %s", addr)
|
|
if err := mq.Publish(cfg.MQTT.Topic, []byte(msg)); err != nil {
|
|
log.Printf("mqtt publish failed: %v", err)
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
// mqttping endpoint will be added after router initialization
|
|
|
|
// Gin setup
|
|
r := gin.New()
|
|
r.Use(gin.Recovery())
|
|
r.Use(middleware.CORS())
|
|
r.Use(middleware.GinLogger())
|
|
|
|
// Public routes
|
|
r.GET("/health", h.Health)
|
|
r.GET("/hello", h.Hello)
|
|
r.POST("/login", h.Login)
|
|
r.GET("/devices", h.GetDevices)
|
|
|
|
// mqttping endpoint handled by internal/handler (protected)
|
|
|
|
// Protected routes
|
|
auth := r.Group("/")
|
|
auth.POST("/register", h.Register)
|
|
auth.Use(middleware.AuthMiddleware(cfg.Server.JWTSecret))
|
|
auth.GET("/protected", h.Protected)
|
|
auth.GET("/mqttping", h.MQTTPing)
|
|
auth.GET("/sensors", h.GetSensors)
|
|
auth.GET("/sensors/:id", h.GetSensor)
|
|
auth.POST("/sensors/:id/trigger", h.TriggerSensor)
|
|
auth.POST("/sensors", h.CreateSensor)
|
|
auth.PUT("/sensors/:id", h.UpdateSensor)
|
|
auth.DELETE("/sensors/:id", h.DeleteSensor)
|
|
|
|
auth.GET("/actors", h.GetActors)
|
|
auth.GET("/actors/:id", h.GetActor)
|
|
auth.POST("/actors/:id/write", h.WriteActor)
|
|
auth.POST("/actors", h.CreateActor)
|
|
auth.PUT("/actors/:id", h.UpdateActor)
|
|
auth.DELETE("/actors/:id", h.DeleteActor)
|
|
|
|
auth.GET("/sensor-readings", h.GetSensorReadings)
|
|
auth.GET("/sensor-readings/:id", h.GetSensorReading)
|
|
auth.POST("/sensor-readings", h.CreateSensorReading)
|
|
auth.PUT("/sensor-readings/:id", h.UpdateSensorReading)
|
|
auth.DELETE("/sensor-readings/:id", h.DeleteSensorReading)
|
|
|
|
// Device CRUD routes
|
|
auth.POST("/devices", h.CreateDevice)
|
|
auth.GET("/devices/:id", h.GetDevice)
|
|
auth.PUT("/devices/:id", h.UpdateDevice)
|
|
auth.DELETE("/devices/:id", h.DeleteDevice)
|
|
|
|
srv := &http.Server{
|
|
Addr: addr,
|
|
Handler: r,
|
|
}
|
|
|
|
go func() {
|
|
log.Printf("starting server on %s", addr)
|
|
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
log.Fatalf("listen: %v", err)
|
|
}
|
|
}()
|
|
|
|
quit := make(chan os.Signal, 1)
|
|
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
|
|
<-quit
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if err := srv.Shutdown(ctx); err != nil {
|
|
log.Fatalf("server forced to shutdown: %v", err)
|
|
}
|
|
if mq != nil {
|
|
mq.Close()
|
|
}
|
|
if checkCancel != nil {
|
|
checkCancel()
|
|
}
|
|
log.Println("server exiting")
|
|
}
|
|
|
|
// removed unused getJWTSecret helper; configuration provides JWTSecret
|