Initial commit
This commit is contained in:
@@ -0,0 +1,134 @@
|
||||
import socket
|
||||
import time
|
||||
import tomli
|
||||
from colocrossing_api import power_cycle, init
|
||||
|
||||
|
||||
# Function to check if an IP address is within the allowed IP ranges
|
||||
def is_valid_ip(ip, allowed_ip_ranges):
|
||||
for ip_range in allowed_ip_ranges:
|
||||
if ip in ip_range:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
# Load device ID mappings from map.txt
|
||||
def load_device_mappings(filename):
|
||||
device_mappings = {}
|
||||
|
||||
try:
|
||||
with open(filename, 'rb') as map_file:
|
||||
map_data = tomli.load(map_file)
|
||||
map_config = map_data['map']
|
||||
|
||||
for device_id, colo_id in map_config.items():
|
||||
device_mappings[device_id] = colo_id
|
||||
|
||||
except FileNotFoundError:
|
||||
print("Mapping file 'map.txt' not found.")
|
||||
except KeyError:
|
||||
print("Invalid mapping format in 'map.txt'.")
|
||||
|
||||
print(device_mappings)
|
||||
return device_mappings
|
||||
|
||||
|
||||
# Create a socket server
|
||||
def start_server():
|
||||
# Load configuration from config.txt
|
||||
try:
|
||||
with open('config.txt', 'rb') as config_file:
|
||||
config_data = tomli.load(config_file)
|
||||
access_config = config_data['access']
|
||||
credentials = config_data['credentials']
|
||||
|
||||
# Parse allowed IP ranges from the configuration
|
||||
allowed_ip_ranges = [
|
||||
ip.strip() for ip in access_config['IP'].split(',')
|
||||
]
|
||||
|
||||
print(f"Allowed IP Ranges: {', '.join(allowed_ip_ranges)}")
|
||||
|
||||
except FileNotFoundError:
|
||||
print("Configuration file 'config.txt' not found.")
|
||||
except KeyError:
|
||||
print("Invalid configuration format in 'config.txt'.")
|
||||
|
||||
# Load device ID mappings
|
||||
device_mappings = load_device_mappings('map.txt')
|
||||
|
||||
# Define the IP address and port to listen on
|
||||
host = '0.0.0.0'
|
||||
port = 889
|
||||
|
||||
# Create a socket
|
||||
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
||||
# Reuse the socket if it is in the TIME_WAIT state
|
||||
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
||||
# Bind the socket to the address and port
|
||||
server_socket.bind((host, port))
|
||||
|
||||
# Listen for incoming connections
|
||||
server_socket.listen(1) # Allow only one connection at a time
|
||||
|
||||
print(f"Listening on {host}:{port}")
|
||||
|
||||
# Initializing the ColoCrossing portal
|
||||
print("Initializing ColoCrossing portal...")
|
||||
init(credentials['Username'], credentials['Password'])
|
||||
|
||||
while True:
|
||||
# Accept incoming connections
|
||||
client_socket, client_address = server_socket.accept()
|
||||
|
||||
# Check if the connecting IP is valid
|
||||
client_ip = client_address[0]
|
||||
if is_valid_ip(client_ip, allowed_ip_ranges):
|
||||
print(f"Accepted connection from {client_address}")
|
||||
else:
|
||||
client_socket.close()
|
||||
continue
|
||||
|
||||
# Receive data from the client as bytes
|
||||
data = client_socket.recv(1024)
|
||||
|
||||
# Process the data (emulate the checktks function)
|
||||
coloid = checktks(data, device_mappings)
|
||||
|
||||
# Send a response back to the client if the result is not None
|
||||
if coloid is not None:
|
||||
# Call the ColoCrossing API function with the required data
|
||||
send = power_cycle(credentials, coloid)
|
||||
if send is not None:
|
||||
client_socket.send(send.encode('latin-1'))
|
||||
else:
|
||||
print("Error sending data to ColoCrossing API.")
|
||||
|
||||
# Close the client socket
|
||||
time.sleep(5)
|
||||
client_socket.close()
|
||||
|
||||
|
||||
def checktks(data, device_mappings):
|
||||
# Extract the device ID from the received data
|
||||
device_address_hex = data[7:13]
|
||||
|
||||
# Convert the device ID bytes to a hexadecimal string and limit it to 6 characters
|
||||
device_address = ''.join(f'{byte:02X}' for byte in device_address_hex)[:6]
|
||||
|
||||
# Check if the device ID is in the mappings
|
||||
if device_address in device_mappings:
|
||||
# Device ID matches, print the corresponding COLO ID
|
||||
colo_id = device_mappings[device_address]
|
||||
print(f"Found device ID: {device_address}, COLO ID: {colo_id}")
|
||||
return colo_id
|
||||
|
||||
# Device ID doesn't match, return None
|
||||
return None
|
||||
|
||||
|
||||
# Start the server
|
||||
if __name__ == "__main__":
|
||||
start_server()
|
||||
Reference in New Issue
Block a user