From c866346c066b12317f154855e2ca0b098415815c Mon Sep 17 00:00:00 2001 From: Kristjan Komlosi Date: Sun, 28 Dec 2025 15:02:52 +0100 Subject: [PATCH] Add MQTT client wrapper; wire startup publish; support env-config --- .env.example | 13 ++++++ cmd/server/main.go | 44 ++++++++++++++++-- go.mod | 6 ++- go.sum | 11 +++++ internal/config/config.go | 94 +++++++++++++++++++++++++++++++++++++++ internal/mqtt/mqtt.go | 56 +++++++++++++++++++++++ 6 files changed, 220 insertions(+), 4 deletions(-) create mode 100644 .env.example create mode 100644 internal/config/config.go create mode 100644 internal/mqtt/mqtt.go diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..3ef7cbd --- /dev/null +++ b/.env.example @@ -0,0 +1,13 @@ +# Example env file for lambdaiot-core + +# Server +SERVER_ADDRESS=0.0.0.0 +SERVER_PORT=8080 +JWT_SECRET=secret + +# MQTT +MQTT_BROKER=tcp://localhost:1883 +MQTT_CLIENT_ID=lambdaiot-core +MQTT_USERNAME= +MQTT_PASSWORD= +MQTT_TOPIC=lambda/iot diff --git a/cmd/server/main.go b/cmd/server/main.go index 505540a..3016d09 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "log" "net/http" "os" @@ -11,12 +12,46 @@ import ( "github.com/gin-gonic/gin" + "git.piskot.si/SeminarM2/lambdaiot-core/internal/config" + mqttclient "git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt" "git.piskot.si/SeminarM2/lambdaiot-core/internal/handler" "git.piskot.si/SeminarM2/lambdaiot-core/internal/middleware" ) - func main() { - addr := ":8080" + // load configuration (look for ./config.toml by default) + cfg, err := config.Load("") + if err != nil { + log.Fatalf("failed to load config: %v", err) + } + + // determine address + addr := cfg.Server.Address + if addr == "" { + addr = ":8080" + } + if cfg.Server.Port != 0 { + addr = fmt.Sprintf(":%d", cfg.Server.Port) + } + + // connect to MQTT broker (best-effort) + var mq *mqttclient.Client + if cfg.MQTT.Broker != "" { + mqc, err := mqttclient.Connect(cfg.MQTT) + if err != nil { + log.Printf("warning: mqtt connect failed: %v", err) + } else { + mq = mqc + // publish a startup message (non-blocking) + go func() { + msg := fmt.Sprintf("lambdaiot-core started on %s", addr) + if err := mq.Publish(cfg.MQTT.Topic, []byte(msg)); err != nil { + log.Printf("mqtt publish failed: %v", err) + } + }() + } + } + + // Gin setup r := gin.New() @@ -30,7 +65,7 @@ func main() { // Protected routes auth := r.Group("/") - auth.Use(middleware.AuthMiddleware(getJWTSecret())) + auth.Use(middleware.AuthMiddleware(cfg.Server.JWTSecret)) auth.GET("/protected", handler.Protected) srv := &http.Server{ @@ -53,6 +88,9 @@ func main() { if err := srv.Shutdown(ctx); err != nil { log.Fatalf("server forced to shutdown: %v", err) } + if mq != nil { + mq.Close() + } log.Println("server exiting") } diff --git a/go.mod b/go.mod index 7bd0956..1e75d39 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,16 @@ go 1.21 require ( github.com/gin-gonic/gin v1.9.0 github.com/golang-jwt/jwt/v5 v5.0.0 + github.com/joho/godotenv v1.5.1 + github.com/pelletier/go-toml/v2 v2.0.6 ) +require github.com/gorilla/websocket v1.4.2 // indirect + require ( github.com/bytedance/sonic v1.8.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect + github.com/eclipse/paho.mqtt.golang v1.3.5 github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect @@ -21,7 +26,6 @@ require ( github.com/mattn/go-isatty v0.0.17 // indirect github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/pelletier/go-toml/v2 v2.0.6 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.9 // indirect golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect diff --git a/go.sum b/go.sum index fdbad1a..d44413f 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583j github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y= +github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.9.0 h1:OjyFBKICoexlu99ctXNR2gg+c5pKrKMuyjgARg9qeY8= @@ -27,6 +29,10 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= @@ -65,13 +71,18 @@ github.com/ugorji/go/codec v1.2.9 h1:rmenucSohSTiyL09Y+l2OCk+FrMxGMzho2+tjr5ticU github.com/ugorji/go/codec v1.2.9/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VAiXCnxFY6NyDX0bHDmkU= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE= golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..d636527 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,94 @@ +package config + +import ( + "os" + "strconv" + + "github.com/joho/godotenv" + "github.com/pelletier/go-toml/v2" +) + +// Config holds application configuration loaded from TOML +type Config struct { + Server ServerConfig `toml:"server"` + MQTT MQTTConfig `toml:"mqtt"` +} + +type ServerConfig struct { + Address string `toml:"address"` + Port int `toml:"port"` + JWTSecret string `toml:"jwt_secret"` +} + +type MQTTConfig struct { + Broker string `toml:"broker"` + ClientID string `toml:"client_id"` + Username string `toml:"username"` + Password string `toml:"password"` + Topic string `toml:"topic"` +} + +// Load loads configuration from the given path. If path is empty, it tries +// to load ./config.toml or falls back to environment defaults. +func Load(path string) (*Config, error) { + cfg := &Config{ + Server: ServerConfig{ + Address: "0.0.0.0", + Port: 8080, + JWTSecret: "secret", + }, + MQTT: MQTTConfig{ + Broker: "tcp://localhost:1883", + ClientID: "lambda-iot-core", + Topic: "lambda/iot", + }, + } + + // try to load TOML if provided or present + if path == "" { + path = "config.toml" + } + if _, err := os.Stat(path); err == nil { + f, err := os.ReadFile(path) + if err != nil { + return nil, err + } + if err := toml.Unmarshal(f, cfg); err != nil { + return nil, err + } + } + + // load .env (if present) so env vars are populated for overrides + _ = godotenv.Load() + + // environment overrides (common for Docker/.env) + if v := os.Getenv("SERVER_ADDRESS"); v != "" { + cfg.Server.Address = v + } + if v := os.Getenv("SERVER_PORT"); v != "" { + if p, err := strconv.Atoi(v); err == nil { + cfg.Server.Port = p + } + } + if v := os.Getenv("JWT_SECRET"); v != "" { + cfg.Server.JWTSecret = v + } + + if v := os.Getenv("MQTT_BROKER"); v != "" { + cfg.MQTT.Broker = v + } + if v := os.Getenv("MQTT_CLIENT_ID"); v != "" { + cfg.MQTT.ClientID = v + } + if v := os.Getenv("MQTT_USERNAME"); v != "" { + cfg.MQTT.Username = v + } + if v := os.Getenv("MQTT_PASSWORD"); v != "" { + cfg.MQTT.Password = v + } + if v := os.Getenv("MQTT_TOPIC"); v != "" { + cfg.MQTT.Topic = v + } + + return cfg, nil +} diff --git a/internal/mqtt/mqtt.go b/internal/mqtt/mqtt.go new file mode 100644 index 0000000..e96d7fe --- /dev/null +++ b/internal/mqtt/mqtt.go @@ -0,0 +1,56 @@ +package mqtt + +import ( + "fmt" + "time" + + paho "github.com/eclipse/paho.mqtt.golang" + "git.piskot.si/SeminarM2/lambdaiot-core/internal/config" +) + +type Client struct { + client paho.Client +} + +func Connect(cfg config.MQTTConfig) (*Client, error) { + opts := paho.NewClientOptions() + opts.AddBroker(cfg.Broker) + if cfg.ClientID != "" { + opts.SetClientID(cfg.ClientID) + } + if cfg.Username != "" { + opts.SetUsername(cfg.Username) + opts.SetPassword(cfg.Password) + } + opts.SetAutoReconnect(true) + opts.SetConnectRetry(true) + opts.SetConnectTimeout(5 * time.Second) + + c := paho.NewClient(opts) + token := c.Connect() + if ok := token.WaitTimeout(10 * time.Second); !ok { + return nil, fmt.Errorf("mqtt connect timeout") + } + if err := token.Error(); err != nil { + return nil, err + } + return &Client{client: c}, nil +} + +func (c *Client) Publish(topic string, payload []byte) error { + if c == nil || c.client == nil { + return fmt.Errorf("mqtt client not connected") + } + token := c.client.Publish(topic, 0, false, payload) + if ok := token.WaitTimeout(5 * time.Second); !ok { + return fmt.Errorf("publish timeout") + } + return token.Error() +} + +func (c *Client) Close() { + if c == nil || c.client == nil { + return + } + c.client.Disconnect(250) +}