Files
lambdaiot-core/test/device_emulator.py

229 lines
7.4 KiB
Python

#!/usr/bin/env python3
import json
import os
import random
import time
import uuid
from datetime import datetime, timezone
import paho.mqtt.client as mqtt
# MQTT broker endpoint the emulator connects to.
BROKER = "localhost"
PORT = 1883

# Base topic used for all regular traffic; discovery announcements go to a
# dedicated sub-topic beneath it.
TOPIC = "lambdaiot"
DISCOVERY_TOPIC = TOPIC + "/discovery"

# Namespace for the uuid5-derived device ids. This is the same value the
# standard library exposes as ``uuid.NAMESPACE_DNS``.
DNS_NAMESPACE = uuid.UUID(hex="6ba7b810-9dad-11d1-80b4-00c04fd430c8")
def generate_mac() -> str:
    """Return a random MAC-style address.

    The result is six colon-separated, zero-padded, lowercase hex octets
    (for example ``"0a:1b:2c:3d:4e:5f"``). Every octet is drawn
    independently from the module-level ``random`` generator.
    """
    octets = [random.randint(0, 255) for _ in range(6)]
    return ":".join(format(octet, "02x") for octet in octets)
def mac_to_device_uuid(mac: str) -> uuid.UUID:
    """Derive a stable device UUID from a MAC address.

    The MAC is normalised by lowercasing it and dropping the ``:``
    separators, then hashed with ``uuid.uuid5`` under ``DNS_NAMESPACE``,
    so the same MAC always yields the same UUID.
    """
    normalized = mac.lower().replace(":", "")
    return uuid.uuid5(DNS_NAMESPACE, normalized)
def now_rfc3339() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    The timestamp has second precision and always carries the literal
    ``Z`` suffix, since the clock is read in UTC.
    """
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")
def parse_actor_value(value):
    """Coerce an actor command value to a binary state (``1`` or ``0``).

    Accepted inputs:
      * ``bool`` -> ``1`` for ``True``, ``0`` for ``False``.
      * ``int`` / ``float`` -> ``1`` when non-zero, otherwise ``0``.
      * ``str`` -> case- and whitespace-insensitive ``"true"``/``"on"``
        map to ``1`` and ``"false"``/``"off"`` to ``0``; any other string
        is parsed as a number and follows the numeric rule, so ``"1"``,
        ``"1.0"`` and ``"2"`` are all on, ``"0"`` and ``"0.0"`` are off.

    Anything else (``None``, containers, unparsable strings) yields ``0``.
    """
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return 1 if value else 0
    if isinstance(value, (int, float)):
        return 1 if value != 0 else 0
    if isinstance(value, str):
        text = value.strip().lower()
        if text in {"true", "on"}:
            return 1
        if text in {"false", "off"}:
            return 0
        # Numeric strings follow the same non-zero rule as real numbers, so
        # a command sent as "1.0" behaves exactly like one sent as 1.0.
        try:
            return 1 if float(text) != 0 else 0
        except ValueError:
            return 0
    return 0
def parse_actor_float(value):
    """Coerce an actor command value to a ``float``.

    Accepted inputs:
      * ``bool`` -> ``1.0`` / ``0.0``.
      * ``int`` / ``float`` -> converted with ``float()``.
      * ``str`` -> case- and whitespace-insensitive ``"true"``/``"on"``
        map to ``1.0`` and ``"false"``/``"off"`` to ``0.0``; any other
        string is parsed with ``float()``.

    Every value that cannot be converted falls back to ``0.0``. That
    includes unparsable strings, unsupported types, and integers too large
    for a float (``float()`` raises ``OverflowError`` for those).
    """
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return 1.0 if value else 0.0
    try:
        if isinstance(value, (int, float)):
            return float(value)
        if isinstance(value, str):
            text = value.strip().lower()
            if text in {"true", "on"}:
                return 1.0
            if text in {"false", "off"}:
                return 0.0
            return float(text)
    except (ValueError, OverflowError):
        # ValueError: non-numeric string. OverflowError: an arbitrarily
        # large JSON integer that does not fit into a float.
        return 0.0
    return 0.0
def load_env_file(path: str) -> dict:
    """Parse a simple ``KEY=VALUE`` env file into a dict of strings.

    Parsing rules:
      * Blank lines, lines starting with ``#`` and lines without ``=`` are
        skipped.
      * Each remaining line is split on the first ``=`` only, so values
        may themselves contain ``=``.
      * Keys and values are stripped of surrounding whitespace; lines
        whose key is empty (e.g. ``=value``) are skipped.
      * One pair of matching surrounding quotes (``"..."`` or ``'...'``)
        is removed from a value, as is conventional for .env files.
      * When a key occurs more than once, the last occurrence wins.

    Args:
        path: Location of the env file.

    Returns:
        The parsed key/value pairs, or an empty dict when the file does
        not exist. Other I/O errors propagate to the caller.
    """
    env = {}
    try:
        with open(path, "r", encoding="utf-8") as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if not line or line.startswith("#"):
                    continue
                key, separator, value = line.partition("=")
                key = key.strip()
                if not separator or not key:
                    continue
                value = value.strip()
                # Without this, KEY="secret" would yield the value
                # '"secret"' including the quote characters.
                if (
                    len(value) >= 2
                    and value[0] == value[-1]
                    and value[0] in ("'", '"')
                ):
                    value = value[1:-1]
                env[key] = value
    except FileNotFoundError:
        # A missing env file is not an error: run without overrides.
        return {}
    return env
class DeviceEmulator:
    """Emulated device that exchanges JSON messages with an MQTT broker.

    On construction the emulator invents a random MAC address and derives
    from it a device id plus ids for two sensors (a float "Temperature"
    and a boolean "Presence") and two actors (a binary "Binary Switch" and
    a float "Dimmer"). After connecting it announces itself on
    ``DISCOVERY_TOPIC``, then publishes readings periodically on ``TOPIC``
    and reacts to ``device_check_request``, ``sensor_trigger`` and
    ``actor_command`` messages received on ``TOPIC``.
    """

    def __init__(self):
        """Create identifiers and initial state, and configure the MQTT client.

        Broker credentials are read from ``MOSQ_USER`` / ``MOSQ_PASS`` in a
        ``.env`` file located next to this module; when neither is set the
        client connects without authentication. No network connection is
        opened here -- see ``run``.
        """
        self.mac_address = generate_mac()
        self.device_id = mac_to_device_uuid(self.mac_address)
        # Child ids are uuid5 hashes namespaced by the device id, so they
        # are stable for a given MAC address.
        self.sensor_id = uuid.uuid5(self.device_id, "sensor-0")
        self.sensor_bool_id = uuid.uuid5(self.device_id, "sensor-1")
        self.actor_id = uuid.uuid5(self.device_id, "actor-0")
        self.actor_float_id = uuid.uuid5(self.device_id, "actor-1")

        # Last commanded actor states.
        self.actor_value = 0
        self.actor_float_value = 0.0
        # Simulated sensor states.
        self.temperature = 22.5
        self.switch_state = False
        # Seconds between periodic sensor publications in ``run``.
        self.publish_interval = 5

        env_path = os.path.join(os.path.dirname(__file__), ".env")
        env = load_env_file(env_path)
        self.mqtt_user = env.get("MOSQ_USER", "").strip()
        self.mqtt_pass = env.get("MOSQ_PASS", "").strip()

        client_id = f"emulator-{self.device_id}"
        callback_api = getattr(mqtt, "CallbackAPIVersion", None)
        if callback_api is not None:
            # paho-mqtt >= 2.0 takes the callback API version as first
            # argument. VERSION1 matches the on_connect signature
            # (client, userdata, flags, rc) used by this class.
            self.client = mqtt.Client(callback_api.VERSION1, client_id=client_id)
        else:
            # paho-mqtt 1.x has no such argument.
            self.client = mqtt.Client(client_id=client_id)
        if self.mqtt_user or self.mqtt_pass:
            self.client.username_pw_set(self.mqtt_user, self.mqtt_pass)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def discovery_payload(self):
        """Build the discovery document describing this device.

        Returns:
            A JSON-serialisable dict with the MAC address, device metadata
            and the lists of sensors and actors, with all ids as strings.
        """
        # NOTE(review): data_type_id 1 is used for the boolean entities and
        # 2 for the float ones -- presumably mirrors a server-side lookup
        # table; confirm against the backend schema.
        return {
            "mac_address": self.mac_address,
            "device": {
                "id": str(self.device_id),
                "name": "Emulated Device",
                "description": "Python MQTT emulator",
                "location": "Localhost",
                "status_id": 1,
            },
            "sensors": [
                {
                    "id": str(self.sensor_id),
                    "name": "Temperature",
                    "type": "DHT22",
                    "data_type_id": 2,
                },
                {
                    "id": str(self.sensor_bool_id),
                    "name": "Presence",
                    "type": "PIR",
                    "data_type_id": 1,
                },
            ],
            "actors": [
                {
                    "id": str(self.actor_id),
                    "name": "Binary Switch",
                    "type": "Relay",
                    "data_type_id": 1,
                },
                {
                    "id": str(self.actor_float_id),
                    "name": "Dimmer",
                    "type": "PWM",
                    "data_type_id": 2,
                },
            ],
        }

    def publish_discovery(self):
        """Publish the discovery document to ``DISCOVERY_TOPIC`` with QoS 1."""
        payload = self.discovery_payload()
        self.client.publish(DISCOVERY_TOPIC, json.dumps(payload), qos=1)
        print(f"discovery published: device_id={self.device_id}")

    def publish_sensor_reading(self):
        """Advance the simulated temperature and publish it on ``TOPIC``.

        The temperature performs a random walk of at most +/-0.5 per call,
        rounded to two decimals.
        """
        self.temperature = round(self.temperature + random.uniform(-0.5, 0.5), 2)
        payload = {
            "type": "sensor_reading",
            "sensor_id": str(self.sensor_id),
            "value": self.temperature,
            "value_at": now_rfc3339(),
        }
        self.client.publish(TOPIC, json.dumps(payload), qos=0)
        print(f"sensor_reading published: {self.temperature}C")

    def publish_bool_sensor_reading(self):
        """Toggle the simulated presence state and publish it on ``TOPIC``."""
        self.switch_state = not self.switch_state
        payload = {
            "type": "sensor_reading",
            "sensor_id": str(self.sensor_bool_id),
            "value": self.switch_state,
            "value_at": now_rfc3339(),
        }
        self.client.publish(TOPIC, json.dumps(payload), qos=0)
        print(f"sensor_reading published: presence={self.switch_state}")

    def publish_device_state(self):
        """Publish a ``device_check_response`` with status ``ok`` on ``TOPIC``."""
        payload = {
            "type": "device_check_response",
            "device_id": str(self.device_id),
            "status": "ok",
        }
        self.client.publish(TOPIC, json.dumps(payload), qos=0)
        print("device_check_response published")

    def on_connect(self, client, userdata, flags, rc):
        """Handle the broker's connection result.

        On success (``rc == 0``) subscribe to ``TOPIC`` and announce the
        device; otherwise only report the failure code.
        """
        if rc != 0:
            print(f"mqtt connect failed: rc={rc}")
            return
        print("mqtt connected")
        client.subscribe(TOPIC, qos=0)
        self.publish_discovery()

    def on_message(self, client, userdata, msg):
        """Dispatch an incoming message by its ``type`` field.

        Messages that are not UTF-8 JSON objects, have an unknown type, or
        address a different device, sensor or actor are silently ignored.
        """
        try:
            payload = json.loads(msg.payload.decode("utf-8"))
        except (ValueError, RecursionError):
            # UnicodeDecodeError and json.JSONDecodeError are both
            # ValueError subclasses; RecursionError covers pathologically
            # nested documents.
            return
        # Valid JSON need not be an object; lists, numbers and strings have
        # no ``.get`` and would otherwise raise inside the callback.
        if not isinstance(payload, dict):
            return

        msg_type = str(payload.get("type", "")).strip().lower()

        if msg_type == "device_check_request":
            if payload.get("device_id") == str(self.device_id):
                self.publish_device_state()
            return

        if msg_type == "sensor_trigger":
            sensor_ref = payload.get("sensor_id")
            if sensor_ref == str(self.sensor_id):
                self.publish_sensor_reading()
            elif sensor_ref == str(self.sensor_bool_id):
                self.publish_bool_sensor_reading()
            return

        if msg_type == "actor_command":
            actor_ref = payload.get("actor_id")
            if actor_ref == str(self.actor_id):
                self.actor_value = parse_actor_value(payload.get("value"))
                print(f"actor_command received; state={self.actor_value}")
            elif actor_ref == str(self.actor_float_id):
                self.actor_float_value = parse_actor_float(payload.get("value"))
                print(f"actor_command received; dimmer={self.actor_float_value}")
            return

    def run(self):
        """Connect to the broker and publish readings until interrupted.

        Network traffic is handled by the client's background loop thread.
        This method blocks, publishing both sensor readings every
        ``publish_interval`` seconds. On exit (e.g. ``KeyboardInterrupt``,
        which still propagates) the session is closed cleanly.
        """
        self.client.connect(BROKER, PORT, keepalive=30)
        self.client.loop_start()
        try:
            while True:
                time.sleep(self.publish_interval)
                self.publish_sensor_reading()
                self.publish_bool_sensor_reading()
        finally:
            # Disconnect while the loop thread is still running, then stop
            # the thread.
            self.client.disconnect()
            self.client.loop_stop()
if __name__ == "__main__":
    # Script entry point: build one emulator and run it until interrupted.
    emulator = DeviceEmulator()
    emulator.run()