diff --git a/colocrossing_api.py b/colocrossing_api.py new file mode 100644 index 0000000..9398869 --- /dev/null +++ b/colocrossing_api.py @@ -0,0 +1,67 @@ +# colocrossing_api.py +from selenium import webdriver +from selenium.webdriver.support.ui import Select + +# Initialize the WebDriver (make sure you have the appropriate WebDriver installed) +driver = webdriver.Chrome() # You can use other WebDriver options like Firefox, Edge, etc. + +# URL for login +login_url = "https://portal.colocrossing.com/auth/login" +profile_url = "https://portal.colocrossing.com/devices2/view/profile/" + + +def init(username, password): + # Navigate to the login page + driver.get(login_url) + + # Find the username and password input fields and submit button by 'id' + username_field = driver.find_element(by="id", value="login_username") + password_field = driver.find_element(by="id", value="login_password") + submit_button = driver.find_element(by="id", value="login_button") + + # Fill in the login form + username_field.send_keys(username) + password_field.send_keys(password) + + # Submit the login form + submit_button.click() + + # Wait for a moment to ensure the login process completes (you may need to customize the waiting time) + driver.implicitly_wait(10) # Wait for 10 seconds (adjust as needed) + + # Try Click the Acknowledge button catch the exception if it doesn't exist + # noinspection PyBroadException + try: + accept_button = driver.find_element(by="id", value="acknowledge_button") + accept_button.click() + except Exception: + return "No Acknowledge Button found" + + +def power_cycle(credentials, device_id): + # Navigate to the device profile page + driver.get(profile_url + str(device_id)) + + # Wait for a moment to ensure the page loads (you may need to customize the waiting time) + driver.implicitly_wait(10) # Wait for 10 seconds (adjust as needed) + + # Tick the checkbox with class control_pdu_port_checkbox + checkbox = driver.find_element(by="class name", value="control_pdu_port_checkbox") + checkbox.click() + + # Set the select with name control_action to option 1 + select = driver.find_element(by="name", value="control_action") + select_option = Select(select) + select_option.select_by_value("1") + + # Click the button with class control_pdu_port_button + button = driver.find_element(by="class name", value="control_pdu_port_button") + button.click() + + # Click the button that contains "Confirm" + confirm_button = driver.find_element(by="xpath", value="//button//span[contains(text(), 'Confirm')]") + confirm_button.click() + + # Check if "PDU ports have been turned on." is in the page body + if "PDU ports have been turned on." in driver.page_source: + return "#TO__PCOK" diff --git a/config.txt b/config.txt new file mode 100644 index 0000000..d5dd9c1 --- /dev/null +++ b/config.txt @@ -0,0 +1,6 @@ +[access] +IP = "1.0.0.0/24, 2.3.4.5/32" + +[credentials] +Username = "user@example.com" +Password = "x" diff --git a/main.py b/main.py new file mode 100644 index 0000000..12af4e3 --- /dev/null +++ b/main.py @@ -0,0 +1,134 @@ +import socket +import time +import tomli +from colocrossing_api import power_cycle, init + + +# Function to check if an IP address is within the allowed IP ranges +def is_valid_ip(ip, allowed_ip_ranges): + for ip_range in allowed_ip_ranges: + if ip in ip_range: + return True + return False + + +# Load device ID mappings from map.txt +def load_device_mappings(filename): + device_mappings = {} + + try: + with open(filename, 'rb') as map_file: + map_data = tomli.load(map_file) + map_config = map_data['map'] + + for device_id, colo_id in map_config.items(): + device_mappings[device_id] = colo_id + + except FileNotFoundError: + print("Mapping file 'map.txt' not found.") + except KeyError: + print("Invalid mapping format in 'map.txt'.") + + print(device_mappings) + return device_mappings + + +# Create a socket server +def start_server(): + # Load configuration from config.txt + try: + with open('config.txt', 'rb') as config_file: + config_data = tomli.load(config_file) + access_config = config_data['access'] + credentials = config_data['credentials'] + + # Parse allowed IP ranges from the configuration + allowed_ip_ranges = [ + ip.strip() for ip in access_config['IP'].split(',') + ] + + print(f"Allowed IP Ranges: {', '.join(allowed_ip_ranges)}") + + except FileNotFoundError: + print("Configuration file 'config.txt' not found.") + except KeyError: + print("Invalid configuration format in 'config.txt'.") + + # Load device ID mappings + device_mappings = load_device_mappings('map.txt') + + # Define the IP address and port to listen on + host = '0.0.0.0' + port = 889 + + # Create a socket + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # Reuse the socket if it is in the TIME_WAIT state + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + # Bind the socket to the address and port + server_socket.bind((host, port)) + + # Listen for incoming connections + server_socket.listen(1) # Allow only one connection at a time + + print(f"Listening on {host}:{port}") + + # Initializing the ColoCrossing portal + print("Initializing ColoCrossing portal...") + init(credentials['Username'], credentials['Password']) + + while True: + # Accept incoming connections + client_socket, client_address = server_socket.accept() + + # Check if the connecting IP is valid + client_ip = client_address[0] + if is_valid_ip(client_ip, allowed_ip_ranges): + print(f"Accepted connection from {client_address}") + else: + client_socket.close() + continue + + # Receive data from the client as bytes + data = client_socket.recv(1024) + + # Process the data (emulate the checktks function) + coloid = checktks(data, device_mappings) + + # Send a response back to the client if the result is not None + if coloid is not None: + # Call the ColoCrossing API function with the required data + send = power_cycle(credentials, coloid) + if send is not None: + client_socket.send(send.encode('latin-1')) + else: + print("Error sending data to ColoCrossing API.") + + # Close the client socket + time.sleep(5) + client_socket.close() + + +def checktks(data, device_mappings): + # Extract the device ID from the received data + device_address_hex = data[7:13] + + # Convert the device ID bytes to a hexadecimal string and limit it to 6 characters + device_address = ''.join(f'{byte:02X}' for byte in device_address_hex)[:6] + + # Check if the device ID is in the mappings + if device_address in device_mappings: + # Device ID matches, print the corresponding COLO ID + colo_id = device_mappings[device_address] + print(f"Found device ID: {device_address}, COLO ID: {colo_id}") + return colo_id + + # Device ID doesn't match, return None + return None + + +# Start the server +if __name__ == "__main__": + start_server() diff --git a/map.txt b/map.txt new file mode 100644 index 0000000..12108c4 --- /dev/null +++ b/map.txt @@ -0,0 +1,5 @@ +# Mapping +# Rebooter ID = ColoCrossing ID +[map] +000001 = "12345" +000002 = "67890" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..69223af --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +selenium==4.12.0 +tomli==2.0.1 \ No newline at end of file