Add MQTT client wrapper; wire startup publish; support env-config

This commit is contained in:
2025-12-28 15:02:52 +01:00
parent bfe8704f18
commit c866346c06
6 changed files with 220 additions and 4 deletions
+13
View File
@@ -0,0 +1,13 @@
# Example env file for lambdaiot-core
# Server
SERVER_ADDRESS=0.0.0.0
SERVER_PORT=8080
JWT_SECRET=secret
# MQTT
MQTT_BROKER=tcp://localhost:1883
MQTT_CLIENT_ID=lambdaiot-core
MQTT_USERNAME=
MQTT_PASSWORD=
MQTT_TOPIC=lambda/iot
+41 -3
View File
@@ -2,6 +2,7 @@ package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
@@ -11,12 +12,46 @@ import (
"github.com/gin-gonic/gin"
"git.piskot.si/SeminarM2/lambdaiot-core/internal/config"
mqttclient "git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt"
"git.piskot.si/SeminarM2/lambdaiot-core/internal/handler"
"git.piskot.si/SeminarM2/lambdaiot-core/internal/middleware"
)
func main() {
addr := ":8080"
// load configuration (look for ./config.toml by default)
cfg, err := config.Load("")
if err != nil {
log.Fatalf("failed to load config: %v", err)
}
// determine address
addr := cfg.Server.Address
if addr == "" {
addr = ":8080"
}
if cfg.Server.Port != 0 {
addr = fmt.Sprintf(":%d", cfg.Server.Port)
}
// connect to MQTT broker (best-effort)
var mq *mqttclient.Client
if cfg.MQTT.Broker != "" {
mqc, err := mqttclient.Connect(cfg.MQTT)
if err != nil {
log.Printf("warning: mqtt connect failed: %v", err)
} else {
mq = mqc
// publish a startup message (non-blocking)
go func() {
msg := fmt.Sprintf("lambdaiot-core started on %s", addr)
if err := mq.Publish(cfg.MQTT.Topic, []byte(msg)); err != nil {
log.Printf("mqtt publish failed: %v", err)
}
}()
}
}
// Gin setup
r := gin.New()
@@ -30,7 +65,7 @@ func main() {
// Protected routes
auth := r.Group("/")
auth.Use(middleware.AuthMiddleware(getJWTSecret()))
auth.Use(middleware.AuthMiddleware(cfg.Server.JWTSecret))
auth.GET("/protected", handler.Protected)
srv := &http.Server{
@@ -53,6 +88,9 @@ func main() {
if err := srv.Shutdown(ctx); err != nil {
log.Fatalf("server forced to shutdown: %v", err)
}
if mq != nil {
mq.Close()
}
log.Println("server exiting")
}
+5 -1
View File
@@ -5,11 +5,16 @@ go 1.21
require (
github.com/gin-gonic/gin v1.9.0
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/joho/godotenv v1.5.1
github.com/pelletier/go-toml/v2 v2.0.6
)
require github.com/gorilla/websocket v1.4.2 // indirect
require (
github.com/bytedance/sonic v1.8.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
@@ -21,7 +26,6 @@ require (
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.9 // indirect
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
+11
View File
@@ -7,6 +7,8 @@ github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583j
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.9.0 h1:OjyFBKICoexlu99ctXNR2gg+c5pKrKMuyjgARg9qeY8=
@@ -27,6 +29,10 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
@@ -65,13 +71,18 @@ github.com/ugorji/go/codec v1.2.9 h1:rmenucSohSTiyL09Y+l2OCk+FrMxGMzho2+tjr5ticU
github.com/ugorji/go/codec v1.2.9/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VAiXCnxFY6NyDX0bHDmkU=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+94
View File
@@ -0,0 +1,94 @@
package config
import (
"os"
"strconv"
"github.com/joho/godotenv"
"github.com/pelletier/go-toml/v2"
)
// Config holds application configuration loaded from TOML
type Config struct {
Server ServerConfig `toml:"server"`
MQTT MQTTConfig `toml:"mqtt"`
}
type ServerConfig struct {
Address string `toml:"address"`
Port int `toml:"port"`
JWTSecret string `toml:"jwt_secret"`
}
type MQTTConfig struct {
Broker string `toml:"broker"`
ClientID string `toml:"client_id"`
Username string `toml:"username"`
Password string `toml:"password"`
Topic string `toml:"topic"`
}
// Load loads configuration from the given path. If path is empty, it tries
// to load ./config.toml or falls back to environment defaults.
func Load(path string) (*Config, error) {
cfg := &Config{
Server: ServerConfig{
Address: "0.0.0.0",
Port: 8080,
JWTSecret: "secret",
},
MQTT: MQTTConfig{
Broker: "tcp://localhost:1883",
ClientID: "lambda-iot-core",
Topic: "lambda/iot",
},
}
// try to load TOML if provided or present
if path == "" {
path = "config.toml"
}
if _, err := os.Stat(path); err == nil {
f, err := os.ReadFile(path)
if err != nil {
return nil, err
}
if err := toml.Unmarshal(f, cfg); err != nil {
return nil, err
}
}
// load .env (if present) so env vars are populated for overrides
_ = godotenv.Load()
// environment overrides (common for Docker/.env)
if v := os.Getenv("SERVER_ADDRESS"); v != "" {
cfg.Server.Address = v
}
if v := os.Getenv("SERVER_PORT"); v != "" {
if p, err := strconv.Atoi(v); err == nil {
cfg.Server.Port = p
}
}
if v := os.Getenv("JWT_SECRET"); v != "" {
cfg.Server.JWTSecret = v
}
if v := os.Getenv("MQTT_BROKER"); v != "" {
cfg.MQTT.Broker = v
}
if v := os.Getenv("MQTT_CLIENT_ID"); v != "" {
cfg.MQTT.ClientID = v
}
if v := os.Getenv("MQTT_USERNAME"); v != "" {
cfg.MQTT.Username = v
}
if v := os.Getenv("MQTT_PASSWORD"); v != "" {
cfg.MQTT.Password = v
}
if v := os.Getenv("MQTT_TOPIC"); v != "" {
cfg.MQTT.Topic = v
}
return cfg, nil
}
+56
View File
@@ -0,0 +1,56 @@
package mqtt
import (
"fmt"
"time"
paho "github.com/eclipse/paho.mqtt.golang"
"git.piskot.si/SeminarM2/lambdaiot-core/internal/config"
)
type Client struct {
client paho.Client
}
func Connect(cfg config.MQTTConfig) (*Client, error) {
opts := paho.NewClientOptions()
opts.AddBroker(cfg.Broker)
if cfg.ClientID != "" {
opts.SetClientID(cfg.ClientID)
}
if cfg.Username != "" {
opts.SetUsername(cfg.Username)
opts.SetPassword(cfg.Password)
}
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
opts.SetConnectTimeout(5 * time.Second)
c := paho.NewClient(opts)
token := c.Connect()
if ok := token.WaitTimeout(10 * time.Second); !ok {
return nil, fmt.Errorf("mqtt connect timeout")
}
if err := token.Error(); err != nil {
return nil, err
}
return &Client{client: c}, nil
}
func (c *Client) Publish(topic string, payload []byte) error {
if c == nil || c.client == nil {
return fmt.Errorf("mqtt client not connected")
}
token := c.client.Publish(topic, 0, false, payload)
if ok := token.WaitTimeout(5 * time.Second); !ok {
return fmt.Errorf("publish timeout")
}
return token.Error()
}
func (c *Client) Close() {
if c == nil || c.client == nil {
return
}
c.client.Disconnect(250)
}