diff --git a/cmd/server/main.go b/cmd/server/main.go index fb119ca..f39d01f 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -43,6 +43,15 @@ func main() { } else { mq = mqc mqttclient.SetDefault(mqc) + + // subscribe to the configured topic and log incoming messages + if cfg.MQTT.Topic != "" { + if err := mq.Subscribe(cfg.MQTT.Topic, func(t string, p []byte) { + log.Printf("mqtt recv on %s: %s", t, string(p)) + }); err != nil { + log.Printf("warning: mqtt subscribe failed: %v", err) + } + } // publish a startup message (non-blocking) go func() { msg := fmt.Sprintf("lambdaiot-core started on %s", addr) diff --git a/internal/mqtt/mqtt.go b/internal/mqtt/mqtt.go index b7568f4..aa75f91 100644 --- a/internal/mqtt/mqtt.go +++ b/internal/mqtt/mqtt.go @@ -64,6 +64,24 @@ func (c *Client) Publish(topic string, payload []byte) error { return token.Error() } +// Subscribe subscribes to the given topic and calls the provided handler +// for each incoming message. +func (c *Client) Subscribe(topic string, handler func(topic string, payload []byte)) error { + if c == nil || c.client == nil { + return fmt.Errorf("mqtt client not connected") + } + mh := func(cli paho.Client, msg paho.Message) { + if handler != nil { + handler(msg.Topic(), msg.Payload()) + } + } + token := c.client.Subscribe(topic, 0, mh) + if ok := token.WaitTimeout(5 * time.Second); !ok { + return fmt.Errorf("subscribe timeout") + } + return token.Error() +} + func (c *Client) Close() { if c == nil || c.client == nil { return