From c835fb7a983092e7e1b2c49072d10343a12a8c7e Mon Sep 17 00:00:00 2001 From: Kristjan Komlosi Date: Sun, 28 Dec 2025 15:24:12 +0100 Subject: [PATCH] test: enable mosquitto password auth; add mqttping endpoint; use credentials in subscriber; publish defaults --- cmd/server/main.go | 20 ++++++++- internal/mqtt/mqtt.go | 92 ++++++++++++++++++++++++----------------- test/docker-compose.yml | 6 +++ test/mosquitto.conf | 5 ++- test/subscribe.py | 4 ++ 5 files changed, 86 insertions(+), 41 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 3016d09..fb119ca 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -13,10 +13,11 @@ import ( "github.com/gin-gonic/gin" "git.piskot.si/SeminarM2/lambdaiot-core/internal/config" - mqttclient "git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt" "git.piskot.si/SeminarM2/lambdaiot-core/internal/handler" "git.piskot.si/SeminarM2/lambdaiot-core/internal/middleware" + mqttclient "git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt" ) + func main() { // load configuration (look for ./config.toml by default) cfg, err := config.Load("") @@ -41,6 +42,7 @@ func main() { log.Printf("warning: mqtt connect failed: %v", err) } else { mq = mqc + mqttclient.SetDefault(mqc) // publish a startup message (non-blocking) go func() { msg := fmt.Sprintf("lambdaiot-core started on %s", addr) @@ -51,7 +53,7 @@ func main() { } } - + // mqttping endpoint will be added after router initialization // Gin setup r := gin.New() @@ -63,6 +65,20 @@ func main() { r.GET("/hello", handler.Hello) r.POST("/login", handler.Login) + // mqttping endpoint: publish current timestamp to MQTT topic + r.POST("/mqttping", func(c *gin.Context) { + ts := time.Now().Format(time.RFC3339) + if mq == nil { + c.JSON(http.StatusServiceUnavailable, gin.H{"error": "mqtt not connected"}) + return + } + if err := mq.Publish(cfg.MQTT.Topic, []byte(ts)); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"timestamp": ts}) + }) + // Protected routes auth := r.Group("/") auth.Use(middleware.AuthMiddleware(cfg.Server.JWTSecret)) diff --git a/internal/mqtt/mqtt.go b/internal/mqtt/mqtt.go index e96d7fe..b7568f4 100644 --- a/internal/mqtt/mqtt.go +++ b/internal/mqtt/mqtt.go @@ -1,56 +1,72 @@ package mqtt import ( - "fmt" - "time" + "fmt" + "time" - paho "github.com/eclipse/paho.mqtt.golang" - "git.piskot.si/SeminarM2/lambdaiot-core/internal/config" + "git.piskot.si/SeminarM2/lambdaiot-core/internal/config" + paho "github.com/eclipse/paho.mqtt.golang" ) type Client struct { - client paho.Client + client paho.Client +} + +// Default is a package-level client used by helpers +var Default *Client + +// SetDefault sets the package default client +func SetDefault(c *Client) { + Default = c +} + +// PublishDefault publishes using the default client +func PublishDefault(topic string, payload []byte) error { + if Default == nil { + return fmt.Errorf("no default mqtt client") + } + return Default.Publish(topic, payload) } func Connect(cfg config.MQTTConfig) (*Client, error) { - opts := paho.NewClientOptions() - opts.AddBroker(cfg.Broker) - if cfg.ClientID != "" { - opts.SetClientID(cfg.ClientID) - } - if cfg.Username != "" { - opts.SetUsername(cfg.Username) - opts.SetPassword(cfg.Password) - } - opts.SetAutoReconnect(true) - opts.SetConnectRetry(true) - opts.SetConnectTimeout(5 * time.Second) + opts := paho.NewClientOptions() + opts.AddBroker(cfg.Broker) + if cfg.ClientID != "" { + opts.SetClientID(cfg.ClientID) + } + if cfg.Username != "" { + opts.SetUsername(cfg.Username) + opts.SetPassword(cfg.Password) + } + opts.SetAutoReconnect(true) + opts.SetConnectRetry(true) + opts.SetConnectTimeout(5 * time.Second) - c := paho.NewClient(opts) - token := c.Connect() - if ok := token.WaitTimeout(10 * time.Second); !ok { - return nil, fmt.Errorf("mqtt connect timeout") - } - if err := token.Error(); err != nil { - return nil, err - } - return &Client{client: c}, nil + c := paho.NewClient(opts) + token := c.Connect() + if ok := token.WaitTimeout(10 * time.Second); !ok { + return nil, fmt.Errorf("mqtt connect timeout") + } + if err := token.Error(); err != nil { + return nil, err + } + return &Client{client: c}, nil } func (c *Client) Publish(topic string, payload []byte) error { - if c == nil || c.client == nil { - return fmt.Errorf("mqtt client not connected") - } - token := c.client.Publish(topic, 0, false, payload) - if ok := token.WaitTimeout(5 * time.Second); !ok { - return fmt.Errorf("publish timeout") - } - return token.Error() + if c == nil || c.client == nil { + return fmt.Errorf("mqtt client not connected") + } + token := c.client.Publish(topic, 0, false, payload) + if ok := token.WaitTimeout(5 * time.Second); !ok { + return fmt.Errorf("publish timeout") + } + return token.Error() } func (c *Client) Close() { - if c == nil || c.client == nil { - return - } - c.client.Disconnect(250) + if c == nil || c.client == nil { + return + } + c.client.Disconnect(250) } diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 1023bd2..e6225f2 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -14,6 +14,8 @@ services: - MQTT_BROKER=tcp://mosquitto:1883 - MQTT_CLIENT_ID=lambdaiot-server - MQTT_TOPIC=lambdaiot + - MQTT_USERNAME=testuser + - MQTT_PASSWORD=testpass depends_on: - mosquitto # server image now waits for MQTT broker itself via entrypoint @@ -27,6 +29,8 @@ services: - mosquitto_data:/mosquitto/data - mosquitto_log:/mosquitto/log - ./mosquitto.conf:/mosquitto/config/mosquitto.conf:ro + # ensure a password file exists and start mosquitto with our config + command: sh -c "mosquitto_passwd -b /mosquitto/config/passwordfile testuser testpass >/dev/null 2>&1 || true; /usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf" healthcheck: test: ["CMD", "sh", "-c", "nc -z localhost 1883 || exit 1"] interval: 2s @@ -41,6 +45,8 @@ services: environment: - MQTT_BROKER=mosquitto:1883 - MQTT_TOPIC=lambdaiot + - MQTT_USERNAME=testuser + - MQTT_PASSWORD=testpass command: sh -c "pip install paho-mqtt && python subscribe.py" depends_on: - mosquitto diff --git a/test/mosquitto.conf b/test/mosquitto.conf index a1fb2b7..3572c5b 100644 --- a/test/mosquitto.conf +++ b/test/mosquitto.conf @@ -1,6 +1,9 @@ # Allow external connections on port 1883 +# listen on all interfaces listener 1883 0.0.0.0 -allow_anonymous true +# do not allow anonymous in this test stack; require password_file +allow_anonymous false +password_file /mosquitto/config/passwordfile # Increase persistence location so container can map volume if needed persistence true persistence_location /mosquitto/data/ diff --git a/test/subscribe.py b/test/subscribe.py index 6f8d2f6..bd4c214 100644 --- a/test/subscribe.py +++ b/test/subscribe.py @@ -6,6 +6,8 @@ import paho.mqtt.client as mqtt broker = os.getenv('MQTT_BROKER', 'localhost:1883') topic = os.getenv('MQTT_TOPIC', 'lambdaiot') +username = os.getenv('MQTT_USERNAME') +password = os.getenv('MQTT_PASSWORD') if broker.startswith('tcp://'): broker = broker[len('tcp://'):] @@ -28,6 +30,8 @@ def on_message(client, userdata, msg): client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message +if username: + client.username_pw_set(username, password) try: client.connect(host, port, 60)