349 lines
13 KiB
Python
349 lines
13 KiB
Python
"""
|
|
Simple Cloudflare dynamic DNS updater.
|
|
|
|
- Pulls current IPv4 from multiple sources and cross-validates.
|
|
- Every hour, checks for changes and updates any A records that had the old IP.
|
|
|
|
Config (config.ini by default):
|
|
[cloudflare]
|
|
api_token = <token>
|
|
zone_id = <zone id>
|
|
|
|
[service]
|
|
ipinfo_url = https://ipinfo.io/ip ; optional
|
|
poll_seconds = 3600 ; optional
|
|
tracker_record = _dyndns-last-ip ; TXT record name used as source of truth
|
|
; for the last IP this app set; create it
|
|
; manually in Cloudflare with the current A
|
|
; record IP before first run so the app knows
|
|
; what to look for.
|
|
|
|
Environment overrides (take precedence over config file):
|
|
CLOUDFLARE_API_TOKEN, CLOUDFLARE_ZONE_ID, IPINFO_URL, POLL_SECONDS, CONFIG_PATH,
|
|
LAST_IP_OVERRIDE, LAST_IP_FILE, TRACKER_RECORD
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import configparser
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
from typing import List, Optional
|
|
|
|
import requests
|
|
|
|
# Primary lookup endpoint used when neither IPINFO_URL nor the config file
# names one.
IPINFO_DEFAULT = "https://ipinfo.io/ip"

# Services queried for the public IPv4 address; their answers are compared
# against each other before an address is trusted.
IP_SOURCES = [
    "https://ipinfo.io/ip",
    "https://api.ipify.org",
    "https://checkip.amazonaws.com",
    "https://icanhazip.com",
]

# One hour between poll cycles unless overridden.
DEFAULT_POLL_SECONDS = 60 * 60

# Local file that persists the last IP between runs.
DEFAULT_LAST_IP_FILE = "last_ip.txt"

# Minimum agreeing sources needed to trust an IP result.
MIN_AGREEING_SOURCES = 2

# Dotted-quad shape check only; the 0-255 range is verified separately.
_IPV4_RE = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
|
|
|
|
|
|
def is_valid_ipv4(ip: str) -> bool:
    """Return True if *ip* is a strict dotted-quad IPv4 address.

    The string must be exactly four dot-separated groups of one to three
    ASCII decimal digits, each with a value in 0..255.  Nothing else is
    tolerated: surrounding whitespace, a trailing newline, signs and
    non-ASCII digits (all of which ``int()`` would otherwise parse) make the
    address invalid, so callers must strip their input first.
    """
    octets = ip.split(".")
    if len(octets) != 4:
        return False
    return all(
        1 <= len(octet) <= 3
        and octet.isascii()  # reject Unicode digits that isdigit()/int() accept
        and octet.isdigit()
        and int(octet) <= 255
        for octet in octets
    )
|
|
|
|
|
|
def read_last_ip_file(path: str) -> Optional[str]:
    """Return the IPv4 address persisted in *path*, or None.

    None is returned when the file does not exist, cannot be read, or does
    not contain a valid IPv4 address (surrounding whitespace is ignored).
    A missing file is the normal first-run case and stays silent; any other
    read failure is reported on stderr but never raised, mirroring the
    best-effort behaviour of write_last_ip_file.
    """
    try:
        # The context manager guarantees the handle is closed.
        with open(path) as handle:
            stored = handle.read().strip()
    except FileNotFoundError:
        return None
    except (OSError, UnicodeDecodeError) as exc:
        print(f"Warning: could not read last IP file {path!r}: {exc}", file=sys.stderr)
        return None
    return stored if is_valid_ipv4(stored) else None
|
|
|
|
|
|
def write_last_ip_file(path: str, ip: str) -> None:
    """Persist *ip* to *path*, replacing any previous content.

    The write is best-effort: an OSError is reported on stderr and swallowed
    so a read-only or missing directory never stops the watcher.
    """
    try:
        with open(path, "w") as out:
            out.write(ip)
    except OSError as exc:
        warning = f"Warning: could not write last IP file {path!r}: {exc}"
        print(warning, file=sys.stderr)
|
|
|
|
|
|
def load_config(path: str) -> dict:
    """Load settings from the INI file at *path*.

    Returns an empty dict when the file does not exist.  Otherwise returns a
    dict with the keys ``api_token``, ``zone_id``, ``ipinfo_url``,
    ``poll_seconds``, ``last_ip_override`` and ``tracker_record``; options
    that are missing or blank map to None, and ``poll_seconds`` is an int
    when present.

    Raises:
        RuntimeError: if ``poll_seconds`` is set but is not an integer.
    """
    if not os.path.exists(path):
        return {}

    # The documented config format annotates values with trailing
    # "; comment" text.  configparser only strips such comments when an
    # inline prefix is enabled explicitly; without it the comment becomes
    # part of the value (breaking URLs, record names and int parsing).
    parser = configparser.ConfigParser(inline_comment_prefixes=(";",))
    parser.read(path)

    def get_opt(section: str, key: str) -> Optional[str]:
        """Return the stripped option value, or None if missing or blank."""
        if not parser.has_option(section, key):
            return None
        value = parser.get(section, key).strip()
        return value or None

    cfg = {
        "api_token": get_opt("cloudflare", "api_token"),
        "zone_id": get_opt("cloudflare", "zone_id"),
        "ipinfo_url": get_opt("service", "ipinfo_url"),
        "poll_seconds": None,
        "last_ip_override": get_opt("service", "last_ip_override"),
        "tracker_record": get_opt("service", "tracker_record"),
    }

    poll_raw = get_opt("service", "poll_seconds")
    if poll_raw is not None:
        try:
            cfg["poll_seconds"] = int(poll_raw)
        except ValueError as exc:
            # Chain the cause so the offending value shows up in tracebacks.
            raise RuntimeError("poll_seconds must be an integer in config.ini") from exc

    return cfg
|
|
|
|
|
|
def get_public_ip(session: requests.Session, url: str) -> str:
    """Fetch the public IPv4 address reported by a single lookup service.

    Issues a GET to *url* (10 second timeout), lets ``raise_for_status``
    raise on HTTP error statuses, and returns the stripped response body.

    Raises:
        ValueError: if the body is not a valid IPv4 address.
    """
    response = session.get(url, timeout=10)
    response.raise_for_status()
    candidate = response.text.strip()
    if is_valid_ipv4(candidate):
        return candidate
    raise ValueError(f"Invalid IPv4 from {url!r}: {candidate!r}")
|
|
|
|
|
|
def get_reliable_ip(session: requests.Session, primary_url: str) -> Optional[str]:
    """
    Ask the IP lookup services in turn until enough of them agree.

    ``primary_url`` is queried first, followed by every other entry of
    IP_SOURCES.  The first address reported by MIN_AGREEING_SOURCES services
    is returned immediately.  A source that fails is logged to stderr and
    skipped.  If the threshold is never reached, the most-reported address is
    still returned after a warning on stderr (ties go to the address seen
    first).  None is returned only when no source yielded a valid address.
    """
    ordered_urls = [primary_url]
    ordered_urls.extend(candidate for candidate in IP_SOURCES if candidate != primary_url)

    tally: dict[str, int] = {}
    for source_url in ordered_urls:
        try:
            reported = get_public_ip(session, source_url)
        except Exception as exc:
            print(f" IP source {source_url!r} failed: {exc}", file=sys.stderr)
            continue
        tally[reported] = tally.get(reported, 0) + 1
        if tally[reported] >= MIN_AGREEING_SOURCES:
            return reported

    if not tally:
        return None

    # Fall back to majority if we couldn't reach the threshold.
    top_ip = max(tally, key=tally.get)
    print(
        f" Warning: could not get {MIN_AGREEING_SOURCES} agreeing sources "
        f"(got {tally}); using {top_ip!r} with {tally[top_ip]} vote(s).",
        file=sys.stderr,
    )
    return top_ip
|
|
|
|
|
|
def fetch_records_with_ip(
    session: requests.Session,
    token: str,
    zone_id: str,
    ip: str,
) -> List[dict]:
    """Return every A record in the zone whose content equals *ip*.

    Pages through the Cloudflare ``dns_records`` listing (100 records per
    request) until the page count reported in ``result_info`` is reached.

    Raises:
        RuntimeError: if a response body does not report ``success``.
        Whatever ``raise_for_status`` raises on an HTTP error status.
    """
    # URL and headers are the same for every page, so build them once.
    url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records"
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    records: List[dict] = []
    page = 1
    while True:
        params = {"type": "A", "page": page, "per_page": 100, "content": ip}
        resp = session.get(url, headers=headers, params=params, timeout=15)
        resp.raise_for_status()
        data = resp.json()
        if not data.get("success"):
            raise RuntimeError(f"Cloudflare API error: {data}")

        # "result" / "result_info" may be present but null; treat that the
        # same as an absent key instead of failing on None.
        records.extend(data.get("result") or [])
        total_pages = (data.get("result_info") or {}).get("total_pages") or 1
        if page >= total_pages:
            break
        page += 1
    return records
|
|
|
|
|
|
def fetch_tracker_record(
    session: requests.Session,
    token: str,
    zone_id: str,
    name: str,
) -> tuple[Optional[str], Optional[str]]:
    """Look up the TXT tracker record and return ``(record_id, ip_value)``.

    ``(None, None)`` is returned when no TXT record matches *name*.  When a
    record exists but its content is not a valid IPv4 address the id is
    still returned and the IP is None.  Only the first match is considered.

    Raises:
        RuntimeError: if the response body does not report ``success``.
    """
    # NOTE(review): *name* is passed to the API filter unchanged - confirm
    # whether Cloudflare expects the fully-qualified record name here.
    endpoint = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records"
    auth_headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    query = {"type": "TXT", "name": name}

    response = session.get(endpoint, headers=auth_headers, params=query, timeout=15)
    response.raise_for_status()
    payload = response.json()
    if not payload.get("success"):
        raise RuntimeError(f"Cloudflare API error fetching tracker record: {payload}")

    matches = payload.get("result", [])
    if not matches:
        return None, None

    first = matches[0]
    # TXT content is wrapped in quotes by Cloudflare.
    stored_ip = first["content"].strip('"').strip()
    record_id = first["id"]
    if is_valid_ipv4(stored_ip):
        return record_id, stored_ip
    return record_id, None
|
|
|
|
|
|
def upsert_tracker_record(
    session: requests.Session,
    token: str,
    zone_id: str,
    name: str,
    ip: str,
    record_id: Optional[str],
) -> Optional[str]:
    """Create or update the TXT tracker record with the given IP.

    With a *record_id* the existing record is overwritten (PUT); without one
    a new record is created (POST).  The TTL is always 60 seconds.

    Returns:
        The id of the record that was written, as reported by the API, so a
        caller can reuse it for later updates.  Falls back to *record_id*
        (possibly None) when the response carries no id.

    Raises:
        RuntimeError: if the response body does not report ``success``.
    """
    base_url = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records"
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    payload = {"type": "TXT", "name": name, "content": ip, "ttl": 60}

    if record_id:
        resp = session.put(f"{base_url}/{record_id}", headers=headers, json=payload, timeout=15)
    else:
        resp = session.post(base_url, headers=headers, json=payload, timeout=15)
    resp.raise_for_status()

    data = resp.json()
    if not data.get("success"):
        raise RuntimeError(f"Failed to upsert tracker record {name!r}: {data}")

    # "result" may be absent or null; only trust it when it is a mapping.
    result = data.get("result")
    written_id = result.get("id") if isinstance(result, dict) else None
    return written_id or record_id
|
|
|
|
|
|
def update_record_ip(
    session: requests.Session,
    token: str,
    zone_id: str,
    record: dict,
    new_ip: str,
) -> None:
    """Point an existing DNS record at *new_ip* with a Cloudflare PUT.

    *record* is a DNS record dict as returned by the listing endpoint; its
    ``id`` and ``name`` are required, while ``type``, ``ttl`` and ``proxied``
    default to ``"A"``, ``1`` and ``False`` when absent.

    Raises:
        RuntimeError: if the response body does not report ``success``.
    """
    # NOTE(review): only type/name/content/ttl/proxied are sent - confirm
    # whether the PUT resets other record attributes (e.g. comment, tags).
    endpoint = f"https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{record['id']}"
    record_name = record["name"]
    body = {
        "type": record.get("type", "A"),
        "name": record_name,
        "content": new_ip,
        "ttl": record.get("ttl", 1),
        "proxied": record.get("proxied", False),
    }
    auth_headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    response = session.put(endpoint, headers=auth_headers, json=body, timeout=15)
    response.raise_for_status()
    outcome = response.json()
    if outcome.get("success"):
        return
    raise RuntimeError(f"Failed to update record {record_name}: {outcome}")
|
|
|
|
|
|
def _resolve_poll_seconds(env_value: Optional[str], config_value: Optional[int]) -> int:
    """Pick the poll interval (env, then config, then default) and validate it.

    Exits with status 1 when the environment value is not an integer or the
    resulting interval is below one second: ``time.sleep`` rejects negative
    values and a zero interval would hammer the lookup services.
    """
    if env_value:
        try:
            seconds = int(env_value)
        except ValueError:
            print(f"POLL_SECONDS must be an integer, got {env_value!r}.", file=sys.stderr)
            sys.exit(1)
    else:
        seconds = config_value or DEFAULT_POLL_SECONDS
    if seconds < 1:
        print(f"Poll interval must be at least 1 second, got {seconds}.", file=sys.stderr)
        sys.exit(1)
    return seconds


def _sync_tracker_record(
    session: requests.Session,
    token: str,
    zone_id: str,
    name: str,
    ip: str,
    record_id: Optional[str],
) -> Optional[str]:
    """Write *ip* to the tracker TXT record and return the id to reuse next time.

    When no id is known yet the zone is searched first, so an existing record
    is updated rather than a duplicate created.  After a create the id is
    taken from the upsert call if it hands one back, otherwise looked up, so
    later IP changes update that same record.
    """
    if record_id is None:
        record_id, _ = fetch_tracker_record(session, token, zone_id, name)
    returned = upsert_tracker_record(session, token, zone_id, name, ip, record_id)
    if record_id is None:
        if isinstance(returned, str) and returned:
            record_id = returned
        else:
            try:
                record_id, _ = fetch_tracker_record(session, token, zone_id, name)
            except Exception as exc:
                # The record itself was written; only the id lookup failed.
                print(f"Warning: could not look up tracker record id: {exc}", file=sys.stderr)
    return record_id


def main() -> None:
    """Run the DDNS watcher until interrupted with Ctrl-C.

    Settings come from environment variables first, then the config file
    (CONFIG_PATH, default ``config.ini``).  The starting "last IP" is taken
    from, in order: the explicit override, the Cloudflare TXT tracker record,
    the persisted file.  Each cycle determines the public IP; on a change,
    every A record still holding the old IP is updated and the new IP is
    persisted to the file and, if configured, the tracker record.

    Exits with status 1 on missing credentials or invalid configuration.
    """
    config_path = os.getenv("CONFIG_PATH", "config.ini")
    try:
        config = load_config(config_path)
    except (RuntimeError, configparser.Error) as exc:
        print(f"Invalid configuration in {config_path!r}: {exc}", file=sys.stderr)
        sys.exit(1)

    token = os.getenv("CLOUDFLARE_API_TOKEN") or config.get("api_token")
    zone_id = os.getenv("CLOUDFLARE_ZONE_ID") or config.get("zone_id")
    if not token or not zone_id:
        print(
            "Missing Cloudflare credentials. Set env vars or fill config.ini.",
            file=sys.stderr,
        )
        sys.exit(1)

    ipinfo_url = os.getenv("IPINFO_URL") or config.get("ipinfo_url") or IPINFO_DEFAULT
    poll_seconds = _resolve_poll_seconds(os.getenv("POLL_SECONDS"), config.get("poll_seconds"))

    last_ip_override = os.getenv("LAST_IP_OVERRIDE") or config.get("last_ip_override")
    if last_ip_override:
        # An invalid override would silently match no records, so fail fast.
        last_ip_override = last_ip_override.strip()
        if not is_valid_ipv4(last_ip_override):
            print(
                f"Last IP override is not a valid IPv4 address: {last_ip_override!r}",
                file=sys.stderr,
            )
            sys.exit(1)
    last_ip_file = os.getenv("LAST_IP_FILE", DEFAULT_LAST_IP_FILE)
    tracker_record = os.getenv("TRACKER_RECORD") or config.get("tracker_record")

    session = requests.Session()

    # Determine starting last_ip:
    # explicit override > Cloudflare TXT tracker > persisted file > None
    tracker_record_id: Optional[str] = None
    last_ip: Optional[str] = None
    if last_ip_override:
        last_ip = last_ip_override
    else:
        if tracker_record:
            try:
                tracker_record_id, last_ip = fetch_tracker_record(
                    session, token, zone_id, tracker_record
                )
            except Exception as exc:
                print(
                    f"Warning: could not read tracker record {tracker_record!r}: {exc}",
                    file=sys.stderr,
                )
        if last_ip is None:
            # Tracker absent, unreadable or without a usable IP: use the file.
            last_ip = read_last_ip_file(last_ip_file)

    # The primary URL may or may not be one of the built-in sources.
    fallback_count = sum(1 for source in IP_SOURCES if source != ipinfo_url)

    print(f"Starting Cloudflare DDNS watcher (interval={poll_seconds}s)")
    print(f"Primary IP endpoint: {ipinfo_url} (with {fallback_count} fallbacks)")
    if tracker_record:
        print(f"Tracker TXT record: {tracker_record} (id={tracker_record_id or 'not found — will create'})")
    else:
        print(f"Last IP file: {last_ip_file}")
    if last_ip_override:
        print(f"Starting with overridden last_ip: {last_ip_override}")
    elif last_ip:
        print(f"Resuming with last_ip: {last_ip}")

    try:
        while True:
            try:
                current_ip = get_reliable_ip(session, ipinfo_url)

                if current_ip is None:
                    print(
                        "Could not determine public IP from any source; skipping this cycle.",
                        file=sys.stderr,
                    )
                elif last_ip is None:
                    # First run with no history: adopt the current IP as baseline.
                    last_ip = current_ip
                    write_last_ip_file(last_ip_file, last_ip)
                    tracker_note = ""
                    if tracker_record:
                        try:
                            tracker_record_id = _sync_tracker_record(
                                session, token, zone_id, tracker_record, last_ip, tracker_record_id
                            )
                            tracker_note = " (tracker record set)"
                        except Exception as exc:
                            print(f"Warning: could not set tracker record: {exc}", file=sys.stderr)
                    print(f"Initial IP: {current_ip}{tracker_note}")
                elif current_ip != last_ip:
                    print(f"IP changed: {last_ip} -> {current_ip}")
                    try:
                        records = fetch_records_with_ip(session, token, zone_id, last_ip)
                        if not records:
                            print(
                                f"No A records matched old IP {last_ip!r}; "
                                "they may have already been updated externally."
                            )
                        for rec in records:
                            update_record_ip(session, token, zone_id, rec, current_ip)
                            print(f"Updated {rec['name']} to {current_ip}")
                    except Exception as api_err:
                        print(f"Cloudflare update failed: {api_err}", file=sys.stderr)
                        # Do NOT advance last_ip; retry on next cycle.
                    else:
                        # Advance last_ip regardless of whether records matched -
                        # the old IP is gone, no point searching for it again.
                        last_ip = current_ip
                        write_last_ip_file(last_ip_file, last_ip)
                        if tracker_record:
                            try:
                                tracker_record_id = _sync_tracker_record(
                                    session, token, zone_id, tracker_record, last_ip, tracker_record_id
                                )
                            except Exception as exc:
                                print(f"Warning: could not update tracker record: {exc}", file=sys.stderr)
                else:
                    print(f"No change. Current IP: {current_ip}")

            except Exception as exc:
                # Catch-all so a transient error never kills the process.
                print(f"Unexpected error in poll cycle: {exc}", file=sys.stderr)

            time.sleep(poll_seconds)
    except KeyboardInterrupt:
        print("Stopping watcher.")
|
|
|
|
|
|
# Script entry point: start the watcher only when run directly, not on import.
if __name__ == "__main__":
    main()
|