# dag/rotate_airflow_logs_to_oci.py
"""Rotate Airflow log files daily and upload them to OCI Object Storage.

Rotation works copy-then-truncate: each log is copied to a dated sibling file,
the original is truncated in place (so Airflow keeps writing to the same
inode/open file handle), the dated copy is uploaded to the environment-specific
bucket, and the local copy is removed after a successful upload.
"""

import json
import os
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

# --- Configuration -----------------------------------------------------------

# Deployment environment; anything unrecognized falls back to "tst".
MRDS_ENV = (os.getenv("MRDS_ENV") or "tst").strip().lower()
if MRDS_ENV not in {"dev", "tst", "acc", "prd"}:
    MRDS_ENV = "tst"

# Tenancy Object Storage namespace (overridable via BUCKET_NAMESPACE).
NAMESPACE = (os.getenv("BUCKET_NAMESPACE") or "frcnomajoc7v").strip()

# Target bucket per environment (USED — see _resolve_bucket).
DEFAULT_BUCKETS = {
    "dev": "mrds_airflow_logs_dev",
    "tst": "mrds_airflow_logs_tst",
    "acc": "mrds_airflow_logs_acc",
    "prd": "mrds_airflow_logs_prd",
}

# Airflow log locations.
LOG_DIR = Path("/opt/airflow/logs")
STDOUT_LOG = LOG_DIR / "stdout.log"
DAG_PM_DIR = LOG_DIR / "dag_processor_manager"
DAG_PM_LOG = DAG_PM_DIR / "dag_processor_manager.log"

# Object key prefix in the bucket ("folder"); slashes are stripped so keys
# never start with "/".
OBJECT_PREFIX = os.getenv("LOGS_OBJECT_PREFIX", "AIRFLOW_STD_OUT_LOG_ROTATE").strip("/")


# --- Helpers -----------------------------------------------------------------

def pp(tag: str, obj) -> None:
    """Pretty-print *obj* as sorted, indented JSON under *tag*.

    Falls back to ``repr()`` when *obj* is not JSON-serializable, so logging
    never raises.
    """
    try:
        print(f"[{tag}] {json.dumps(obj, indent=2, sort_keys=True)}", flush=True)
    except Exception:
        print(f"[{tag}] {obj!r}", flush=True)


def _oci_client():
    """Create an OCI Object Storage client.

    Credential order: Resource Principals -> Instance Principals.

    Returns:
        oci.object_storage.ObjectStorageClient

    Raises:
        RuntimeError: if neither credential source is available.
    """
    import oci  # local import: the SDK is only needed inside the worker task

    region = (
        os.getenv("OCI_REGION")
        or os.getenv("OCI_RESOURCE_PRINCIPAL_REGION")
        or "eu-frankfurt-1"
    )

    # Resource Principals (e.g. OKE / Functions workload identity)
    try:
        rp_signer = oci.auth.signers.get_resource_principals_signer()
        cfg = {"region": region}
        pp("oci.auth", {"mode": "resource_principals", "region": region})
        return oci.object_storage.ObjectStorageClient(cfg, signer=rp_signer)
    except Exception as e:
        pp("oci.auth.rp_unavailable", str(e))

    # Instance Principals (compute instance metadata service)
    try:
        ip_signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
        cfg = {"region": region}
        pp("oci.auth", {"mode": "instance_principals", "region": region})
        return oci.object_storage.ObjectStorageClient(cfg, signer=ip_signer)
    except Exception as e:
        pp("oci.auth.ip_unavailable", str(e))

    raise RuntimeError("No OCI credentials found (Resource Principals or Instance Principals).")


def _resolve_bucket() -> str:
    """Always use the bucket mapped from DEFAULT_BUCKETS based on MRDS_ENV."""
    return DEFAULT_BUCKETS[MRDS_ENV]


def _resolve_namespace(client) -> str:
    """Use the configured NAMESPACE if set; otherwise discover it via the API.

    NOTE(review): NAMESPACE defaults to a non-empty constant above, so the
    discovery branch only runs if BUCKET_NAMESPACE is explicitly set to an
    empty/whitespace value — confirm whether that is intended.
    """
    if NAMESPACE:
        return NAMESPACE
    ns = client.get_namespace().data
    pp("oci.namespace.discovered", ns)
    return ns


def _rotate(src_path: Path, rotated_path: Path) -> bool:
    """Copy the current log to a dated file and truncate the original.

    Returns:
        True if a non-empty rotated file was created, else False (missing or
        empty source logs are skipped).
    """
    if not src_path.exists():
        pp("rotate.skip", {"reason": "missing", "path": str(src_path)})
        return False

    # Read as text first; fall back to a lossy byte decode for logs with
    # broken encodings.
    try:
        content = src_path.read_text(encoding="utf-8", errors="ignore")
    except Exception:
        content = src_path.read_bytes().decode("utf-8", errors="ignore")

    if not content.strip():
        pp("rotate.skip", {"reason": "empty", "path": str(src_path)})
        return False

    rotated_path.write_text(content, encoding="utf-8", errors="ignore")
    # Truncate the original in place so the writer keeps the same file handle.
    # NOTE(review): lines written between the copy and this truncate are lost —
    # acceptable for best-effort log shipping, but confirm.
    src_path.write_text("", encoding="utf-8")
    pp("rotate.done", {
        "src": str(src_path),
        "rotated": str(rotated_path),
        "bytes": len(content.encode("utf-8")),
    })
    return True


def _upload(client, namespace: str, bucket: str, object_name: str, file_path: Path) -> None:
    """Upload *file_path* to the bucket, retrying up to 3 times.

    Backs off linearly (1.5s, 3s) between attempts; re-raises the last error
    so the Airflow task fails visibly.
    """
    import oci  # noqa: F401  (kept for parity with _oci_client; client comes from caller)

    for attempt in range(1, 4):
        try:
            with file_path.open("rb") as fh:
                resp = client.put_object(namespace, bucket, object_name, fh)
            pp("oci.put_object.ok", {
                "bucket": bucket,
                "name": object_name,
                "status": getattr(resp, "status", None),
            })
            return
        except Exception as e:
            pp("oci.put_object.error", {
                "attempt": attempt,
                "bucket": bucket,
                "name": object_name,
                "error": str(e),
            })
            if attempt < 3:
                time.sleep(1.5 * attempt)
            else:
                raise


def _process_one(client, namespace: str, bucket: str, date_str: str, src: Path, base_name: str):
    """Rotate one log file, upload the dated copy, and remove it locally."""
    rotated = src.with_name(f"{src.name}.{date_str}")
    if not _rotate(src, rotated):
        return

    # Build object key (no leading slash): <prefix>/<date>/<base_name>
    obj = f"{OBJECT_PREFIX}/{date_str}/{base_name}"
    _upload(client, namespace, bucket, obj, rotated)

    # Best-effort local cleanup; failure to delete is logged, not fatal.
    try:
        rotated.unlink(missing_ok=True)
        pp("cleanup.rotated_removed", str(rotated))
    except Exception as e:
        pp("cleanup.rotated_remove_failed", {"path": str(rotated), "error": str(e)})


def rotate_and_upload_main():
    """Task entry point: rotate and upload stdout + dag-processor-manager logs.

    Raises:
        Exception: re-raised after logging so the Airflow task is marked failed.
    """
    try:
        client = _oci_client()
        namespace = _resolve_namespace(client)
        bucket = _resolve_bucket()

        # Compute "yesterday" at runtime (UTC) so it's always current.
        # datetime.now(timezone.utc) replaces the deprecated datetime.utcnow();
        # strftime output is identical.
        date_str = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d")

        pp("uploader.config", {
            "env": MRDS_ENV,
            "namespace": namespace,
            "bucket": bucket,
            "object_prefix": OBJECT_PREFIX,
            "date": date_str,
            "log_dir": str(LOG_DIR),
        })

        # Small delay to ensure logs are flushed before rotation.
        time.sleep(3)

        _process_one(client, namespace, bucket, date_str, STDOUT_LOG, "stdout.log")
        _process_one(client, namespace, bucket, date_str, DAG_PM_LOG, "dag_processor_manager.log")

        pp("uploader.done", {"env": MRDS_ENV})
    except Exception as e:
        pp("uploader.failed", str(e))
        raise


# --- DAG definition ----------------------------------------------------------

default_args = {
    "owner": "data-eng",
    "retries": 1,
    "retry_delay": pendulum.duration(minutes=5),
}

with DAG(
    dag_id="rotate_airflow_logs_to_oci",
    description="Rotate Airflow logs and upload to OCI Object Storage daily.",
    # Timezone-aware start_date per Airflow best practice (was a naive datetime).
    start_date=pendulum.datetime(2025, 10, 21, tz="UTC"),
    schedule_interval="0 0 * * *",  # runs daily right after 00:00 UTC
    catchup=False,
    default_args=default_args,
    tags=["logs", "oci", "maintenance"],
) as dag:
    rotate_and_upload = PythonOperator(
        task_id="rotate_and_upload_logs",
        python_callable=rotate_and_upload_main,
    )