Files
cloudflare-dyndns/cloudflare_dyndns.py

349 lines
13 KiB
Python

"""
Simple Cloudflare dynamic DNS updater.
- Pulls current IPv4 from multiple sources and cross-validates.
- Every hour, checks for changes and updates any A records that had the old IP.
Config (config.ini by default):
[cloudflare]
api_token = <token>
zone_id = <zone id>
[service]
ipinfo_url = https://ipinfo.io/ip ; optional
poll_seconds = 3600 ; optional
tracker_record = _dyndns-last-ip ; TXT record name used as source of truth
; for the last IP this app set; create it
; manually in Cloudflare with the current A
; record IP before first run so the app knows
; what to look for.
Environment overrides (take precedence over config file):
CLOUDFLARE_API_TOKEN, CLOUDFLARE_ZONE_ID, IPINFO_URL, POLL_SECONDS, CONFIG_PATH,
LAST_IP_OVERRIDE, LAST_IP_FILE, TRACKER_RECORD
"""
from __future__ import annotations
import configparser
import os
import re
import sys
import time
from typing import List, Optional
import requests
# Primary public-IP endpoint used when neither IPINFO_URL nor the config file
# names one.
IPINFO_DEFAULT = "https://ipinfo.io/ip"

# Plain-text "what is my IP" endpoints consulted when cross-checking the
# public address; the default primary endpoint is the first entry.
IP_SOURCES = [
    IPINFO_DEFAULT,
    "https://api.ipify.org",
    "https://checkip.amazonaws.com",
    "https://icanhazip.com",
]

# Seconds between poll cycles when no interval is configured (one hour).
DEFAULT_POLL_SECONDS = 60 * 60

# File used to persist the last known IP when LAST_IP_FILE is not set.
DEFAULT_LAST_IP_FILE = "last_ip.txt"

# Minimum agreeing sources needed to trust an IP result.
MIN_AGREEING_SOURCES = 2
# Four dot-separated groups of one to three digits.
# - re.ASCII keeps \d from matching non-ASCII Unicode digits, which int()
#   would otherwise happily convert.
# - \Z (unlike $) does not tolerate a trailing newline.
_IPV4_RE = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\Z", re.ASCII)


def is_valid_ipv4(ip: str) -> bool:
    """Return True if *ip* is a dotted-quad IPv4 address.

    The string must consist of exactly four groups of one to three ASCII
    digits separated by dots, with every group in the range 0-255. No
    surrounding whitespace is accepted; callers strip their input first.
    """
    if not _IPV4_RE.fullmatch(ip):
        return False
    # The regex guarantees non-negative integers, so only the upper bound
    # needs checking.
    return all(int(part) <= 255 for part in ip.split("."))
def read_last_ip_file(path: str) -> Optional[str]:
    """Return the IPv4 address stored in *path*, or None if unavailable.

    None is returned when the file does not exist, cannot be read, or does
    not contain a valid IPv4 address. A missing file is silent (it is the
    normal first-run state); any other read failure is reported on stderr so
    that a bad path or permission problem does not abort the program.
    """
    try:
        # Context manager so the handle is closed even if read() fails.
        with open(path) as f:
            ip = f.read().strip()
    except FileNotFoundError:
        return None
    except (OSError, UnicodeDecodeError) as exc:
        print(f"Warning: could not read last IP file {path!r}: {exc}", file=sys.stderr)
        return None
    return ip if is_valid_ipv4(ip) else None
def write_last_ip_file(path: str, ip: str) -> None:
    """Persist *ip* to *path*, replacing any previous contents.

    Best-effort: an OSError while opening or writing is reported on stderr
    and otherwise ignored.
    """
    try:
        with open(path, "w") as handle:
            handle.write(ip)
    except OSError as exc:
        warning = f"Warning: could not write last IP file {path!r}: {exc}"
        print(warning, file=sys.stderr)
def load_config(path: str) -> dict:
    """Load settings from the INI file at *path*.

    Returns an empty dict when the file does not exist. Otherwise returns a
    dict with the keys ``api_token``, ``zone_id``, ``ipinfo_url``,
    ``poll_seconds``, ``last_ip_override`` and ``tracker_record``; options
    that are missing or blank are None, and ``poll_seconds`` is an int when
    present.

    Raises:
        RuntimeError: if ``poll_seconds`` is present but not an integer.
    """
    if not os.path.exists(path):
        return {}

    # The documented config format annotates values with trailing "; ..."
    # comments (e.g. "poll_seconds = 3600 ; optional"). ConfigParser only
    # strips those when inline comment prefixes are enabled; without this the
    # comment text becomes part of the value.
    parser = configparser.ConfigParser(inline_comment_prefixes=(";",))
    parser.read(path)

    def get_opt(section: str, key: str) -> Optional[str]:
        # Treat a missing option and a blank value the same way.
        if not parser.has_option(section, key):
            return None
        value = parser.get(section, key).strip()
        return value or None

    cfg = {
        "api_token": get_opt("cloudflare", "api_token"),
        "zone_id": get_opt("cloudflare", "zone_id"),
        "ipinfo_url": get_opt("service", "ipinfo_url"),
        "poll_seconds": None,
        "last_ip_override": get_opt("service", "last_ip_override"),
        "tracker_record": get_opt("service", "tracker_record"),
    }

    poll_raw = get_opt("service", "poll_seconds")
    if poll_raw is not None:
        try:
            cfg["poll_seconds"] = int(poll_raw)
        except ValueError as exc:
            raise RuntimeError("poll_seconds must be an integer in config.ini") from exc
    return cfg
def get_public_ip(session: requests.Session, url: str) -> str:
    """Fetch *url* and return the IPv4 address found in its response body.

    Raises whatever ``raise_for_status`` raises for an error response, and
    ValueError when the stripped body is not a valid IPv4 address.
    """
    response = session.get(url, timeout=10)
    response.raise_for_status()
    candidate = response.text.strip()
    if is_valid_ipv4(candidate):
        return candidate
    raise ValueError(f"Invalid IPv4 from {url!r}: {candidate!r}")
def get_reliable_ip(session: requests.Session, primary_url: str) -> Optional[str]:
    """Determine the public IPv4 address by polling several sources.

    *primary_url* is queried first, followed by every entry of IP_SOURCES
    that differs from it. As soon as one address has been reported by
    MIN_AGREEING_SOURCES sources it is returned. If that threshold is never
    reached but at least one source answered, the address with the most
    votes is returned and a warning is written to stderr. None is returned
    only when every source failed.
    """
    candidates = [primary_url]
    candidates.extend(src for src in IP_SOURCES if src != primary_url)

    tally: dict[str, int] = {}
    for url in candidates:
        try:
            ip = get_public_ip(session, url)
        except Exception as exc:
            # A failing source is skipped; the remaining ones still vote.
            print(f"  IP source {url!r} failed: {exc}", file=sys.stderr)
            continue
        tally[ip] = tally.get(ip, 0) + 1
        if tally[ip] >= MIN_AGREEING_SOURCES:
            return ip

    if not tally:
        return None

    # Fall back to majority if we couldn't reach the threshold.
    best_ip = max(tally, key=tally.get)
    print(
        f"  Warning: could not get {MIN_AGREEING_SOURCES} agreeing sources "
        f"(got {tally}); using {best_ip!r} with {tally[best_ip]} vote(s).",
        file=sys.stderr,
    )
    return best_ip
def fetch_records_with_ip(
    session: requests.Session,
    token: str,
    zone_id: str,
    ip: str,
) -> List[dict]:
    """Return every A record in the zone whose content equals *ip*.

    Requests pages of up to 100 records and keeps going until the page
    number reaches the ``total_pages`` value reported in ``result_info``.

    Raises:
        RuntimeError: if a response does not report ``success``.
    """
    endpoint = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records"
    auth_headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    matches: List[dict] = []
    page_number = 1
    while True:
        query = {"type": "A", "page": page_number, "per_page": 100, "content": ip}
        response = session.get(endpoint, headers=auth_headers, params=query, timeout=15)
        response.raise_for_status()
        payload = response.json()
        if not payload.get("success"):
            raise RuntimeError(f"Cloudflare API error: {payload}")
        matches.extend(payload.get("result", []))
        last_page = payload.get("result_info", {}).get("total_pages", 1)
        if page_number >= last_page:
            return matches
        page_number += 1
def fetch_tracker_record(
    session: requests.Session,
    token: str,
    zone_id: str,
    name: str,
) -> tuple[Optional[str], Optional[str]]:
    """Look up the TXT tracker record called *name*.

    Returns ``(record_id, ip)`` for the first matching record, where ``ip``
    is None if the record's content is not a valid IPv4 address. Returns
    ``(None, None)`` when no TXT record matches.

    Raises:
        RuntimeError: if the response does not report ``success``.
    """
    endpoint = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records"
    auth_headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    query = {"type": "TXT", "name": name}

    response = session.get(endpoint, headers=auth_headers, params=query, timeout=15)
    response.raise_for_status()
    payload = response.json()
    if not payload.get("success"):
        raise RuntimeError(f"Cloudflare API error fetching tracker record: {payload}")

    matches = payload.get("result", [])
    if not matches:
        return None, None

    first = matches[0]
    # TXT content is wrapped in quotes by Cloudflare.
    value = first["content"].strip('"').strip()
    if is_valid_ipv4(value):
        return first["id"], value
    return first["id"], None
def upsert_tracker_record(
    session: requests.Session,
    token: str,
    zone_id: str,
    name: str,
    ip: str,
    record_id: Optional[str],
) -> None:
    """Store *ip* in the TXT tracker record called *name*.

    With a *record_id* the existing record is overwritten via PUT; without
    one a new record is created via POST. The record is written with a TTL
    of 60.

    Raises:
        RuntimeError: if the response does not report ``success``.
    """
    collection_url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records"
    auth_headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    body = {"type": "TXT", "name": name, "content": ip, "ttl": 60}

    if record_id:
        response = session.put(
            f"{collection_url}/{record_id}", headers=auth_headers, json=body, timeout=15
        )
    else:
        response = session.post(collection_url, headers=auth_headers, json=body, timeout=15)

    response.raise_for_status()
    result = response.json()
    if not result.get("success"):
        raise RuntimeError(f"Failed to upsert tracker record {name!r}: {result}")
def update_record_ip(
    session: requests.Session,
    token: str,
    zone_id: str,
    record: dict,
    new_ip: str,
) -> None:
    """Point the DNS record described by *record* at *new_ip*.

    The record's ``type``, ``ttl`` and ``proxied`` values are carried over
    (defaulting to "A", 1 and False when absent); only the content changes.

    Raises:
        RuntimeError: if the response does not report ``success``.
    """
    # NOTE(review): this is a PUT, so fields not sent here (e.g. a record
    # comment) may be reset by the API - confirm against the Cloudflare docs.
    record_url = (
        f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{record['id']}"
    )
    auth_headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    body = {
        "type": record.get("type", "A"),
        "name": record["name"],
        "content": new_ip,
        "ttl": record.get("ttl", 1),
        "proxied": record.get("proxied", False),
    }

    response = session.put(record_url, headers=auth_headers, json=body, timeout=15)
    response.raise_for_status()
    result = response.json()
    if not result.get("success"):
        raise RuntimeError(f"Failed to update record {record['name']}: {result}")
def _sync_tracker_record(
    session: requests.Session,
    token: str,
    zone_id: str,
    name: str,
    ip: str,
    record_id: Optional[str],
) -> Optional[str]:
    """Write *ip* to the tracker TXT record and return the record id used.

    When *record_id* is unknown the record is looked up first, so that a
    record which already exists (for example one this process created on an
    earlier cycle) is updated in place instead of being created again.
    Returns None only when the record had to be created by this call.
    """
    if record_id is None:
        record_id, _ = fetch_tracker_record(session, token, zone_id, name)
    upsert_tracker_record(session, token, zone_id, name, ip, record_id)
    return record_id


def main() -> None:
    """Run the dynamic DNS watcher until interrupted.

    Settings come from environment variables first and the config file
    second. The starting "last IP" is chosen in this order: explicit
    override, TXT tracker record, persisted file, none. Each cycle the
    public IP is determined; when it differs from the last IP, every A
    record still holding the last IP is updated, and the new IP is saved to
    the file and (if configured) the tracker record.

    Exits with status 1 when credentials are missing or when the poll
    interval or last-IP override is invalid.
    """
    config_path = os.getenv("CONFIG_PATH", "config.ini")
    config = load_config(config_path)

    token = os.getenv("CLOUDFLARE_API_TOKEN") or config.get("api_token")
    zone_id = os.getenv("CLOUDFLARE_ZONE_ID") or config.get("zone_id")
    if not token or not zone_id:
        print(
            "Missing Cloudflare credentials. Set env vars or fill config.ini.",
            file=sys.stderr,
        )
        sys.exit(1)

    ipinfo_url = os.getenv("IPINFO_URL") or config.get("ipinfo_url") or IPINFO_DEFAULT

    poll_seconds_env = os.getenv("POLL_SECONDS")
    if poll_seconds_env:
        try:
            poll_seconds = int(poll_seconds_env)
        except ValueError:
            print(
                f"POLL_SECONDS must be an integer, got {poll_seconds_env!r}.",
                file=sys.stderr,
            )
            sys.exit(1)
    else:
        poll_seconds = config.get("poll_seconds") or DEFAULT_POLL_SECONDS
    if poll_seconds <= 0:
        # time.sleep() rejects negative values, which would otherwise kill the
        # loop from outside its catch-all handler.
        print(f"Poll interval must be positive, got {poll_seconds}.", file=sys.stderr)
        sys.exit(1)

    last_ip_override = os.getenv("LAST_IP_OVERRIDE") or config.get("last_ip_override")
    if last_ip_override and not is_valid_ipv4(last_ip_override):
        print(
            f"Last IP override {last_ip_override!r} is not a valid IPv4 address.",
            file=sys.stderr,
        )
        sys.exit(1)
    last_ip_file = os.getenv("LAST_IP_FILE", DEFAULT_LAST_IP_FILE)
    tracker_record = os.getenv("TRACKER_RECORD") or config.get("tracker_record")

    session = requests.Session()

    # Look the tracker record up whenever one is configured - even when an
    # override supplies the IP - because its id is needed to update it later.
    tracker_record_id: Optional[str] = None
    tracker_ip: Optional[str] = None
    if tracker_record:
        try:
            tracker_record_id, tracker_ip = fetch_tracker_record(
                session, token, zone_id, tracker_record
            )
        except Exception as exc:
            print(
                f"Warning: could not read tracker record {tracker_record!r}: {exc}",
                file=sys.stderr,
            )

    # Determine starting last_ip:
    #   explicit override > Cloudflare TXT tracker > persisted file > None
    # The file is only read when neither of the first two yields an IP.
    last_ip: Optional[str] = (
        last_ip_override or tracker_ip or read_last_ip_file(last_ip_file)
    )

    print(f"Starting Cloudflare DDNS watcher (interval={poll_seconds}s)")
    print(f"Primary IP endpoint: {ipinfo_url} (with {len(IP_SOURCES) - 1} fallbacks)")
    if tracker_record:
        print(f"Tracker TXT record: {tracker_record} (id={tracker_record_id or 'not found — will create'})")
    else:
        print(f"Last IP file: {last_ip_file}")
    if last_ip_override:
        print(f"Starting with overridden last_ip: {last_ip_override}")
    elif last_ip:
        print(f"Resuming with last_ip: {last_ip}")

    try:
        while True:
            try:
                current_ip = get_reliable_ip(session, ipinfo_url)
                if current_ip is None:
                    print(
                        "Could not determine public IP from any source; skipping this cycle.",
                        file=sys.stderr,
                    )
                elif last_ip is None:
                    # First run with no prior state: adopt the current IP as
                    # the baseline without touching any A records.
                    last_ip = current_ip
                    write_last_ip_file(last_ip_file, last_ip)
                    tracker_note = ""
                    if tracker_record:
                        try:
                            tracker_record_id = _sync_tracker_record(
                                session, token, zone_id, tracker_record, last_ip, tracker_record_id
                            )
                            tracker_note = " (tracker record set)"
                        except Exception as exc:
                            print(f"Warning: could not set tracker record: {exc}", file=sys.stderr)
                    print(f"Initial IP: {current_ip}{tracker_note}")
                elif current_ip != last_ip:
                    print(f"IP changed: {last_ip} -> {current_ip}")
                    try:
                        records = fetch_records_with_ip(session, token, zone_id, last_ip)
                        if not records:
                            print(
                                f"No A records matched old IP {last_ip!r}; "
                                "they may have already been updated externally."
                            )
                        else:
                            for rec in records:
                                update_record_ip(session, token, zone_id, rec, current_ip)
                                print(f"Updated {rec['name']} to {current_ip}")
                    except Exception as api_err:
                        print(f"Cloudflare update failed: {api_err}", file=sys.stderr)
                        # Do NOT advance last_ip; retry on next cycle.
                    else:
                        # Advance last_ip regardless of whether any record
                        # matched - old IP is gone, no point searching for it
                        # again next cycle.
                        last_ip = current_ip
                        write_last_ip_file(last_ip_file, last_ip)
                        if tracker_record:
                            try:
                                tracker_record_id = _sync_tracker_record(
                                    session, token, zone_id, tracker_record, last_ip, tracker_record_id
                                )
                            except Exception as exc:
                                print(
                                    f"Warning: could not update tracker record: {exc}",
                                    file=sys.stderr,
                                )
                else:
                    print(f"No change. Current IP: {current_ip}")
            except Exception as exc:
                # Catch-all so a transient error never kills the process.
                print(f"Unexpected error in poll cycle: {exc}", file=sys.stderr)
            time.sleep(poll_seconds)
    except KeyboardInterrupt:
        print("Stopping watcher.")


if __name__ == "__main__":
    main()