Files
colocrossing-pdu/main.py
2023-09-07 20:47:02 +02:00

135 lines
4.1 KiB
Python

import socket
import time
import tomli
from colocrossing_api import power_cycle, init
# Function to check if an IP address is within the allowed IP ranges
def is_valid_ip(ip, allowed_ip_ranges):
for ip_range in allowed_ip_ranges:
if ip in ip_range:
return True
return False
# Load device ID mappings from map.txt
def load_device_mappings(filename):
device_mappings = {}
try:
with open(filename, 'rb') as map_file:
map_data = tomli.load(map_file)
map_config = map_data['map']
for device_id, colo_id in map_config.items():
device_mappings[device_id] = colo_id
except FileNotFoundError:
print("Mapping file 'map.txt' not found.")
except KeyError:
print("Invalid mapping format in 'map.txt'.")
print(device_mappings)
return device_mappings
# Create a socket server
def start_server():
# Load configuration from config.txt
try:
with open('config.txt', 'rb') as config_file:
config_data = tomli.load(config_file)
access_config = config_data['access']
credentials = config_data['credentials']
# Parse allowed IP ranges from the configuration
allowed_ip_ranges = [
ip.strip() for ip in access_config['IP'].split(',')
]
print(f"Allowed IP Ranges: {', '.join(allowed_ip_ranges)}")
except FileNotFoundError:
print("Configuration file 'config.txt' not found.")
except KeyError:
print("Invalid configuration format in 'config.txt'.")
# Load device ID mappings
device_mappings = load_device_mappings('map.txt')
# Define the IP address and port to listen on
host = '0.0.0.0'
port = 889
# Create a socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Reuse the socket if it is in the TIME_WAIT state
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Bind the socket to the address and port
server_socket.bind((host, port))
# Listen for incoming connections
server_socket.listen(1) # Allow only one connection at a time
print(f"Listening on {host}:{port}")
# Initializing the ColoCrossing portal
print("Initializing ColoCrossing portal...")
init(credentials['Username'], credentials['Password'])
while True:
# Accept incoming connections
client_socket, client_address = server_socket.accept()
# Check if the connecting IP is valid
client_ip = client_address[0]
if is_valid_ip(client_ip, allowed_ip_ranges):
print(f"Accepted connection from {client_address}")
else:
client_socket.close()
continue
# Receive data from the client as bytes
data = client_socket.recv(1024)
# Process the data (emulate the checktks function)
coloid = checktks(data, device_mappings)
# Send a response back to the client if the result is not None
if coloid is not None:
# Call the ColoCrossing API function with the required data
send = power_cycle(credentials, coloid)
if send is not None:
client_socket.send(send.encode('latin-1'))
else:
print("Error sending data to ColoCrossing API.")
# Close the client socket
time.sleep(5)
client_socket.close()
def checktks(data, device_mappings):
# Extract the device ID from the received data
device_address_hex = data[7:13]
# Convert the device ID bytes to a hexadecimal string and limit it to 6 characters
device_address = ''.join(f'{byte:02X}' for byte in device_address_hex)[:6]
# Check if the device ID is in the mappings
if device_address in device_mappings:
# Device ID matches, print the corresponding COLO ID
colo_id = device_mappings[device_address]
print(f"Found device ID: {device_address}, COLO ID: {colo_id}")
return colo_id
# Device ID doesn't match, return None
return None
# Start the server
if __name__ == "__main__":
start_server()