From a6196cc0eebc46846ec9b1b206a0f651b722f94a Mon Sep 17 00:00:00 2001 From: Kristjan Komlosi Date: Sat, 10 Jan 2026 14:20:34 +0100 Subject: [PATCH] feat: implement SQLite storage for state messages and add message retrieval endpoint --- cmd/server/main.go | 29 +++++++++++- go.mod | 1 + go.sum | 2 + internal/handler/handlers.go | 25 ++++++++++ internal/storage/sqlite.go | 86 +++++++++++++++++++++++++++++++++++ state_test.db | Bin 0 -> 8192 bytes 6 files changed, 141 insertions(+), 2 deletions(-) create mode 100644 internal/storage/sqlite.go create mode 100644 state_test.db diff --git a/cmd/server/main.go b/cmd/server/main.go index 028a72e..eba9dd9 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "os/signal" + "strings" "syscall" "time" @@ -18,6 +19,7 @@ import ( "git.piskot.si/SeminarM2/lambdaiot-core/internal/handler" "git.piskot.si/SeminarM2/lambdaiot-core/internal/middleware" mqttclient "git.piskot.si/SeminarM2/lambdaiot-core/internal/mqtt" + "git.piskot.si/SeminarM2/lambdaiot-core/internal/storage" ) func main() { @@ -52,6 +54,21 @@ func main() { // connect to MQTT broker (best-effort) var mq *mqttclient.Client + // initialize sqlite for state messages + var db *storage.DB + { + dbPath := os.Getenv("SQLITE_PATH") + if dbPath == "" { + dbPath = "state_messages.db" + } + dbInit, err := storage.Init(dbPath) + if err != nil { + log.Printf("warning: sqlite init failed: %v", err) + } else { + db = dbInit + storage.SetDefault(db) + } + } if cfg.MQTT.Broker != "" { mqc, err := mqttclient.Connect(cfg.MQTT) if err != nil { @@ -64,6 +81,12 @@ func main() { if cfg.MQTT.Topic != "" { if err := mq.Subscribe(cfg.MQTT.Topic, func(t string, p []byte) { log.Printf("mqtt recv on %s: %s", t, string(p)) + // if topic starts with state: -> store in sqlite + if strings.HasPrefix(t, "state:") { + if storage.Default != nil { + _ = storage.Default.InsertMessage(string(p), time.Now()) + } + } }); err != nil { log.Printf("warning: mqtt subscribe failed: %v", err) } @@ -93,8 +116,7 @@ func main() { r.POST("/login", h.Login) r.GET("/devices", h.GetDevices) - // mqttping endpoint handled by internal/handler - r.POST("/mqttping", h.MQTTPing) + // mqttping endpoint handled by internal/handler (protected) // Protected routes auth := r.Group("/") @@ -130,6 +152,9 @@ func main() { if mq != nil { mq.Close() } + if db != nil { + db.Close() + } log.Println("server exiting") } diff --git a/go.mod b/go.mod index ab0f2dd..027a42a 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.0.0 github.com/joho/godotenv v1.5.1 github.com/pelletier/go-toml/v2 v2.0.6 + github.com/mattn/go-sqlite3 v1.14.20 ) require ( diff --git a/go.sum b/go.sum index efb0479..502348d 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,8 @@ github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-sqlite3 v1.14.20 h1:BAZ50Ns0OFBNxdAqFhbZqdPcht1Xlb16pDCqkq1spr0= +github.com/mattn/go-sqlite3 v1.14.20/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= diff --git a/internal/handler/handlers.go b/internal/handler/handlers.go index 5ca7fd1..ddb8c7f 100644 --- a/internal/handler/handlers.go +++ b/internal/handler/handlers.go @@ -3,9 +3,11 @@ package handler import ( "database/sql" "net/http" + "strconv" "strings" "time" + "git.piskot.si/SeminarM2/lambdaiot-core/internal/storage" "github.com/gin-gonic/gin" "github.com/golang-jwt/jwt/v5" "github.com/google/uuid" @@ -243,3 +245,26 @@ func (h *Handler) Protected(c *gin.Context) { } c.JSON(http.StatusUnauthorized, gin.H{"error": "no claims"}) } + +// GetMessages returns the last 100 state messages with optional pagination +// query parameter: ?page=N (0-based) +func GetMessages(c *gin.Context) { + page := 0 + if p := c.Query("page"); p != "" { + if n, err := strconv.Atoi(p); err == nil && n >= 0 { + page = n + } + } + limit := 100 + offset := page * limit + if storage.Default == nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "storage not initialized"}) + return + } + msgs, err := storage.Default.QueryMessages(limit, offset) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"messages": msgs}) +} diff --git a/internal/storage/sqlite.go b/internal/storage/sqlite.go new file mode 100644 index 0000000..be2ce49 --- /dev/null +++ b/internal/storage/sqlite.go @@ -0,0 +1,86 @@ +package storage + +import ( + "database/sql" + "time" + + _ "github.com/mattn/go-sqlite3" +) + +// DB wraps sql.DB +type DB struct { + conn *sql.DB +} + +// Default is the package-level DB used by handlers +var Default *DB + +// Init opens the sqlite database at path and ensures the table exists +func Init(path string) (*DB, error) { + conn, err := sql.Open("sqlite3", path) + if err != nil { + return nil, err + } + // set reasonable pragmas + if _, err := conn.Exec("PRAGMA journal_mode=WAL;"); err != nil { + // ignore + } + schema := `CREATE TABLE IF NOT EXISTS messages ( + message TEXT NOT NULL, + timestamp TEXT NOT NULL + );` + if _, err := conn.Exec(schema); err != nil { + conn.Close() + return nil, err + } + return &DB{conn: conn}, nil +} + +// SetDefault sets the package default DB +func SetDefault(d *DB) { + Default = d +} + +// Close closes the underlying connection +func (d *DB) Close() error { + if d == nil || d.conn == nil { + return nil + } + return d.conn.Close() +} + +// InsertMessage inserts a message with timestamp into the DB +func (d *DB) InsertMessage(msg string, ts time.Time) error { + if d == nil || d.conn == nil { + return nil + } + _, err := d.conn.Exec("INSERT INTO messages(message, timestamp) VALUES(?, ?)", msg, ts.Format(time.RFC3339)) + return err +} + +// Message is the returned message shape +type Message struct { + Message string `json:"message"` + Timestamp string `json:"timestamp"` +} + +// QueryMessages returns messages ordered by newest first +func (d *DB) QueryMessages(limit, offset int) ([]Message, error) { + if d == nil || d.conn == nil { + return nil, nil + } + rows, err := d.conn.Query("SELECT message, timestamp FROM messages ORDER BY rowid DESC LIMIT ? OFFSET ?", limit, offset) + if err != nil { + return nil, err + } + defer rows.Close() + res := []Message{} + for rows.Next() { + var m Message + if err := rows.Scan(&m.Message, &m.Timestamp); err != nil { + return nil, err + } + res = append(res, m) + } + return res, nil +} diff --git a/state_test.db b/state_test.db new file mode 100644 index 0000000000000000000000000000000000000000..3f27e14b54354940fa9752dedaa0685db3bfdfdc GIT binary patch literal 8192 zcmeI#y$ZrG6b0a$AP9o!=2W<&g1GnsR>>9=HG*q|C?Mh~D{ps4m%E~{2UpZp&Q>ZlicLDzitAhMw>TzVu~A#Y y^F^2&t%vVtpM?f*^Ydpz^*$*7=Hb(iKmY;|fB*y_009U<00Izz00bcL!vZgxXe#Rf literal 0 HcmV?d00001