""" Simple Cloudflare dynamic DNS updater. - Pulls current IPv4 from multiple sources and cross-validates. - Every hour, checks for changes and updates any A records that had the old IP. Config (config.ini by default): [cloudflare] api_token = zone_id = [service] ipinfo_url = https://ipinfo.io/ip ; optional poll_seconds = 3600 ; optional tracker_record = _dyndns-last-ip ; TXT record name used as source of truth ; for the last IP this app set; create it ; manually in Cloudflare with the current A ; record IP before first run so the app knows ; what to look for. Environment overrides (take precedence over config file): CLOUDFLARE_API_TOKEN, CLOUDFLARE_ZONE_ID, IPINFO_URL, POLL_SECONDS, CONFIG_PATH, LAST_IP_OVERRIDE, LAST_IP_FILE, TRACKER_RECORD """ from __future__ import annotations import configparser import os import re import sys import time from typing import List, Optional import requests IPINFO_DEFAULT = "https://ipinfo.io/ip" IP_SOURCES = [ "https://ipinfo.io/ip", "https://api.ipify.org", "https://checkip.amazonaws.com", "https://icanhazip.com", ] DEFAULT_POLL_SECONDS = 3600 DEFAULT_LAST_IP_FILE = "last_ip.txt" # Minimum agreeing sources needed to trust an IP result. 
MIN_AGREEING_SOURCES = 2

# Anchored with \Z rather than $ so a trailing newline is rejected, and compiled
# with re.ASCII so \d matches only 0-9. The 0-255 range check is done separately
# in is_valid_ipv4().
_IPV4_RE = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\Z", re.ASCII)

# Root of the Cloudflare v4 REST API used by every Cloudflare request below.
_CF_API_BASE = "https://api.cloudflare.com/client/v4"


def is_valid_ipv4(ip: str) -> bool:
    """Return True if *ip* is a dotted-quad IPv4 address with octets in 0-255.

    Leading zeros in an octet (e.g. ``"010.0.0.1"``) are accepted. Surrounding
    whitespace, a trailing newline, non-ASCII digits and non-string values are
    rejected.
    """
    if not isinstance(ip, str) or not _IPV4_RE.match(ip):
        return False
    return all(int(octet) <= 255 for octet in ip.split("."))


def read_last_ip_file(path: str) -> Optional[str]:
    """Return the IPv4 address stored in *path*, or None if unavailable.

    None is returned when the file does not exist, cannot be read (a warning
    is printed to stderr in that case), or does not contain a valid IPv4
    address after stripping whitespace.
    """
    try:
        with open(path, encoding="utf-8") as fh:
            ip = fh.read().strip()
    except FileNotFoundError:
        # A missing file is the normal first-run state; stay quiet.
        return None
    except (OSError, UnicodeDecodeError) as exc:
        # Best effort, mirroring write_last_ip_file(): an unreadable state file
        # must not prevent the watcher from starting.
        print(f"Warning: could not read last IP file {path!r}: {exc}", file=sys.stderr)
        return None
    return ip if is_valid_ipv4(ip) else None


def write_last_ip_file(path: str, ip: str) -> None:
    """Persist *ip* to *path*; on OSError print a warning instead of raising."""
    try:
        with open(path, "w", encoding="utf-8") as fh:
            fh.write(ip)
    except OSError as exc:
        print(f"Warning: could not write last IP file {path!r}: {exc}", file=sys.stderr)


def load_config(path: str) -> dict:
    """Load settings from the INI file at *path*.

    Returns an empty dict when *path* does not exist. Otherwise returns a dict
    with the keys ``api_token``, ``zone_id``, ``ipinfo_url``, ``poll_seconds``,
    ``last_ip_override`` and ``tracker_record``; a missing or blank option maps
    to None, and ``poll_seconds`` is converted to int.

    Raises:
        RuntimeError: if ``poll_seconds`` is present but not an integer.
    """
    parser = configparser.ConfigParser()
    if not os.path.exists(path):
        return {}
    parser.read(path)

    def get_opt(section: str, key: str) -> Optional[str]:
        """Return the stripped option value, or None if absent or blank."""
        if not parser.has_option(section, key):
            return None
        return parser.get(section, key).strip() or None

    cfg = {
        "api_token": get_opt("cloudflare", "api_token"),
        "zone_id": get_opt("cloudflare", "zone_id"),
        "ipinfo_url": get_opt("service", "ipinfo_url"),
        "poll_seconds": None,
        "last_ip_override": get_opt("service", "last_ip_override"),
        "tracker_record": get_opt("service", "tracker_record"),
    }

    poll_raw = get_opt("service", "poll_seconds")
    if poll_raw is not None:
        try:
            cfg["poll_seconds"] = int(poll_raw)
        except ValueError as exc:
            raise RuntimeError("poll_seconds must be an integer in config.ini") from exc
    return cfg


def get_public_ip(session: requests.Session, url: str) -> str:
    """Fetch *url* and return its stripped response body as an IPv4 address.

    Raises:
        ValueError: if the body is not a valid IPv4 address.
        Exception: whatever ``session.get`` / ``raise_for_status`` raise on
            network or HTTP errors.
    """
    resp = session.get(url, timeout=10)
    resp.raise_for_status()
    ip = resp.text.strip()
    if not is_valid_ipv4(ip):
        raise ValueError(f"Invalid IPv4 from {url!r}: {ip!r}")
    return ip


def get_reliable_ip(session: requests.Session, primary_url: str) -> Optional[str]:
    """Query the IP sources in order and return the address they agree on.

    *primary_url* is asked first, followed by every other entry of IP_SOURCES.
    As soon as one address has been reported by MIN_AGREEING_SOURCES sources it
    is returned and the remaining sources are not queried.

    If that threshold is never reached but at least one source answered, the
    address with the most votes is returned anyway (ties go to the address seen
    first) and a warning is printed to stderr. None is returned only when every
    source failed.
    """
    sources = [primary_url] + [s for s in IP_SOURCES if s != primary_url]
    votes: dict[str, int] = {}

    for url in sources:
        try:
            ip = get_public_ip(session, url)
        except Exception as exc:
            # One bad source must not abort the round; move on to the next.
            print(f"  IP source {url!r} failed: {exc}", file=sys.stderr)
            continue
        votes[ip] = votes.get(ip, 0) + 1
        if votes[ip] >= MIN_AGREEING_SOURCES:
            return ip

    if not votes:
        return None

    # Fall back to the plurality winner if the threshold was not reached.
    best_ip = max(votes, key=lambda k: votes[k])
    print(
        f"  Warning: could not get {MIN_AGREEING_SOURCES} agreeing sources "
        f"(got {votes}); using {best_ip!r} with {votes[best_ip]} vote(s).",
        file=sys.stderr,
    )
    return best_ip


def _cf_headers(token: str) -> dict:
    """Build the bearer-auth JSON headers sent with every Cloudflare request."""
    return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}


def _dns_records_url(zone_id: str, record_id: Optional[str] = None) -> str:
    """Return the dns_records collection URL, or one record's URL if given an id."""
    url = f"{_CF_API_BASE}/zones/{zone_id}/dns_records"
    return f"{url}/{record_id}" if record_id else url


def fetch_records_with_ip(
    session: requests.Session,
    token: str,
    zone_id: str,
    ip: str,
) -> List[dict]:
    """Return every A record in the zone whose content equals *ip*.

    Pages through the listing 100 records at a time until the page count
    reported in ``result_info.total_pages`` is reached.

    Raises:
        RuntimeError: if Cloudflare answers with ``success`` false.
        Exception: whatever the HTTP layer raises on network/HTTP errors.
    """
    records: List[dict] = []
    url = _dns_records_url(zone_id)
    headers = _cf_headers(token)
    page = 1

    while True:
        params = {"type": "A", "page": page, "per_page": 100, "content": ip}
        resp = session.get(url, headers=headers, params=params, timeout=15)
        resp.raise_for_status()
        data = resp.json()
        if not data.get("success"):
            raise RuntimeError(f"Cloudflare API error: {data}")

        # "result" / "result_info" may be absent or null; treat both as empty.
        records.extend(data.get("result") or [])
        total_pages = (data.get("result_info") or {}).get("total_pages") or 1
        if page >= total_pages:
            break
        page += 1

    return records


def fetch_tracker_record(
    session: requests.Session,
    token: str,
    zone_id: str,
    name: str,
) -> tuple[Optional[str], Optional[str]]:
    """Return (record_id, ip_value) from the TXT tracker record, or (None, None).

    Only the first matching record is considered. ``ip_value`` is None when
    the record exists but its content is not a valid IPv4 address.

    Raises:
        RuntimeError: if Cloudflare answers with ``success`` false.
        Exception: whatever the HTTP layer raises on network/HTTP errors.
    """
    # NOTE(review): the "name" filter may need the fully-qualified record name
    # (including the zone) to match; confirm against the Cloudflare API docs.
    resp = session.get(
        _dns_records_url(zone_id),
        headers=_cf_headers(token),
        params={"type": "TXT", "name": name},
        timeout=15,
    )
    resp.raise_for_status()
    data = resp.json()
    if not data.get("success"):
        raise RuntimeError(f"Cloudflare API error fetching tracker record: {data}")

    results = data.get("result") or []
    if not results:
        return None, None

    rec = results[0]
    # TXT content is wrapped in quotes by Cloudflare.
    ip = str(rec.get("content") or "").strip('"').strip()
    return rec["id"], ip if is_valid_ipv4(ip) else None


def upsert_tracker_record(
    session: requests.Session,
    token: str,
    zone_id: str,
    name: str,
    ip: str,
    record_id: Optional[str],
) -> Optional[str]:
    """Create or update the TXT tracker record with the given IP.

    With a *record_id* the existing record is replaced (PUT); without one a
    new record is created (POST).

    Returns:
        The id of the record as reported by Cloudflare, falling back to
        *record_id* when the response carries none. Callers should keep it so
        later calls update this record instead of creating another one.

    Raises:
        RuntimeError: if Cloudflare answers with ``success`` false.
        Exception: whatever the HTTP layer raises on network/HTTP errors.
    """
    headers = _cf_headers(token)
    payload = {"type": "TXT", "name": name, "content": ip, "ttl": 60}

    if record_id:
        resp = session.put(
            _dns_records_url(zone_id, record_id), headers=headers, json=payload, timeout=15
        )
    else:
        resp = session.post(_dns_records_url(zone_id), headers=headers, json=payload, timeout=15)

    resp.raise_for_status()
    data = resp.json()
    if not data.get("success"):
        raise RuntimeError(f"Failed to upsert tracker record {name!r}: {data}")
    return (data.get("result") or {}).get("id") or record_id


def update_record_ip(
    session: requests.Session,
    token: str,
    zone_id: str,
    record: dict,
    new_ip: str,
) -> None:
    """Point the DNS record described by *record* at *new_ip*.

    The record is rewritten with PUT, carrying over its existing ``type``,
    ``name``, ``ttl`` and ``proxied`` values and replacing only ``content``.

    Raises:
        RuntimeError: if Cloudflare answers with ``success`` false.
        Exception: whatever the HTTP layer raises on network/HTTP errors.
    """
    # NOTE(review): fields not listed here (e.g. a record comment) are not sent;
    # confirm whether PUT resets them and whether PATCH would be preferable.
    payload = {
        "type": record.get("type", "A"),
        "name": record["name"],
        "content": new_ip,
        "ttl": record.get("ttl", 1),
        "proxied": record.get("proxied", False),
    }
    resp = session.put(
        _dns_records_url(zone_id, record["id"]),
        headers=_cf_headers(token),
        json=payload,
        timeout=15,
    )
    resp.raise_for_status()
    data = resp.json()
    if not data.get("success"):
        raise RuntimeError(f"Failed to update record {record['name']}: {data}")


def _resolve_poll_seconds(env_value: Optional[str], config_value: Optional[int]) -> int:
    """Pick the poll interval (env, then config, then default) and validate it.

    Raises:
        ValueError: if the environment value is not an integer or the chosen
            interval is not positive.
    """
    if env_value:
        try:
            seconds = int(env_value)
        except ValueError:
            raise ValueError(f"POLL_SECONDS must be an integer, got {env_value!r}") from None
    else:
        seconds = config_value or DEFAULT_POLL_SECONDS
    if seconds <= 0:
        # time.sleep() rejects negative values, which would kill the poll loop.
        raise ValueError(f"poll interval must be a positive number of seconds, got {seconds}")
    return seconds


def _sync_tracker_record(
    session: requests.Session,
    token: str,
    zone_id: str,
    name: str,
    ip: str,
    record_id: Optional[str],
) -> Optional[str]:
    """Write *ip* to the tracker TXT record and return the record's id."""
    if record_id is None:
        # The id is unknown when the startup lookup failed or found nothing.
        # Look again so an existing record is updated rather than duplicated.
        record_id, _ = fetch_tracker_record(session, token, zone_id, name)
    return upsert_tracker_record(session, token, zone_id, name, ip, record_id)


def _update_matching_records(
    session: requests.Session,
    token: str,
    zone_id: str,
    old_ip: str,
    new_ip: str,
) -> None:
    """Repoint every A record that still holds *old_ip* to *new_ip*."""
    records = fetch_records_with_ip(session, token, zone_id, old_ip)
    if not records:
        print(
            f"No A records matched old IP {old_ip!r}; "
            "they may have already been updated externally."
        )
        return
    for rec in records:
        update_record_ip(session, token, zone_id, rec, new_ip)
        print(f"Updated {rec['name']} to {new_ip}")


def main() -> None:
    """Run the watcher: resolve settings, then poll until interrupted.

    Settings come from environment variables first and the config file second.
    The process exits with status 1 when the Cloudflare credentials are
    missing or the poll interval is invalid. Each poll cycle determines the
    current public IP and, if it differs from the last known one, repoints the
    A records that held the old address and records the new one (state file
    and, if configured, the tracker TXT record). Ctrl-C stops the loop.
    """
    config_path = os.getenv("CONFIG_PATH", "config.ini")
    config = load_config(config_path)

    token = os.getenv("CLOUDFLARE_API_TOKEN") or config.get("api_token")
    zone_id = os.getenv("CLOUDFLARE_ZONE_ID") or config.get("zone_id")
    if not token or not zone_id:
        print(
            "Missing Cloudflare credentials. Set env vars or fill config.ini.",
            file=sys.stderr,
        )
        sys.exit(1)

    ipinfo_url = os.getenv("IPINFO_URL") or config.get("ipinfo_url") or IPINFO_DEFAULT

    try:
        poll_seconds = _resolve_poll_seconds(
            os.getenv("POLL_SECONDS"), config.get("poll_seconds")
        )
    except ValueError as exc:
        print(f"Invalid poll interval: {exc}", file=sys.stderr)
        sys.exit(1)

    last_ip_override = (os.getenv("LAST_IP_OVERRIDE") or "").strip() or config.get(
        "last_ip_override"
    )
    if last_ip_override and not is_valid_ipv4(last_ip_override):
        # A malformed override would never match any A record; ignore it and
        # fall through to the tracker record / state file instead.
        print(
            f"Warning: ignoring invalid last IP override {last_ip_override!r}",
            file=sys.stderr,
        )
        last_ip_override = None

    last_ip_file = os.getenv("LAST_IP_FILE", DEFAULT_LAST_IP_FILE)
    tracker_record = os.getenv("TRACKER_RECORD") or config.get("tracker_record")

    with requests.Session() as session:
        # The tracker record is looked up even when an override is set: its id
        # is needed later so updates replace the record instead of adding one.
        tracker_record_id: Optional[str] = None
        tracker_ip: Optional[str] = None
        if tracker_record:
            try:
                tracker_record_id, tracker_ip = fetch_tracker_record(
                    session, token, zone_id, tracker_record
                )
            except Exception as exc:
                print(
                    f"Warning: could not read tracker record {tracker_record!r}: {exc}",
                    file=sys.stderr,
                )

        # Determine starting last_ip:
        #   explicit override > Cloudflare TXT tracker > persisted file > None
        last_ip: Optional[str] = (
            last_ip_override or tracker_ip or read_last_ip_file(last_ip_file)
        )

        print(f"Starting Cloudflare DDNS watcher (interval={poll_seconds}s)")
        print(f"Primary IP endpoint: {ipinfo_url} (with {len(IP_SOURCES) - 1} fallbacks)")
        if tracker_record:
            print(
                f"Tracker TXT record: {tracker_record} "
                f"(id={tracker_record_id or 'not found — will create'})"
            )
        else:
            print(f"Last IP file: {last_ip_file}")
        if last_ip_override:
            print(f"Starting with overridden last_ip: {last_ip_override}")
        elif last_ip:
            print(f"Resuming with last_ip: {last_ip}")

        try:
            while True:
                try:
                    current_ip = get_reliable_ip(session, ipinfo_url)

                    if current_ip is None:
                        print(
                            "Could not determine public IP from any source; skipping this cycle.",
                            file=sys.stderr,
                        )

                    elif last_ip is None:
                        # First run with no prior state: just record the
                        # current address; there is no old IP to search for.
                        last_ip = current_ip
                        write_last_ip_file(last_ip_file, last_ip)
                        if tracker_record:
                            try:
                                tracker_record_id = _sync_tracker_record(
                                    session, token, zone_id,
                                    tracker_record, last_ip, tracker_record_id,
                                )
                                print(f"Initial IP: {current_ip} (tracker record set)")
                            except Exception as exc:
                                print(
                                    f"Warning: could not set tracker record: {exc}",
                                    file=sys.stderr,
                                )
                                print(f"Initial IP: {current_ip}")
                        else:
                            print(f"Initial IP: {current_ip}")

                    elif current_ip != last_ip:
                        print(f"IP changed: {last_ip} -> {current_ip}")
                        try:
                            _update_matching_records(
                                session, token, zone_id, last_ip, current_ip
                            )
                            # Advance last_ip regardless of whether anything
                            # matched: the old IP is gone, so there is no point
                            # searching for it again next cycle.
                            last_ip = current_ip
                            write_last_ip_file(last_ip_file, last_ip)
                            if tracker_record:
                                try:
                                    tracker_record_id = _sync_tracker_record(
                                        session, token, zone_id,
                                        tracker_record, last_ip, tracker_record_id,
                                    )
                                except Exception as exc:
                                    print(
                                        f"Warning: could not update tracker record: {exc}",
                                        file=sys.stderr,
                                    )
                        except Exception as api_err:
                            # Do NOT advance last_ip; retry on next cycle.
                            print(f"Cloudflare update failed: {api_err}", file=sys.stderr)

                    else:
                        print(f"No change. Current IP: {current_ip}")

                except Exception as exc:
                    # Catch-all so a transient error never kills the process.
                    print(f"Unexpected error in poll cycle: {exc}", file=sys.stderr)

                time.sleep(poll_seconds)
        except KeyboardInterrupt:
            print("Stopping watcher.")


if __name__ == "__main__":
    main()