mqtt: add Subscribe method; server subscribes and logs incoming messages
This commit is contained in:
@@ -43,6 +43,15 @@ func main() {
|
|||||||
} else {
|
} else {
|
||||||
mq = mqc
|
mq = mqc
|
||||||
mqttclient.SetDefault(mqc)
|
mqttclient.SetDefault(mqc)
|
||||||
|
|
||||||
|
// subscribe to the configured topic and log incoming messages
|
||||||
|
if cfg.MQTT.Topic != "" {
|
||||||
|
if err := mq.Subscribe(cfg.MQTT.Topic, func(t string, p []byte) {
|
||||||
|
log.Printf("mqtt recv on %s: %s", t, string(p))
|
||||||
|
}); err != nil {
|
||||||
|
log.Printf("warning: mqtt subscribe failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
// publish a startup message (non-blocking)
|
// publish a startup message (non-blocking)
|
||||||
go func() {
|
go func() {
|
||||||
msg := fmt.Sprintf("lambdaiot-core started on %s", addr)
|
msg := fmt.Sprintf("lambdaiot-core started on %s", addr)
|
||||||
|
|||||||
@@ -64,6 +64,24 @@ func (c *Client) Publish(topic string, payload []byte) error {
|
|||||||
return token.Error()
|
return token.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Subscribe subscribes to the given topic and calls the provided handler
|
||||||
|
// for each incoming message.
|
||||||
|
func (c *Client) Subscribe(topic string, handler func(topic string, payload []byte)) error {
|
||||||
|
if c == nil || c.client == nil {
|
||||||
|
return fmt.Errorf("mqtt client not connected")
|
||||||
|
}
|
||||||
|
mh := func(cli paho.Client, msg paho.Message) {
|
||||||
|
if handler != nil {
|
||||||
|
handler(msg.Topic(), msg.Payload())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
token := c.client.Subscribe(topic, 0, mh)
|
||||||
|
if ok := token.WaitTimeout(5 * time.Second); !ok {
|
||||||
|
return fmt.Errorf("subscribe timeout")
|
||||||
|
}
|
||||||
|
return token.Error()
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) Close() {
|
func (c *Client) Close() {
|
||||||
if c == nil || c.client == nil {
|
if c == nil || c.client == nil {
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user