feat: implement SQLite storage for state messages and add message retrieval endpoint
This commit is contained in:
+27
-2
@@ -8,6 +8,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -18,6 +19,7 @@ import (
|
|||||||
"git.piskot.si/SeminarM2/lambdaiot-core/internal/handler"
|
"git.piskot.si/SeminarM2/lambdaiot-core/internal/handler"
|
||||||
"git.piskot.si/SeminarM2/lambdaiot-core/internal/middleware"
|
"git.piskot.si/SeminarM2/lambdaiot-core/internal/middleware"
|
||||||
mqttclient "git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt"
|
mqttclient "git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt"
|
||||||
|
"git.piskot.si/SeminarM2/lambdaiot-core/internal/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@@ -52,6 +54,21 @@ func main() {
|
|||||||
|
|
||||||
// connect to MQTT broker (best-effort)
|
// connect to MQTT broker (best-effort)
|
||||||
var mq *mqttclient.Client
|
var mq *mqttclient.Client
|
||||||
|
// initialize sqlite for state messages
|
||||||
|
var db *storage.DB
|
||||||
|
{
|
||||||
|
dbPath := os.Getenv("SQLITE_PATH")
|
||||||
|
if dbPath == "" {
|
||||||
|
dbPath = "state_messages.db"
|
||||||
|
}
|
||||||
|
dbInit, err := storage.Init(dbPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("warning: sqlite init failed: %v", err)
|
||||||
|
} else {
|
||||||
|
db = dbInit
|
||||||
|
storage.SetDefault(db)
|
||||||
|
}
|
||||||
|
}
|
||||||
if cfg.MQTT.Broker != "" {
|
if cfg.MQTT.Broker != "" {
|
||||||
mqc, err := mqttclient.Connect(cfg.MQTT)
|
mqc, err := mqttclient.Connect(cfg.MQTT)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -64,6 +81,12 @@ func main() {
|
|||||||
if cfg.MQTT.Topic != "" {
|
if cfg.MQTT.Topic != "" {
|
||||||
if err := mq.Subscribe(cfg.MQTT.Topic, func(t string, p []byte) {
|
if err := mq.Subscribe(cfg.MQTT.Topic, func(t string, p []byte) {
|
||||||
log.Printf("mqtt recv on %s: %s", t, string(p))
|
log.Printf("mqtt recv on %s: %s", t, string(p))
|
||||||
|
// if topic starts with state: -> store in sqlite
|
||||||
|
if strings.HasPrefix(t, "state:") {
|
||||||
|
if storage.Default != nil {
|
||||||
|
_ = storage.Default.InsertMessage(string(p), time.Now())
|
||||||
|
}
|
||||||
|
}
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Printf("warning: mqtt subscribe failed: %v", err)
|
log.Printf("warning: mqtt subscribe failed: %v", err)
|
||||||
}
|
}
|
||||||
@@ -93,8 +116,7 @@ func main() {
|
|||||||
r.POST("/login", h.Login)
|
r.POST("/login", h.Login)
|
||||||
r.GET("/devices", h.GetDevices)
|
r.GET("/devices", h.GetDevices)
|
||||||
|
|
||||||
// mqttping endpoint handled by internal/handler
|
// mqttping endpoint handled by internal/handler (protected)
|
||||||
r.POST("/mqttping", h.MQTTPing)
|
|
||||||
|
|
||||||
// Protected routes
|
// Protected routes
|
||||||
auth := r.Group("/")
|
auth := r.Group("/")
|
||||||
@@ -130,6 +152,9 @@ func main() {
|
|||||||
if mq != nil {
|
if mq != nil {
|
||||||
mq.Close()
|
mq.Close()
|
||||||
}
|
}
|
||||||
|
if db != nil {
|
||||||
|
db.Close()
|
||||||
|
}
|
||||||
log.Println("server exiting")
|
log.Println("server exiting")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ require (
|
|||||||
github.com/golang-jwt/jwt/v5 v5.0.0
|
github.com/golang-jwt/jwt/v5 v5.0.0
|
||||||
github.com/joho/godotenv v1.5.1
|
github.com/joho/godotenv v1.5.1
|
||||||
github.com/pelletier/go-toml/v2 v2.0.6
|
github.com/pelletier/go-toml/v2 v2.0.6
|
||||||
|
github.com/mattn/go-sqlite3 v1.14.20
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
|||||||
@@ -51,6 +51,8 @@ github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
|
|||||||
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
|
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
|
||||||
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
|
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
|
||||||
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
|
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
|
||||||
|
github.com/mattn/go-sqlite3 v1.14.20 h1:BAZ50Ns0OFBNxdAqFhbZqdPcht1Xlb16pDCqkq1spr0=
|
||||||
|
github.com/mattn/go-sqlite3 v1.14.20/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
|
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
|
||||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||||
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
|
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
|
||||||
|
|||||||
@@ -3,9 +3,11 @@ package handler
|
|||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.piskot.si/SeminarM2/lambdaiot-core/internal/storage"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/golang-jwt/jwt/v5"
|
"github.com/golang-jwt/jwt/v5"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
@@ -243,3 +245,26 @@ func (h *Handler) Protected(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
c.JSON(http.StatusUnauthorized, gin.H{"error": "no claims"})
|
c.JSON(http.StatusUnauthorized, gin.H{"error": "no claims"})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetMessages returns the last 100 state messages with optional pagination
|
||||||
|
// query parameter: ?page=N (0-based)
|
||||||
|
func GetMessages(c *gin.Context) {
|
||||||
|
page := 0
|
||||||
|
if p := c.Query("page"); p != "" {
|
||||||
|
if n, err := strconv.Atoi(p); err == nil && n >= 0 {
|
||||||
|
page = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
limit := 100
|
||||||
|
offset := page * limit
|
||||||
|
if storage.Default == nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "storage not initialized"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
msgs, err := storage.Default.QueryMessages(limit, offset)
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.JSON(http.StatusOK, gin.H{"messages": msgs})
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,86 @@
|
|||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
_ "github.com/mattn/go-sqlite3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DB wraps sql.DB
|
||||||
|
type DB struct {
|
||||||
|
conn *sql.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
// Default is the package-level DB used by handlers
|
||||||
|
var Default *DB
|
||||||
|
|
||||||
|
// Init opens the sqlite database at path and ensures the table exists
|
||||||
|
func Init(path string) (*DB, error) {
|
||||||
|
conn, err := sql.Open("sqlite3", path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// set reasonable pragmas
|
||||||
|
if _, err := conn.Exec("PRAGMA journal_mode=WAL;"); err != nil {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
schema := `CREATE TABLE IF NOT EXISTS messages (
|
||||||
|
message TEXT NOT NULL,
|
||||||
|
timestamp TEXT NOT NULL
|
||||||
|
);`
|
||||||
|
if _, err := conn.Exec(schema); err != nil {
|
||||||
|
conn.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &DB{conn: conn}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetDefault sets the package default DB
|
||||||
|
func SetDefault(d *DB) {
|
||||||
|
Default = d
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the underlying connection
|
||||||
|
func (d *DB) Close() error {
|
||||||
|
if d == nil || d.conn == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return d.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// InsertMessage inserts a message with timestamp into the DB
|
||||||
|
func (d *DB) InsertMessage(msg string, ts time.Time) error {
|
||||||
|
if d == nil || d.conn == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
_, err := d.conn.Exec("INSERT INTO messages(message, timestamp) VALUES(?, ?)", msg, ts.Format(time.RFC3339))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Message is the returned message shape
|
||||||
|
type Message struct {
|
||||||
|
Message string `json:"message"`
|
||||||
|
Timestamp string `json:"timestamp"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryMessages returns messages ordered by newest first
|
||||||
|
func (d *DB) QueryMessages(limit, offset int) ([]Message, error) {
|
||||||
|
if d == nil || d.conn == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
rows, err := d.conn.Query("SELECT message, timestamp FROM messages ORDER BY rowid DESC LIMIT ? OFFSET ?", limit, offset)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
res := []Message{}
|
||||||
|
for rows.Next() {
|
||||||
|
var m Message
|
||||||
|
if err := rows.Scan(&m.Message, &m.Timestamp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res = append(res, m)
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
Binary file not shown.
Reference in New Issue
Block a user