114 lines
3.6 KiB
Python
114 lines
3.6 KiB
Python
import configparser
|
|
import urllib
|
|
|
|
import ovh
|
|
import json
|
|
import ipaddress
|
|
|
|
|
|
def reverse_ip_to_standard(reverse_ip):
|
|
parts = reverse_ip.split(".")
|
|
parts = parts[:4]
|
|
standard_ip = ".".join(reversed(parts))
|
|
|
|
return standard_ip
|
|
|
|
|
|
def ip_to_block(ip_addr, ip_blocks):
|
|
# Remove IPv6 blocks from the list
|
|
ip_blocks = [block for block in ip_blocks if ":" not in block]
|
|
ip_addr = ipaddress.ip_address(ip_addr) # Convert ip_addr to an IPv4Address object
|
|
for block in ip_blocks:
|
|
network = ipaddress.ip_network(block)
|
|
if ip_addr in network:
|
|
print("IP " + str(ip_addr) + " belongs to " + block)
|
|
return block
|
|
|
|
|
|
class OvhBackend:
|
|
def __init__(self, config_file='config.ini'):
|
|
self.client = None
|
|
self.config_file = config_file
|
|
self.load_credentials()
|
|
|
|
def load_credentials(self):
|
|
config = configparser.ConfigParser()
|
|
config.read(self.config_file)
|
|
|
|
if "OVH" not in config:
|
|
raise ValueError("OVH section not found in config.ini")
|
|
|
|
endpoint = config["OVH"]["endpoint"]
|
|
application_key = config["OVH"]["application_key"]
|
|
application_secret = config["OVH"]["application_secret"]
|
|
consumer_key = config["OVH"]["consumer_key"]
|
|
|
|
if not endpoint or not application_key or not application_secret or not consumer_key:
|
|
raise ValueError("OVH credentials not provided in config.ini")
|
|
|
|
self.client = ovh.Client(
|
|
endpoint=endpoint,
|
|
application_key=application_key,
|
|
application_secret=application_secret,
|
|
consumer_key=consumer_key
|
|
)
|
|
|
|
def update_record(self, reverse_ip, record):
|
|
# Check if the record contains the necessary data
|
|
if not record or "content" not in record[0]:
|
|
print("Invalid record data. 'content' field is missing.")
|
|
return None
|
|
|
|
domain = record[0]["content"].strip(".")
|
|
|
|
ip_blocks = self.client.get("/ip")
|
|
ip_blocks = json.dumps(ip_blocks)
|
|
ip_blocks = json.loads(ip_blocks)
|
|
|
|
ip = reverse_ip_to_standard(reverse_ip)
|
|
block = ip_to_block(ip, ip_blocks)
|
|
|
|
if block is None:
|
|
print("Error: no block found for IP " + ip)
|
|
return
|
|
|
|
print("Updating RDNS record for: " + ip + " in block " + block + " to: " + record[0]["content"])
|
|
block = urllib.parse.quote(block, safe="")
|
|
path = "/ip/" + block + "/reverse"
|
|
try:
|
|
self.client.post(f"{path}",
|
|
ipReverse=ip,
|
|
reverse=domain,
|
|
)
|
|
except ovh.exceptions.APIError as e:
|
|
print("Error: " + str(e))
|
|
else:
|
|
print("Updated RDNS record for: " + ip)
|
|
return None
|
|
|
|
def create_record(self, reverse_ip, record):
|
|
self.update_record(reverse_ip, record)
|
|
pass
|
|
|
|
def delete_record(self, reverse_ip):
|
|
ip_blocks = self.client.get("/ip")
|
|
ip_blocks = json.dumps(ip_blocks)
|
|
ip_blocks = json.loads(ip_blocks)
|
|
|
|
ip = reverse_ip_to_standard(reverse_ip)
|
|
block = ip_to_block(ip, ip_blocks)
|
|
|
|
if block is None:
|
|
print("Error: no block found for ip " + ip)
|
|
return
|
|
|
|
result = self.client.delete(f"/ip/{ip}/reverse" + block)
|
|
if result.status_code == 200:
|
|
print("Deleted RDNS record for: " + ip)
|
|
return result.json()
|
|
else:
|
|
print(f"Failed to delete rDNS data. Status code: {result.status_code}")
|
|
print(result.json())
|
|
return None
|
|
pass
|