"""TCP listener that triggers ColoCrossing power cycles for mapped devices.

The server reads its settings from ``config.txt`` and the device-ID to
COLO-ID table from ``map.txt`` (both TOML), listens on ``0.0.0.0:889``,
and for every connection coming from an allowed address extracts a device
ID from the received payload, looks it up in the table and forwards the
matching COLO ID to ``power_cycle``.
"""

import ipaddress
import socket
import time

import tomli

from colocrossing_api import power_cycle, init


# Function to check if an IP address is within the allowed IP ranges
def is_valid_ip(ip, allowed_ip_ranges):
    """Return True when *ip* is covered by one of *allowed_ip_ranges*.

    Args:
        ip: Textual IP address of the connecting peer.
        allowed_ip_ranges: Iterable of allow-list entries. Each entry may
            be a single address (``"10.0.0.1"``) or a network in CIDR
            notation (``"10.0.0.0/24"``).

    Returns:
        True if *ip* equals an entry or lies inside a listed network,
        otherwise False.
    """
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        # Not a parseable address: only an exact textual match can succeed.
        address = None

    for ip_range in allowed_ip_ranges:
        if not ip_range:
            # Blank entries (e.g. from a trailing comma) never match.
            continue
        if ip == ip_range:
            return True
        if address is None:
            continue
        try:
            # strict=False accepts entries with host bits set ("10.0.0.5/24").
            network = ipaddress.ip_network(ip_range, strict=False)
        except ValueError:
            # Entry is neither an address nor a network; ignore it.
            continue
        # Containment is False (not an error) when IP versions differ.
        if address in network:
            return True
    return False


# Load device ID mappings from map.txt
def load_device_mappings(filename):
    """Load the device-ID to COLO-ID table from a TOML file.

    The file must contain a ``[map]`` table whose keys are device IDs and
    whose values are COLO IDs. Problems with the file are reported on
    stdout and result in an empty mapping rather than an exception.

    Args:
        filename: Path of the TOML mapping file.

    Returns:
        A dict mapping device ID to COLO ID (empty on error).
    """
    device_mappings = {}
    try:
        with open(filename, 'rb') as map_file:
            map_data = tomli.load(map_file)
        map_config = map_data['map']
        for device_id, colo_id in map_config.items():
            device_mappings[device_id] = colo_id
    except FileNotFoundError:
        print(f"Mapping file '{filename}' not found.")
    except KeyError:
        print(f"Invalid mapping format in '{filename}'.")
    except tomli.TOMLDecodeError as exc:
        # Treat unparseable TOML like the other mapping-file problems.
        print(f"Invalid TOML in '{filename}': {exc}")

    print(device_mappings)
    return device_mappings


def _load_config(filename):
    """Read the allow-list and credentials from the TOML config file.

    Returns:
        A ``(allowed_ip_ranges, credentials)`` tuple, or None when the file
        is missing or lacks a required key (a message is printed).
    """
    try:
        with open(filename, 'rb') as config_file:
            config_data = tomli.load(config_file)
        access_config = config_data['access']
        credentials = config_data['credentials']

        # Validate here so a missing key is reported as a config problem
        # instead of surfacing later as a bare KeyError.
        for required in ('Username', 'Password'):
            if required not in credentials:
                raise KeyError(required)

        # Parse allowed IP ranges from the configuration
        allowed_ip_ranges = [
            ip.strip() for ip in access_config['IP'].split(',')
        ]
        print(f"Allowed IP Ranges: {', '.join(allowed_ip_ranges)}")
    except FileNotFoundError:
        print(f"Configuration file '{filename}' not found.")
        return None
    except KeyError:
        print(f"Invalid configuration format in '{filename}'.")
        return None
    return allowed_ip_ranges, credentials


def _handle_client(client_socket, credentials, device_mappings):
    """Serve one accepted connection: read, look up, power-cycle, reply."""
    # NOTE(review): recv has no timeout, so an allowed peer that connects
    # and stays silent blocks the (single-threaded) server.
    try:
        # Receive data from the client as bytes
        data = client_socket.recv(1024)
    except OSError as exc:
        print(f"Error receiving data from client: {exc}")
        return

    # Process the data (emulate the checktks function)
    coloid = checktks(data, device_mappings)

    # Only contact the API when the device ID is known.
    if coloid is not None:
        # Call the ColoCrossing API function with the required data
        send = power_cycle(credentials, coloid)
        if send is not None:
            try:
                # sendall retries until the whole reply has been written.
                client_socket.sendall(send.encode('latin-1'))
            except OSError as exc:
                print(f"Error sending response to client: {exc}")
        else:
            print("Error sending data to ColoCrossing API.")

    # Kept from the original: wait before the connection is closed.
    # Presumably this gives the peer time to read the reply - confirm.
    time.sleep(5)


# Create a socket server
def start_server():
    """Run the blocking accept loop of the power-cycle server.

    Loads ``config.txt`` and ``map.txt``, binds to ``0.0.0.0:889``,
    initialises the ColoCrossing portal with the configured credentials
    and then serves one connection at a time forever. Returns immediately
    (after printing a message) when the configuration cannot be loaded.
    """
    # Load configuration from config.txt
    config = _load_config('config.txt')
    if config is None:
        # Without allow-list and credentials the server cannot operate.
        return
    allowed_ip_ranges, credentials = config

    # Load device ID mappings
    device_mappings = load_device_mappings('map.txt')

    # Define the IP address and port to listen on
    host = '0.0.0.0'
    port = 889

    # The context manager closes the listening socket on any exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Reuse the socket if it is in the TIME_WAIT state
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # Bind the socket to the address and port
        server_socket.bind((host, port))

        # Listen for incoming connections
        server_socket.listen(1)  # Allow only one connection at a time
        print(f"Listening on {host}:{port}")

        # Initializing the ColoCrossing portal
        print("Initializing ColoCrossing portal...")
        init(credentials['Username'], credentials['Password'])

        while True:
            # Accept incoming connections
            client_socket, client_address = server_socket.accept()

            # Closing via "with" covers rejection, success and errors alike.
            with client_socket:
                # Check if the connecting IP is valid
                client_ip = client_address[0]
                if not is_valid_ip(client_ip, allowed_ip_ranges):
                    continue
                print(f"Accepted connection from {client_address}")

                _handle_client(client_socket, credentials, device_mappings)


def checktks(data, device_mappings):
    """Look up the COLO ID for the device ID embedded in *data*.

    The device ID is the upper-case hexadecimal form of the three bytes at
    offsets 7-9 of the payload (six hex characters; fewer if the payload
    is shorter).

    NOTE(review): the original code sliced offsets 7-12 and then kept only
    the first six hex characters, which is the same three bytes - confirm
    that a three-byte device ID is what the protocol intends.

    Args:
        data: Raw bytes received from the client.
        device_mappings: Dict mapping device-ID hex strings to COLO IDs.

    Returns:
        The mapped COLO ID, or None when the device ID is unknown.
    """
    # Extract the device ID from the received data
    device_address = bytes(data[7:10]).hex().upper()

    # Check if the device ID is in the mappings
    if device_address in device_mappings:
        # Device ID matches, print the corresponding COLO ID
        colo_id = device_mappings[device_address]
        print(f"Found device ID: {device_address}, COLO ID: {colo_id}")
        return colo_id

    # Device ID doesn't match, return None
    return None


# Start the server
if __name__ == "__main__":
    start_server()