183 lines
5.9 KiB
Python
183 lines
5.9 KiB
Python
"""
|
|
Simple Cloudflare dynamic DNS updater.
|
|
|
|
- Pulls current IPv4 from ipinfo.io (or a fallback service).
|
|
- Every hour, checks for changes and updates any A records that had the old IP.
|
|
|
|
Config (config.ini by default):
|
|
[cloudflare]
|
|
api_token = <token>
|
|
zone_id = <zone id>
|
|
|
|
[service]
|
|
ipinfo_url = https://ipinfo.io/ip ; optional
|
|
poll_seconds = 3600 ; optional
|
|
|
|
Environment overrides (take precedence over config file):
|
|
CLOUDFLARE_API_TOKEN, CLOUDFLARE_ZONE_ID, IPINFO_URL, POLL_SECONDS, CONFIG_PATH, LAST_IP_OVERRIDE
|
|
|
|
Install dependency:
|
|
pip install requests
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import configparser
|
|
import os
|
|
import sys
|
|
import time
|
|
from typing import List, Optional
|
|
|
|
import requests
|
|
|
|
IPINFO_DEFAULT = "https://ipinfo.io/ip"
|
|
FALLBACK_URL = "https://api.ipify.org"
|
|
DEFAULT_POLL_SECONDS = 3600
|
|
|
|
|
|
def load_config(path: str) -> dict:
|
|
parser = configparser.ConfigParser()
|
|
if not os.path.exists(path):
|
|
return {}
|
|
|
|
parser.read(path)
|
|
|
|
def get_opt(section: str, key: str) -> Optional[str]:
|
|
if parser.has_option(section, key):
|
|
value = parser.get(section, key).strip()
|
|
return value if value else None
|
|
return None
|
|
|
|
cfg = {
|
|
"api_token": get_opt("cloudflare", "api_token"),
|
|
"zone_id": get_opt("cloudflare", "zone_id"),
|
|
"ipinfo_url": get_opt("service", "ipinfo_url"),
|
|
"poll_seconds": None,
|
|
"last_ip_override": get_opt("service", "last_ip_override"),
|
|
}
|
|
|
|
poll_raw = get_opt("service", "poll_seconds")
|
|
if poll_raw is not None:
|
|
try:
|
|
cfg["poll_seconds"] = int(poll_raw)
|
|
except ValueError:
|
|
raise RuntimeError("poll_seconds must be an integer in config.ini")
|
|
|
|
return cfg
|
|
|
|
|
|
def get_public_ip(session: requests.Session, url: str) -> str:
|
|
resp = session.get(url, timeout=10)
|
|
resp.raise_for_status()
|
|
return resp.text.strip()
|
|
|
|
|
|
def fetch_records_with_ip(
|
|
session: requests.Session,
|
|
token: str,
|
|
zone_id: str,
|
|
ip: str,
|
|
) -> List[dict]:
|
|
# Fetch all A records matching the old IP so we can update them.
|
|
records: List[dict] = []
|
|
page = 1
|
|
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
while True:
|
|
url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records"
|
|
params = {"type": "A", "page": page, "per_page": 100, "content": ip}
|
|
resp = session.get(url, headers=headers, params=params, timeout=15)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
if not data.get("success"):
|
|
raise RuntimeError(f"Cloudflare API error: {data}")
|
|
batch = data.get("result", [])
|
|
records.extend(batch)
|
|
if page >= data.get("result_info", {}).get("total_pages", 1):
|
|
break
|
|
page += 1
|
|
return records
|
|
|
|
|
|
def update_record_ip(
|
|
session: requests.Session,
|
|
token: str,
|
|
zone_id: str,
|
|
record: dict,
|
|
new_ip: str,
|
|
) -> None:
|
|
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{record['id']}"
|
|
payload = {
|
|
"type": record.get("type", "A"),
|
|
"name": record["name"],
|
|
"content": new_ip,
|
|
"ttl": record.get("ttl", 1),
|
|
"proxied": record.get("proxied", False),
|
|
}
|
|
resp = session.put(url, headers=headers, json=payload, timeout=15)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
if not data.get("success"):
|
|
raise RuntimeError(f"Failed to update record {record['name']}: {data}")
|
|
|
|
|
|
def main() -> None:
|
|
config_path = os.getenv("CONFIG_PATH", "config.ini")
|
|
config = load_config(config_path)
|
|
|
|
token = os.getenv("CLOUDFLARE_API_TOKEN") or config.get("api_token")
|
|
zone_id = os.getenv("CLOUDFLARE_ZONE_ID") or config.get("zone_id")
|
|
if not token or not zone_id:
|
|
print(
|
|
"Missing Cloudflare credentials. Set env vars or fill config.ini.",
|
|
file=sys.stderr,
|
|
)
|
|
sys.exit(1)
|
|
|
|
ipinfo_url = os.getenv("IPINFO_URL") or config.get("ipinfo_url") or IPINFO_DEFAULT
|
|
poll_seconds = os.getenv("POLL_SECONDS")
|
|
poll_seconds = int(poll_seconds) if poll_seconds else config.get("poll_seconds", DEFAULT_POLL_SECONDS)
|
|
last_ip_override = os.getenv("LAST_IP_OVERRIDE") or config.get("last_ip_override")
|
|
|
|
session = requests.Session()
|
|
last_ip: Optional[str] = last_ip_override
|
|
|
|
print(f"Starting Cloudflare DDNS watcher (interval={poll_seconds}s)")
|
|
print(f"Using IP endpoint: {ipinfo_url}")
|
|
if last_ip_override:
|
|
print(f"Starting with overridden last_ip: {last_ip_override}")
|
|
|
|
try:
|
|
while True:
|
|
try:
|
|
current_ip = get_public_ip(session, ipinfo_url)
|
|
except Exception:
|
|
# Fallback if primary service fails.
|
|
current_ip = get_public_ip(session, FALLBACK_URL)
|
|
|
|
if last_ip is None:
|
|
last_ip = current_ip
|
|
print(f"Initial IP: {current_ip}")
|
|
elif current_ip != last_ip:
|
|
print(f"IP changed: {last_ip} -> {current_ip}")
|
|
try:
|
|
records = fetch_records_with_ip(session, token, zone_id, last_ip)
|
|
if not records:
|
|
print("No records matched old IP; nothing to update.")
|
|
else:
|
|
for rec in records:
|
|
update_record_ip(session, token, zone_id, rec, current_ip)
|
|
print(f"Updated {rec['name']} to {current_ip}")
|
|
last_ip = current_ip
|
|
except Exception as api_err:
|
|
print(f"Cloudflare update failed: {api_err}", file=sys.stderr)
|
|
else:
|
|
print(f"No change. Current IP: {current_ip}")
|
|
|
|
time.sleep(poll_seconds)
|
|
except KeyboardInterrupt:
|
|
print("Stopping watcher.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|