#!/usr/bin/env python3
"""MQTT device emulator.

Connects to the broker at ``BROKER:PORT``, announces itself on
``DISCOVERY_TOPIC`` and then, on ``TOPIC``:

* publishes a temperature reading and a toggling boolean reading every
  ``publish_interval`` seconds;
* answers ``device_check_request`` and ``sensor_trigger`` messages that
  address this device or one of its sensors;
* stores the value carried by ``actor_command`` messages that address one
  of its two actors.
"""
import json
import os
import random
import time
import uuid
from datetime import datetime, timezone

import paho.mqtt.client as mqtt

BROKER = "localhost"
PORT = 1883
TOPIC = "lambdaiot"
DISCOVERY_TOPIC = f"{TOPIC}/discovery"
# Standard DNS namespace UUID (6ba7b810-9dad-11d1-80b4-00c04fd430c8).
DNS_NAMESPACE = uuid.NAMESPACE_DNS


def generate_mac() -> str:
    """Return a random MAC-style string of six colon-separated hex octets."""
    return ":".join(f"{random.randint(0, 255):02x}" for _ in range(6))


def mac_to_device_uuid(mac: str) -> uuid.UUID:
    """Derive a deterministic UUIDv5 from *mac*.

    The colons are removed and the remaining hex digits lower-cased before
    hashing under ``DNS_NAMESPACE``, so the same MAC always maps to the same
    device id regardless of letter case.
    """
    return uuid.uuid5(DNS_NAMESPACE, mac.replace(":", "").lower())


def now_rfc3339() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def parse_actor_value(value: object) -> int:
    """Coerce an incoming actor command value to ``0`` or ``1``.

    Booleans map to 1/0, numbers map to 1 when non-zero, and the strings
    ``true``/``on``/``1`` and ``false``/``off``/``0`` (case-insensitive,
    surrounding whitespace ignored) map to 1 and 0. Anything else yields 0.
    """
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(value, bool):
        return 1 if value else 0
    if isinstance(value, (int, float)):
        return 1 if value != 0 else 0
    if isinstance(value, str):
        word = value.strip().lower()
        if word in {"true", "on", "1"}:
            return 1
        if word in {"false", "off", "0"}:
            return 0
    return 0


def parse_actor_float(value: object) -> float:
    """Coerce an incoming actor command value to a float.

    Booleans map to 1.0/0.0, numbers are converted with ``float()``, the
    strings ``true``/``on`` and ``false``/``off`` map to 1.0 and 0.0, and any
    other string is parsed with ``float()``. Unparseable strings and
    unsupported types yield 0.0.
    """
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(value, bool):
        return 1.0 if value else 0.0
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        word = value.strip().lower()
        if word in {"true", "on"}:
            return 1.0
        if word in {"false", "off"}:
            return 0.0
        try:
            return float(value)
        except ValueError:
            return 0.0
    return 0.0


def load_env_file(path: str) -> dict:
    """Read ``KEY=VALUE`` pairs from the file at *path*.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Keys and values are stripped of surrounding whitespace; quotes
    are kept as written. Returns an empty dict when the file does not exist.
    """
    env = {}
    try:
        with open(path, "r", encoding="utf-8") as handle:
            for line in handle:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                # Split on the first "=" only so values may contain "=".
                key, value = line.split("=", 1)
                env[key.strip()] = value.strip()
    except FileNotFoundError:
        return {}
    return env


class DeviceEmulator:
    """Emulated device with two sensors and two actors, driven over MQTT."""

    def __init__(self) -> None:
        """Generate device identity, load credentials and build the client."""
        self.mac_address = generate_mac()
        self.device_id = mac_to_device_uuid(self.mac_address)
        # Sensor/actor ids are UUIDv5 values namespaced by the device id.
        self.sensor_id = uuid.uuid5(self.device_id, "sensor-0")
        self.sensor_bool_id = uuid.uuid5(self.device_id, "sensor-1")
        self.actor_id = uuid.uuid5(self.device_id, "actor-0")
        self.actor_float_id = uuid.uuid5(self.device_id, "actor-1")

        self.actor_value = 0
        self.actor_float_value = 0.0
        self.temperature = 22.5
        self.switch_state = False
        self.publish_interval = 5  # seconds between periodic readings

        # Optional broker credentials come from a .env file next to this script.
        env_path = os.path.join(os.path.dirname(__file__), ".env")
        env = load_env_file(env_path)
        self.mqtt_user = env.get("MOSQ_USER", "").strip()
        self.mqtt_pass = env.get("MOSQ_PASS", "").strip()

        # NOTE(review): paho-mqtt 2.x adds a callback_api_version argument to
        # Client and a new callback signature -- confirm the installed version
        # accepts this constructor call and the on_connect signature below.
        self.client = mqtt.Client(client_id=f"emulator-{self.device_id}")
        if self.mqtt_user or self.mqtt_pass:
            self.client.username_pw_set(self.mqtt_user, self.mqtt_pass)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def discovery_payload(self) -> dict:
        """Build the discovery message describing the device and its I/O."""
        # NOTE(review): data_type_id 1 is used for the boolean sensor/actor
        # and 2 for the numeric ones; the id meanings are defined by the
        # consumer of this message -- confirm against it.
        return {
            "mac_address": self.mac_address,
            "device": {
                "id": str(self.device_id),
                "name": "Emulated Device",
                "description": "Python MQTT emulator",
                "location": "Localhost",
                "status_id": 1,
            },
            "sensors": [
                {
                    "id": str(self.sensor_id),
                    "name": "Temperature",
                    "type": "DHT22",
                    "data_type_id": 2,
                },
                {
                    "id": str(self.sensor_bool_id),
                    "name": "Presence",
                    "type": "PIR",
                    "data_type_id": 1,
                },
            ],
            "actors": [
                {
                    "id": str(self.actor_id),
                    "name": "Binary Switch",
                    "type": "Relay",
                    "data_type_id": 1,
                },
                {
                    "id": str(self.actor_float_id),
                    "name": "Dimmer",
                    "type": "PWM",
                    "data_type_id": 2,
                },
            ],
        }

    def publish_discovery(self) -> None:
        """Publish the discovery payload on ``DISCOVERY_TOPIC`` with QoS 1."""
        payload = self.discovery_payload()
        self.client.publish(DISCOVERY_TOPIC, json.dumps(payload), qos=1)
        print(f"discovery published: device_id={self.device_id}")

    def publish_sensor_reading(self) -> None:
        """Nudge the temperature by up to +/-0.5 and publish the new value."""
        self.temperature = round(self.temperature + random.uniform(-0.5, 0.5), 2)
        payload = {
            "type": "sensor_reading",
            "sensor_id": str(self.sensor_id),
            "value": self.temperature,
            "value_at": now_rfc3339(),
        }
        self.client.publish(TOPIC, json.dumps(payload), qos=0)
        print(f"sensor_reading published: {self.temperature}C")

    def publish_bool_sensor_reading(self) -> None:
        """Toggle the boolean sensor state and publish the new value."""
        self.switch_state = not self.switch_state
        payload = {
            "type": "sensor_reading",
            "sensor_id": str(self.sensor_bool_id),
            "value": self.switch_state,
            "value_at": now_rfc3339(),
        }
        self.client.publish(TOPIC, json.dumps(payload), qos=0)
        print(f"sensor_reading published: presence={self.switch_state}")

    def publish_device_state(self) -> None:
        """Publish a ``device_check_response`` with status ``ok``."""
        payload = {
            "type": "device_check_response",
            "device_id": str(self.device_id),
            "status": "ok",
        }
        self.client.publish(TOPIC, json.dumps(payload), qos=0)
        print("device_check_response published")

    def on_connect(self, client, userdata, flags, rc) -> None:
        """Subscribe to ``TOPIC`` and announce the device once connected.

        A non-zero *rc* is reported and nothing is subscribed or published.
        """
        if rc != 0:
            print(f"mqtt connect failed: rc={rc}")
            return
        print("mqtt connected")
        client.subscribe(TOPIC, qos=0)
        self.publish_discovery()

    def on_message(self, client, userdata, msg) -> None:
        """Dispatch an incoming message by its ``type`` field.

        Messages that are not UTF-8 JSON objects, have an unknown type, or
        address another device, sensor or actor are ignored.
        """
        try:
            payload = json.loads(msg.payload.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            return
        # Valid JSON need not be an object: arrays, numbers, strings and null
        # have no .get(), so ignore them instead of raising AttributeError
        # inside the callback.
        if not isinstance(payload, dict):
            return

        msg_type = str(payload.get("type", "")).strip().lower()

        if msg_type == "device_check_request":
            if payload.get("device_id") == str(self.device_id):
                self.publish_device_state()
            return

        if msg_type == "sensor_trigger":
            sensor_id = payload.get("sensor_id")
            if sensor_id == str(self.sensor_id):
                self.publish_sensor_reading()
            elif sensor_id == str(self.sensor_bool_id):
                self.publish_bool_sensor_reading()
            return

        if msg_type == "actor_command":
            actor_id = payload.get("actor_id")
            if actor_id == str(self.actor_id):
                self.actor_value = parse_actor_value(payload.get("value"))
                print(f"actor_command received; state={self.actor_value}")
            elif actor_id == str(self.actor_float_id):
                self.actor_float_value = parse_actor_float(payload.get("value"))
                print(f"actor_command received; dimmer={self.actor_float_value}")
            return

    def run(self) -> None:
        """Connect, start the network loop and publish readings forever.

        Runs until interrupted (for example by Ctrl-C). On the way out the
        client is disconnected and the network loop stopped.
        """
        self.client.connect(BROKER, PORT, keepalive=30)
        self.client.loop_start()
        try:
            while True:
                time.sleep(self.publish_interval)
                self.publish_sensor_reading()
                self.publish_bool_sensor_reading()
        finally:
            # Disconnect before stopping the loop so the session is closed
            # explicitly rather than just abandoning the socket.
            self.client.disconnect()
            self.client.loop_stop()


if __name__ == "__main__":
    DeviceEmulator().run()