Files
powerdns-proxy/backend/ovh_backend.py
2023-08-06 22:08:20 +02:00

114 lines
3.6 KiB
Python

import configparser
import urllib
import ovh
import json
import ipaddress
def reverse_ip_to_standard(reverse_ip):
parts = reverse_ip.split(".")
parts = parts[:4]
standard_ip = ".".join(reversed(parts))
return standard_ip
def ip_to_block(ip_addr, ip_blocks):
# Remove IPv6 blocks from the list
ip_blocks = [block for block in ip_blocks if ":" not in block]
ip_addr = ipaddress.ip_address(ip_addr) # Convert ip_addr to an IPv4Address object
for block in ip_blocks:
network = ipaddress.ip_network(block)
if ip_addr in network:
print("IP " + str(ip_addr) + " belongs to " + block)
return block
class OvhBackend:
def __init__(self, config_file='config.ini'):
self.client = None
self.config_file = config_file
self.load_credentials()
def load_credentials(self):
config = configparser.ConfigParser()
config.read(self.config_file)
if "OVH" not in config:
raise ValueError("OVH section not found in config.ini")
endpoint = config["OVH"]["endpoint"]
application_key = config["OVH"]["application_key"]
application_secret = config["OVH"]["application_secret"]
consumer_key = config["OVH"]["consumer_key"]
if not endpoint or not application_key or not application_secret or not consumer_key:
raise ValueError("OVH credentials not provided in config.ini")
self.client = ovh.Client(
endpoint=endpoint,
application_key=application_key,
application_secret=application_secret,
consumer_key=consumer_key
)
def update_record(self, reverse_ip, record):
# Check if the record contains the necessary data
if not record or "content" not in record[0]:
print("Invalid record data. 'content' field is missing.")
return None
domain = record[0]["content"].strip(".")
ip_blocks = self.client.get("/ip")
ip_blocks = json.dumps(ip_blocks)
ip_blocks = json.loads(ip_blocks)
ip = reverse_ip_to_standard(reverse_ip)
block = ip_to_block(ip, ip_blocks)
if block is None:
print("Error: no block found for IP " + ip)
return
print("Updating RDNS record for: " + ip + " in block " + block + " to: " + record[0]["content"])
block = urllib.parse.quote(block, safe="")
path = "/ip/" + block + "/reverse"
try:
self.client.post(f"{path}",
ipReverse=ip,
reverse=domain,
)
except ovh.exceptions.APIError as e:
print("Error: " + str(e))
else:
print("Updated RDNS record for: " + ip)
return None
def create_record(self, reverse_ip, record):
self.update_record(reverse_ip, record)
pass
def delete_record(self, reverse_ip):
ip_blocks = self.client.get("/ip")
ip_blocks = json.dumps(ip_blocks)
ip_blocks = json.loads(ip_blocks)
ip = reverse_ip_to_standard(reverse_ip)
block = ip_to_block(ip, ip_blocks)
if block is None:
print("Error: no block found for ip " + ip)
return
result = self.client.delete(f"/ip/{ip}/reverse" + block)
if result.status_code == 200:
print("Deleted RDNS record for: " + ip)
return result.json()
else:
print(f"Failed to delete rDNS data. Status code: {result.status_code}")
print(result.json())
return None
pass