test: enable mosquitto password auth; add mqttping endpoint; use credentials in subscriber; publish defaults
This commit is contained in:
+54
-38
@@ -1,56 +1,72 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
paho "github.com/eclipse/paho.mqtt.golang"
|
||||
"git.piskot.si/SeminarM2/lambdaiot-core/internal/config"
|
||||
"git.piskot.si/SeminarM2/lambdaiot-core/internal/config"
|
||||
paho "github.com/eclipse/paho.mqtt.golang"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
client paho.Client
|
||||
client paho.Client
|
||||
}
|
||||
|
||||
// Default is a package-level client used by helpers
|
||||
var Default *Client
|
||||
|
||||
// SetDefault sets the package default client
|
||||
func SetDefault(c *Client) {
|
||||
Default = c
|
||||
}
|
||||
|
||||
// PublishDefault publishes using the default client
|
||||
func PublishDefault(topic string, payload []byte) error {
|
||||
if Default == nil {
|
||||
return fmt.Errorf("no default mqtt client")
|
||||
}
|
||||
return Default.Publish(topic, payload)
|
||||
}
|
||||
|
||||
func Connect(cfg config.MQTTConfig) (*Client, error) {
|
||||
opts := paho.NewClientOptions()
|
||||
opts.AddBroker(cfg.Broker)
|
||||
if cfg.ClientID != "" {
|
||||
opts.SetClientID(cfg.ClientID)
|
||||
}
|
||||
if cfg.Username != "" {
|
||||
opts.SetUsername(cfg.Username)
|
||||
opts.SetPassword(cfg.Password)
|
||||
}
|
||||
opts.SetAutoReconnect(true)
|
||||
opts.SetConnectRetry(true)
|
||||
opts.SetConnectTimeout(5 * time.Second)
|
||||
opts := paho.NewClientOptions()
|
||||
opts.AddBroker(cfg.Broker)
|
||||
if cfg.ClientID != "" {
|
||||
opts.SetClientID(cfg.ClientID)
|
||||
}
|
||||
if cfg.Username != "" {
|
||||
opts.SetUsername(cfg.Username)
|
||||
opts.SetPassword(cfg.Password)
|
||||
}
|
||||
opts.SetAutoReconnect(true)
|
||||
opts.SetConnectRetry(true)
|
||||
opts.SetConnectTimeout(5 * time.Second)
|
||||
|
||||
c := paho.NewClient(opts)
|
||||
token := c.Connect()
|
||||
if ok := token.WaitTimeout(10 * time.Second); !ok {
|
||||
return nil, fmt.Errorf("mqtt connect timeout")
|
||||
}
|
||||
if err := token.Error(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Client{client: c}, nil
|
||||
c := paho.NewClient(opts)
|
||||
token := c.Connect()
|
||||
if ok := token.WaitTimeout(10 * time.Second); !ok {
|
||||
return nil, fmt.Errorf("mqtt connect timeout")
|
||||
}
|
||||
if err := token.Error(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Client{client: c}, nil
|
||||
}
|
||||
|
||||
func (c *Client) Publish(topic string, payload []byte) error {
|
||||
if c == nil || c.client == nil {
|
||||
return fmt.Errorf("mqtt client not connected")
|
||||
}
|
||||
token := c.client.Publish(topic, 0, false, payload)
|
||||
if ok := token.WaitTimeout(5 * time.Second); !ok {
|
||||
return fmt.Errorf("publish timeout")
|
||||
}
|
||||
return token.Error()
|
||||
if c == nil || c.client == nil {
|
||||
return fmt.Errorf("mqtt client not connected")
|
||||
}
|
||||
token := c.client.Publish(topic, 0, false, payload)
|
||||
if ok := token.WaitTimeout(5 * time.Second); !ok {
|
||||
return fmt.Errorf("publish timeout")
|
||||
}
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
func (c *Client) Close() {
|
||||
if c == nil || c.client == nil {
|
||||
return
|
||||
}
|
||||
c.client.Disconnect(250)
|
||||
if c == nil || c.client == nil {
|
||||
return
|
||||
}
|
||||
c.client.Disconnect(250)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user