package main import ( "context" "database/sql" "fmt" "log" "net/http" "os" "os/signal" "syscall" "time" "github.com/gin-gonic/gin" _ "github.com/go-sql-driver/mysql" "git.piskot.si/SeminarM2/lambdaiot-core/internal/config" "git.piskot.si/SeminarM2/lambdaiot-core/internal/devicecheck" "git.piskot.si/SeminarM2/lambdaiot-core/internal/handler" "git.piskot.si/SeminarM2/lambdaiot-core/internal/middleware" mqttclient "git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt" ) func main() { // load configuration (look for ./config.toml by default) cfg, err := config.Load("") if err != nil { log.Fatalf("failed to load config: %v", err) } // connect to MySQL database dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?parseTime=true", cfg.Database.User, cfg.Database.Password, cfg.Database.Host, cfg.Database.Port, cfg.Database.Name) sqlDB, err := sql.Open("mysql", dsn) if err != nil { log.Fatalf("failed to connect to database: %v", err) } defer sqlDB.Close() // test connection if err := sqlDB.Ping(); err != nil { log.Fatalf("failed to ping database: %v", err) } log.Println("connected to database") // determine address addr := cfg.Server.Address if addr == "" { addr = ":8080" } if cfg.Server.Port != 0 { addr = fmt.Sprintf(":%d", cfg.Server.Port) } h := &handler.Handler{DB: sqlDB, JWTSecret: cfg.Server.JWTSecret, MQTTTopic: cfg.MQTT.Topic} // connect to MQTT broker (best-effort) var mq *mqttclient.Client var checker *devicecheck.Checker var checkCtx context.Context var checkCancel context.CancelFunc if cfg.MQTT.Broker != "" { mqc, err := mqttclient.Connect(cfg.MQTT) if err != nil { log.Printf("warning: mqtt connect failed: %v", err) } else { mq = mqc mqttclient.SetDefault(mqc) // start device state checker when MQTT is available checkCtx, checkCancel = context.WithCancel(context.Background()) checker = devicecheck.New(sqlDB, mq, cfg.MQTT.Topic, 0, 0) checker.Start(checkCtx) // subscribe to the configured topic and log incoming messages if cfg.MQTT.Topic != "" { handlerFn := func(t string, p []byte) { log.Printf("mqtt recv on %s: %s", t, string(p)) if checker != nil { checker.HandleMessage(p) } if handled, err := h.ProcessSensorReadingMessage(p); handled && err != nil { log.Printf("sensor reading message error: %v", err) } } if err := mq.Subscribe(cfg.MQTT.Topic, handlerFn); err != nil { log.Printf("warning: mqtt subscribe failed: %v", err) } } // subscribe to discovery topic discoveryTopic := cfg.MQTT.Topic + "/discovery" discoveryHandler := func(t string, p []byte) { log.Printf("device discovery on %s", t) if err := h.ProcessDiscoveryMessage(p); err != nil { log.Printf("error processing discovery message: %v", err) } } if err := mq.Subscribe(discoveryTopic, discoveryHandler); err != nil { log.Printf("warning: mqtt subscribe to discovery failed: %v", err) } // publish a startup message (non-blocking) go func() { msg := fmt.Sprintf("lambdaiot-core started on %s", addr) if err := mq.Publish(cfg.MQTT.Topic, []byte(msg)); err != nil { log.Printf("mqtt publish failed: %v", err) } }() } } // mqttping endpoint will be added after router initialization // Gin setup r := gin.New() r.Use(gin.Recovery()) r.Use(middleware.CORS()) r.Use(middleware.GinLogger()) // Public routes r.GET("/health", h.Health) r.GET("/hello", h.Hello) r.POST("/login", h.Login) r.GET("/devices", h.GetDevices) // mqttping endpoint handled by internal/handler (protected) // Protected routes auth := r.Group("/") auth.POST("/register", h.Register) auth.Use(middleware.AuthMiddleware(cfg.Server.JWTSecret)) auth.GET("/protected", h.Protected) auth.GET("/mqttping", h.MQTTPing) auth.GET("/sensors", h.GetSensors) auth.GET("/sensors/:id", h.GetSensor) auth.POST("/sensors/:id/trigger", h.TriggerSensor) auth.POST("/sensors", h.CreateSensor) auth.PUT("/sensors/:id", h.UpdateSensor) auth.DELETE("/sensors/:id", h.DeleteSensor) auth.GET("/actors", h.GetActors) auth.GET("/actors/:id", h.GetActor) auth.POST("/actors/:id/write", h.WriteActor) auth.POST("/actors", h.CreateActor) auth.PUT("/actors/:id", h.UpdateActor) auth.DELETE("/actors/:id", h.DeleteActor) auth.GET("/sensor-readings", h.GetSensorReadings) auth.GET("/sensor-readings/:id", h.GetSensorReading) auth.POST("/sensor-readings", h.CreateSensorReading) auth.PUT("/sensor-readings/:id", h.UpdateSensorReading) auth.DELETE("/sensor-readings/:id", h.DeleteSensorReading) // Device CRUD routes auth.POST("/devices", h.CreateDevice) auth.GET("/devices/:id", h.GetDevice) auth.PUT("/devices/:id", h.UpdateDevice) auth.DELETE("/devices/:id", h.DeleteDevice) srv := &http.Server{ Addr: addr, Handler: r, } go func() { log.Printf("starting server on %s", addr) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("listen: %v", err) } }() quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt, syscall.SIGTERM) <-quit ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := srv.Shutdown(ctx); err != nil { log.Fatalf("server forced to shutdown: %v", err) } if mq != nil { mq.Close() } if checkCancel != nil { checkCancel() } log.Println("server exiting") } // removed unused getJWTSecret helper; configuration provides JWTSecret