Introduction to Cloud Composer Workflow Orchestration
Cloud Composer is the GCP-managed flavour of Apache Airflow, and it is the default answer on the PDE exam whenever a question describes "scheduled, multi-step, cross-service" data pipelines. Instead of stringing Cloud Functions together with Pub/Sub topics or scripting cron jobs on a Compute Engine VM, you write a Python DAG once and Composer handles scheduling, retries, lineage, and observability across BigQuery, Dataflow, Dataproc, GKE, Cloud Storage, and external systems.
This study note walks through everything PDE candidates need to recognise: the runtime architecture, DAG mechanics, the operator catalogue (BigQueryInsertJobOperator, DataflowStartFlexTemplateOperator, GKEStartPodOperator), XCom, sensors, environment sizing (small / medium / large), the Composer 2 vs Composer 3 split, IAM, Secret Manager integration, and how to monitor a DAG before it silently rots.
Plain English Explanation
The Wedding Planner Who Never Sleeps
A Cloud Composer environment behaves like a wedding planner running a hundred weddings on the same Saturday. Each wedding has the same skeleton: book the venue, send invitations, hire the caterer, confirm the band, run the ceremony. The planner does not bake the cake or play the violin. They only watch the clipboard, tick boxes, call vendors, and panic gracefully when the florist is late. Airflow is that planner. The cake is BigQuery. The band is Dataflow. The florist is your flaky third-party API. Composer is the agency that hired the planner and gave them an office (GKE), a filing cabinet (Cloud SQL), and a fax machine to the outside world (Cloud Storage DAG bucket).
A Recipe Card vs the Actual Cooking
A DAG file is a recipe card, not the meal. The recipe says "preheat oven, mix batter, bake 30 minutes, frost when cool." It does not contain flour. Beginners often confuse the two and stuff heavy pandas transformations directly into the DAG file, which is like writing the entire cookbook on the back of one index card. The recipe should only describe order and timing. The cooking — the actual data crunching — belongs in Dataflow, BigQuery, or a container that GKEStartPodOperator launches. Airflow tasks should hand work off, then wait politely.
The Train Station Departures Board
Airflow's scheduler is the departures board at a train station. Trains (DAG runs) leave on a schedule (schedule_interval). Some trains depend on the previous train arriving (the depends_on_past flag). Sensors are the platform attendants who refuse to let the train depart until the inbound passengers (an upstream file in GCS) have arrived. XCom is the small parcel one conductor hands the next conductor on a connecting train: a job ID, a row count, a file path. You never ship a freight container of data through XCom. If the parcel does not fit in a coat pocket, push it to GCS and pass the path instead.
Core Concepts of Cloud Composer Workflow Orchestration
A handful of vocabulary words unlock most of the exam questions.
A DAG (Directed Acyclic Graph) is a Python module that defines tasks and dependencies. The Airflow scheduler parses every .py file in the DAG bucket on a regular cadence (default 30 seconds), so files must be cheap to import. Top-level code runs on every parse, so an innocent pd.read_csv() at module scope will hammer your scheduler.
An Operator is a template for one unit of work. BashOperator runs a shell command, PythonOperator runs a callable, BigQueryInsertJobOperator submits a SQL job, DataflowStartFlexTemplateOperator launches a Dataflow Flex Template, and GKEStartPodOperator schedules a Kubernetes Pod on a GKE cluster of your choosing. Each instantiation of an operator is a Task.
A Task Instance is one execution of one task for one DAG run. The matrix of (DAG, run date, task id, try number) uniquely identifies it. The Airflow UI's grid view is essentially a pivot of this matrix.
XCom ("cross-communication") is the lightweight key-value channel for passing return values between tasks. Default backend is the metadata DB, capped around 48 KB on Composer; for anything larger configure a custom XCom backend that writes to GCS.
A Sensor is a special operator that polls until a condition becomes true: a file lands in GCS (GCSObjectExistenceSensor), a BigQuery partition is non-empty (BigQueryTablePartitionExistenceSensor), or a Pub/Sub message arrives. Always run sensors in mode="reschedule" for long waits — mode="poke" holds a worker slot hostage.
A Pool is a concurrency budget. Tag tasks that hit a fragile API with the same pool to throttle parallelism without rewriting code.
DAG: a Python definition of tasks and their dependencies, with no cycles, that Airflow's scheduler turns into runs at a configured interval. See Airflow DAGs.
Architecture and Design Patterns
A Composer environment is not a single VM. It is a small constellation of managed services that you should be able to sketch on a whiteboard during the exam.
The scheduler, web server, triggerer, and workers all run as pods on a GKE cluster. In Composer 1 this cluster lived in your project. In Composer 2 it is a GKE Autopilot cluster that still lives in your project, while the Cloud SQL metadata database sits in a Google-managed tenant project. In Composer 3 (GA in 2024) even the GKE substrate is invisible; the environment runs on Google's serverless platform and you simply pick CPU, memory, and storage sliders. Composer 3 also bills per-second instead of per-hour and starts in roughly five minutes versus twenty-five for Composer 2.
The metadata database is a Cloud SQL for PostgreSQL instance. It stores DAG state, task instance history, XCom values, variables, and connections. You do not get superuser access. Backups are automated.
The DAG bucket is a Cloud Storage bucket that Composer mounts into every worker via Cloud Storage FUSE. Drop a .py file in gs://<env>/dags/, and within one parse cycle the scheduler picks it up. The same bucket holds plugins/, data/, and logs/ prefixes.
A common architecture pattern is the fan-out / fan-in DAG: a single sensor waits for an upstream signal, fans out to dozens of partition-level processing tasks executed in parallel by a TaskGroup, then fans back into a single quality check and a notification task. The exam loves this shape because it maps cleanly to "process yesterday's regional sales files in parallel, then aggregate."
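The fan-out / fan-in shape can be sketched without Airflow as a plain dependency map, which also shows what "acyclic" buys you. This is a minimal sketch using the standard library; the task ids and region names are hypothetical, and in a real DAG file the same edges would be declared between operators inside a TaskGroup.

```python
from graphlib import TopologicalSorter


def fan_out_fan_in(regions):
    """Return {task_id: set(upstream_task_ids)} for a fan-out / fan-in shape."""
    deps = {"wait_for_signal": set()}              # the single upstream sensor
    process_ids = [f"process_{region}" for region in regions]
    for task_id in process_ids:
        deps[task_id] = {"wait_for_signal"}        # fan out from the sensor
    deps["quality_check"] = set(process_ids)       # fan back in
    deps["notify"] = {"quality_check"}
    return deps


# Hypothetical regions, for illustration only.
deps = fan_out_fan_in(["emea", "apac", "amer"])

# static_order() raises graphlib.CycleError if the graph contains a cycle,
# which is the property the "A" in DAG guarantees.
order = list(TopologicalSorter(deps).static_order())
print(order)
```

The sensor always comes first and the notification last; the three processing tasks sit between them and have no ordering among themselves, which is what lets the scheduler run them in parallel.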
Another pattern is the branching DAG using BranchPythonOperator: based on the output of an upstream task, choose which downstream task to execute. Useful for "if validation passes, load to prod; otherwise route to quarantine."
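A branch decision is just a Python callable that returns the task_id to follow. The sketch below assumes a row-count validation against a baseline; the function name, task ids, and the 5% tolerance are illustrative assumptions, not part of any Airflow API.

```python
def choose_next_task(row_count: int, baseline: float, tolerance: float = 0.05) -> str:
    """Return the task_id to follow; the branch operator skips the other path."""
    if baseline <= 0:
        # No trustworthy baseline: fail safe and quarantine.
        return "route_to_quarantine"
    deviation = abs(row_count - baseline) / baseline
    return "load_to_prod" if deviation <= tolerance else "route_to_quarantine"


# In a DAG file this callable would be handed to the branch operator, e.g.
# BranchPythonOperator(task_id="branch", python_callable=...), with the row
# count pulled from an upstream task's XCom.
print(choose_next_task(1030, 1000))
print(choose_next_task(900, 1000))
```

Keeping the decision in a pure function like this makes it unit-testable outside the scheduler.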
Composer 1 reached end of full support in September 2024 and end of all support in September 2025. New environments must use Composer 2 or Composer 3. Migration is one-way and requires a fresh environment plus DAG re-deployment. Reference: Composer versioning overview.
GCP Service Deep Dive
Operator catalogue you will actually be tested on
BigQueryInsertJobOperator is the modern way to run any BigQuery job — query, load, extract, copy. The legacy BigQueryOperator and BigQueryExecuteQueryOperator are deprecated; if you see them in exam answer choices alongside BigQueryInsertJobOperator, the latter wins. Configuration is a dict matching the BigQuery REST API, which means parameterised queries, destination tables, write dispositions, and labels all flow through naturally.
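Because the configuration is a plain dict in the shape of the BigQuery REST API's job configuration, it can be built and inspected without Airflow. The sketch below is one possible query job; the project, dataset, table, and label names are hypothetical.

```python
def query_job_configuration(run_date: str) -> dict:
    """Build a BigQuery query-job configuration dict (REST API field names)."""
    return {
        "query": {
            # Hypothetical source table; @run_date is a named query parameter.
            "query": (
                "SELECT * FROM `my-project.staging.transactions` "
                "WHERE txn_date = @run_date"
            ),
            "useLegacySql": False,
            "parameterMode": "NAMED",
            "queryParameters": [
                {
                    "name": "run_date",
                    "parameterType": {"type": "DATE"},
                    "parameterValue": {"value": run_date},
                }
            ],
            # Hypothetical destination table.
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "analytics",
                "tableId": "daily_transactions",
            },
            "writeDisposition": "WRITE_TRUNCATE",
        },
        "labels": {"pipeline": "daily-transactions"},
    }


# In a DAG file the dict would be passed as
# BigQueryInsertJobOperator(task_id="load_daily", configuration=config).
config = query_job_configuration("2026-05-01")
print(config["query"]["writeDisposition"])
```

Note that labels sit at the top level of the configuration, beside the query block, not inside it.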
DataflowStartFlexTemplateOperator and DataflowStartSqlJobOperator launch Dataflow jobs. Always pair these with DataflowJobStatusSensor if a downstream task depends on the job's success — the start operator returns immediately after submission, and skipping the sensor produces a race condition where downstream tasks read empty tables.
GKEStartPodOperator is the escape hatch for everything Airflow does not have a first-class operator for. It launches a Pod on either the Composer GKE cluster (Composer 2) or a separate GKE cluster you specify (recommended). Use it for custom Python code that needs heavy CPU, GPUs, or libraries you do not want to bake into the Airflow image. Pair it with Workload Identity so the Pod authenticates as a dedicated service account rather than the Composer worker SA.
KubernetesPodOperator is the older sibling that runs against the in-cluster Kubernetes API. On Composer 2 it lands on the Composer GKE cluster's composer-user-workloads namespace. On Composer 3 it has been replaced; use GKEStartPodOperator against an external cluster instead.
CloudDataFusionStartPipelineOperator, DataprocSubmitJobOperator, DataprocCreateBatchOperator (for Dataproc Serverless), and BeamRunPythonPipelineOperator round out the catalogue. The exam will frequently test whether you know which operator pairs with which managed service — Dataproc Serverless uses the Batch family, not the cluster family.
XCom, sensors, and the small-payload rule
XCom is for handles, not haystacks. A BigQuery job ID, a file path, a row count, a model version — fine. A 50 MB JSON blob — never. The metadata DB is a single PostgreSQL instance and bloating it with large XComs degrades scheduler performance for every other DAG in the environment. When in doubt, write to GCS and pass the URI.
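One way to enforce the handles-not-haystacks rule is a small guard that decides what a task returns. This is a sketch under stated assumptions: the 4 KB limit is a hypothetical team convention, and the upload callable stands in for whatever GCS client code you already use.

```python
import json
from typing import Callable

MAX_XCOM_BYTES = 4 * 1024  # hypothetical team limit, in the spirit of "a few KB"


def to_xcom_value(
    payload: dict,
    uri: str,
    upload: Callable[[str, bytes], None],
    max_bytes: int = MAX_XCOM_BYTES,
) -> dict:
    """Return the payload inline when small, otherwise upload it and return its URI."""
    encoded = json.dumps(payload).encode("utf-8")
    if len(encoded) <= max_bytes:
        return {"value": payload}
    upload(uri, encoded)          # caller supplies the real GCS write
    return {"gcs_uri": uri}


# Demo with an in-memory stand-in for the bucket (hypothetical URI).
fake_bucket = {}
handle = to_xcom_value({"rows": "x" * 10_000}, "gs://my-bucket/tmp/result.json",
                       fake_bucket.__setitem__)
print(handle)
```

The downstream task then checks for the gcs_uri key and reads the object itself, so the metadata database only ever stores the short handle.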
Sensors come in two modes. poke (default) holds a worker slot for the entire wait, which is fine for short waits but catastrophic for waits over a few minutes. reschedule releases the slot between pokes and is preferred for any wait beyond five minutes. Smart sensors, an earlier attempt at batching sensor work, were deprecated in Airflow 2.2.4 and removed in 2.4; the modern replacement is deferrable operators backed by the triggerer component, which can wait for thousands of conditions on a single async process. Composer 2 and 3 both ship with the triggerer enabled.
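The cost difference between poke and reschedule is easy to see with rough arithmetic. The model below is a deliberate simplification (it ignores scheduler overhead and assumes each check takes a fixed time); the function and its parameters are hypothetical, not an Airflow API.

```python
import math


def slot_seconds(wait_s: int, poke_interval_s: int, poke_cost_s: float, mode: str) -> float:
    """Estimate how long a sensor occupies a worker slot while waiting."""
    if mode == "poke":
        return float(wait_s)                  # slot is held for the whole wait
    if mode == "reschedule":
        # One check at t=0, then one per interval until the condition is met.
        pokes = math.ceil(wait_s / poke_interval_s) + 1
        return pokes * poke_cost_s            # slot is held only while checking
    raise ValueError(f"unknown mode: {mode}")


# A one-hour wait, checking every five minutes, two seconds per check.
print(slot_seconds(3600, 300, 2, "poke"))        # 3600.0
print(slot_seconds(3600, 300, 2, "reschedule"))  # 26.0
```

Under these assumptions a one-hour wait costs an hour of slot time in poke mode and well under a minute in reschedule mode.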
Scheduling that actually works
schedule_interval (superseded by the schedule argument from Airflow 2.4 onward) accepts cron strings, timedelta objects, or Airflow presets like @daily. A subtle point that trips engineers: a DAG with schedule_interval="@daily" and start_date=datetime(2026, 5, 1) does not run at midnight on May 1. It runs at midnight on May 2, processing the data for May 1. Airflow always triggers a run at the end of its data interval. The data_interval_start and data_interval_end macros, not the legacy execution_date, are the correct way to reason about this in Airflow 2.x.
Use catchup=False on most production DAGs to prevent Airflow from back-filling every missed interval after a deployment. Use max_active_runs=1 for DAGs whose tasks are not idempotent across overlapping runs.
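The end-of-interval rule and the effect of catchup can be worked through with plain datetime arithmetic. This is a simplified model of a brand-new DAG with a fixed interval, not the scheduler's actual implementation; the function name is hypothetical.

```python
from datetime import datetime, timedelta


def due_intervals(start_date, now, interval=timedelta(days=1), catchup=True):
    """List (data_interval_start, data_interval_end) pairs whose end has passed.

    Each run is triggered at its data_interval_end, never at its start.
    """
    intervals = []
    begin = start_date
    while begin + interval <= now:
        intervals.append((begin, begin + interval))
        begin += interval
    if not catchup:
        return intervals[-1:]   # only the most recent completed interval
    return intervals


start = datetime(2026, 5, 1)
# At noon on May 1 nothing is due yet: the first interval has not ended.
print(due_intervals(start, datetime(2026, 5, 1, 12)))
# Deployed on May 5 with catchup on, four back-filled runs are due.
print(len(due_intervals(start, datetime(2026, 5, 5, 6))))
# With catchup off, only the May 4 interval is scheduled.
print(due_intervals(start, datetime(2026, 5, 5, 6), catchup=False))
```

The May 1 interval first becomes due at 00:00 on May 2, which matches the behaviour described above.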
Composer environments: small, medium, large
Composer 2 and 3 both expose three preset sizes plus a custom slider.
Small is roughly 0.5 vCPU per scheduler, 0.5 vCPU per worker, and a small Cloud SQL — enough for a few hundred task instances per day. Suitable for personal projects and dev environments.
Medium doubles those resources and is the typical production starting point for a team running tens of DAGs.
Large is for environments with hundreds of DAGs and thousands of concurrent task instances; it provisions a beefier Cloud SQL and more scheduler replicas. If you outgrow large, split into multiple environments grouped by business domain rather than scaling endlessly.
You can also pick custom in Composer 3 and dial each component independently — useful when you have many small DAGs (scheduler-bound) versus few heavy DAGs (worker-bound).
Worker autoscaling on Composer 2 and 3 scales between min_workers and max_workers based on the queued task count. If your DAGs run in bursts (every hour on the hour), set min_workers low and max_workers generously — you will pay only for the spikes. See Cloud Composer overview.
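To build intuition for bursty workloads, the clamp between the minimum and maximum worker counts can be modelled roughly as follows. This is not Composer's actual autoscaling algorithm; it is a simplified illustration, and the worker_concurrency value of 12 is an assumed setting.

```python
import math


def target_workers(queued_tasks: int, running_tasks: int, worker_concurrency: int,
                   min_workers: int, max_workers: int) -> int:
    """Simplified model: workers needed for the task backlog, clamped to the bounds."""
    needed = math.ceil((queued_tasks + running_tasks) / worker_concurrency)
    return max(min_workers, min(max_workers, needed))


# Idle period: stays at the floor.
print(target_workers(0, 0, 12, min_workers=1, max_workers=12))      # 1
# Top-of-the-hour burst: scales with the backlog.
print(target_workers(100, 20, 12, min_workers=1, max_workers=12))   # 10
# Oversized burst: capped at the ceiling, the rest stays queued.
print(target_workers(500, 0, 12, min_workers=1, max_workers=12))    # 12
```

The third case is the one to watch: once the ceiling is hit, extra tasks simply wait in the queue, so the maximum should be sized for the largest burst you want to absorb.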
Composer 2 vs Composer 3 — the version question
Expect at least one PDE question forcing you to choose between Composer 2 and Composer 3. The honest summary:
| Dimension | Composer 2 | Composer 3 |
|---|---|---|
| Substrate | GKE Autopilot cluster in your project | Fully serverless |
| Environment creation | ~25 minutes | ~5 minutes |
| Billing granularity | Per-hour | Per-second |
| Networking | VPC-native, Private IP supported | VPC-native, simpler private setup |
| CMEK (customer-managed encryption keys) | Yes | Yes |
| Airflow versions | Airflow 2.x | Airflow 2.x (latest patch faster) |
| When to choose | Existing environments, KubernetesPodOperator on the in-cluster namespace | New environments, faster spin-up, lower idle cost |
For greenfield deployments in 2026, Composer 3 is the default recommendation. Migrate Composer 2 environments opportunistically — there is no auto-upgrade path, you provision a new Composer 3 environment and re-deploy DAGs.
IAM and Secret Manager
Two identities matter. The Cloud Composer Service Agent is a Google-managed account that Composer uses to manage the underlying GKE / Cloud SQL / GCS resources during environment creation and updates. The environment's service account (called the worker service account in the rest of this note) is the identity your workers and tasks run as when they call BigQuery, Dataflow, GCS, etc.; it needs roles/composer.worker. Best practice: do not reuse the default Compute Engine service account. Create a dedicated SA, grant it only the roles its DAGs need (roles/bigquery.dataEditor, roles/dataflow.developer, etc.), and attach it at environment creation time.
For per-DAG isolation, use GKEStartPodOperator with Workload Identity so each Pod runs as its own SA — the Composer worker SA never sees the credentials.
Airflow Connections and Variables can be backed by Secret Manager instead of the metadata DB. In the Airflow configuration overrides, set the backend option of the [secrets] section to airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend. With the backend's default prefixes, Connections are read from secrets named airflow-connections-<conn_id> and Variables from secrets named airflow-variables-<var_name>. This gives you audit logs, rotation, and IAM-controlled access for every credential, instead of plaintext rows in PostgreSQL.
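The naming convention is mechanical, so a small helper can generate the secret names you need to create before a DAG is deployed. The sketch assumes the default prefixes and the default "-" separator; the helper itself and the example ids are hypothetical.

```python
# Default prefixes described above; both can be overridden in the backend config.
PREFIXES = {
    "connection": "airflow-connections",
    "variable": "airflow-variables",
}


def secret_name(kind: str, key: str, sep: str = "-") -> str:
    """Return the Secret Manager secret id the backend would look up."""
    return f"{PREFIXES[kind]}{sep}{key}"


# Hypothetical connection id and variable name.
print(secret_name("connection", "netsuite_api"))   # airflow-connections-netsuite_api
print(secret_name("variable", "partner_bucket"))   # airflow-variables-partner_bucket
```

Generating the names this way makes it straightforward to grant roles/secretmanager.secretAccessor on exactly those secrets rather than project-wide.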
PDE answer choices that mention storing DAG credentials in Airflow Variables, the metadata Cloud SQL, or .py source files are wrong by design. The expected pattern is CloudSecretManagerBackend configured under [secrets] backend, a dedicated worker service account (never the default Compute Engine SA), and roles/secretmanager.secretAccessor granted on the specific secret — not project-wide. For per-DAG isolation against a separate GKE cluster, use GKEStartPodOperator with Workload Identity so each Pod authenticates as its own SA. Reference: Composer access control.
The worker service account inherits whatever permissions you grant it across the entire project. A DAG that "only" reads BigQuery can still delete Cloud Storage buckets if the SA has roles/storage.admin. Practice least privilege, and consider one Composer environment per data domain so the blast radius of an over-permissioned SA is bounded. Reference: Composer access control.
Monitoring DAGs before they rot
Composer pipes scheduler, worker, and DAG logs to Cloud Logging automatically. Each task instance log is also browsable from the Airflow UI grid view, but Logging is the source of truth for alerting.
The exam-relevant metrics live under the composer.googleapis.com/environment/ and composer.googleapis.com/workflow/ namespaces. The handful you should recognise:
- workflow/run_count: DAG run completions, partitioned by status (success / failed)
- workflow/run_duration: how long DAG runs take end-to-end
- task_instance/run_duration: per-task latency
- environment/database/cpu_utilization: Cloud SQL pressure, the leading indicator of scheduler slowness
- environment/worker/pod_eviction_count: worker OOM and preemption signal
Build a Cloud Monitoring alerting policy on workflow/run_count filtered to state=failed for any DAG tagged tier=critical, and route to PagerDuty via a notification channel. For SLA monitoring, set the sla parameter on individual tasks; missed SLAs surface in the Airflow UI and emit a Cloud Logging entry you can alert on.
Common Pitfalls and Trade-offs
The single most common Composer outage is scheduler overload caused by expensive top-level DAG code. Every parse cycle re-imports every .py file in the bucket. If your DAG file calls bigquery.Client().query("SELECT ...") at module scope to dynamically generate tasks, you have just turned every parse into a billable BigQuery job and a 10-second hang. Move dynamic generation behind a cached function or pre-generate a JSON manifest in CI.
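The manifest approach might look like the sketch below: the DAG file reads a small JSON file produced in CI instead of querying BigQuery at import time. The manifest layout, file name, and task-id scheme are assumptions for illustration, and the cache only helps within a single parser process.

```python
import json
import tempfile
from functools import lru_cache
from pathlib import Path


@lru_cache(maxsize=1)
def load_manifest(path: str) -> tuple:
    """Read the pre-generated manifest once per process: a local file read, not a query."""
    return tuple(json.loads(Path(path).read_text())["countries"])


def task_ids(path: str) -> list:
    """Derive one task id per manifest entry; cheap enough to run at parse time."""
    return [f"transform_{code.lower()}" for code in load_manifest(path)]


# Demo: write a manifest the way a CI step might (hypothetical layout), then read it.
with tempfile.TemporaryDirectory() as tmp:
    manifest = Path(tmp) / "countries.json"
    manifest.write_text(json.dumps({"countries": ["DE", "FR", "NL"]}))
    generated = task_ids(str(manifest))

print(generated)  # ['transform_de', 'transform_fr', 'transform_nl']
```

In a Composer environment the manifest would typically sit next to the DAG file in the bucket, so every parse costs a small file read rather than a billable query.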
The second most common is XCom abuse. A DAG that pushes a 5 MB DataFrame to XCom will work fine in dev with one DAG, then poison the metadata DB in prod with a hundred. Cap XCom payloads at a few KB and use GCS for anything larger.
A third pitfall is the start_date confusion. Setting start_date=datetime.now() produces non-deterministic behaviour because every parse evaluates now() again. Always pin start_date to a fixed past date.
Trade-offs worth thinking about:
- Composer vs Cloud Workflows. If your orchestration is pure GCP API calls with no Python logic and no need for the Airflow ecosystem, Cloud Workflows is cheaper and simpler. Composer earns its keep when you need the operator catalogue, custom Python, plugins, or cross-cloud connections.
- Composer vs Cloud Scheduler + Cloud Functions. For a single scheduled trigger, Scheduler + Functions is fine. For anything with dependencies, retries, backfills, or observability, Composer wins on day one and dominates by month six.
- One big environment vs many small ones. Many small environments give blast-radius isolation and per-team cost attribution but multiply the management surface. One big environment is cheaper per DAG but a single bad DAG can starve everyone.
Best Practices
- Pin every start_date to a fixed datetime literal, never datetime.now() or days_ago().
- Set catchup=False by default; opt in to backfilling consciously.
- Use TaskGroup (Airflow 2.x) instead of the deprecated SubDagOperator.
- Move all secrets to Secret Manager via the CloudSecretManagerBackend.
- Run sensors in reschedule mode or, better, use deferrable operators backed by the triggerer.
- Tag DAGs with owner, tier, and domain so monitoring policies can target subsets.
- Keep top-level DAG file imports under one second; profile with airflow dags list-import-errors and airflow dags show.
- Use a dedicated worker service account per environment, never the default Compute Engine SA.
Real-World Use Case
A mid-sized European fintech with about 200 engineers runs a daily revenue-recognition pipeline on Composer 3. The pipeline starts at 02:00 UTC with a GCSObjectExistenceSensor waiting for the previous day's transaction extract from their core banking partner — a flat-file dump that lands somewhere between 02:00 and 03:30 depending on partner-side load. The sensor runs in reschedule mode so it does not hold a worker slot during the hour-long wait.
Once the file lands, a TaskGroup fans out into seventeen parallel DataflowStartFlexTemplateOperator tasks, one per country, each transforming the local-currency rows into the canonical schema. A DataflowJobStatusSensor follows each Dataflow task to confirm completion. Then a BigQueryInsertJobOperator runs the consolidation MERGE into the revenue_recognized partitioned table, followed by a BigQueryCheckOperator that asserts row counts are within 5% of a seven-day moving average. If the check fails, a BranchPythonOperator routes to a quarantine path that pages the on-call engineer; if it passes, a final GKEStartPodOperator runs a custom Python container that pushes journal entries to NetSuite via REST.
The environment is Composer 3 medium, costing roughly 800 USD per month. Worker autoscaling sits at one minimum worker for most of the day and spikes to twelve workers during the 02:00 fan-out window. All credentials (NetSuite API key, partner SFTP password) live in Secret Manager. Cloud Monitoring alerts on workflow/run_count{state=failed} for any DAG tagged tier=critical, with a 15-minute pager threshold. The team migrated from Composer 2 in early 2026 and reports environment creation dropping from 28 minutes to 4 minutes, which materially improved their disaster-recovery rehearsals.
Exam Tips
PDE questions about Cloud Composer cluster around a predictable handful of decisions. Memorise the patterns:
- "Schedule a multi-step pipeline with retries and dependencies across BigQuery, Dataflow, and GCS" → Cloud Composer.
- "Trigger a single Cloud Function on a cron schedule" → Cloud Scheduler, not Composer.
- "Orchestrate GCP API calls with no custom Python and minimal cost" → Cloud Workflows.
- "Migrate from Composer 1" → Provision new Composer 2 or Composer 3 environment, redeploy DAGs. There is no in-place upgrade.
- "Pass a 100 MB intermediate result between two tasks" → Write to GCS, pass the URI through XCom. Never the data itself.
- "Wait for a file to land in GCS before processing" → GCSObjectExistenceSensor in reschedule mode, or the deferrable variant.
- "Run a custom Python container with GPU" → GKEStartPodOperator against a separate GKE cluster with Workload Identity.
- "Store database passwords used by DAGs" → Secret Manager backend, not Airflow Variables.
- "DAG runs at midnight but processes yesterday's data — when does the run actually trigger?" → At the end of the interval, so 00:00 of the following day.
Composer scheduling rule. Airflow runs a DAG at the end of its interval, not the start. A @daily DAG with start_date=2026-05-01 first runs at 00:00 on 2026-05-02, with data_interval_start=2026-05-01 and data_interval_end=2026-05-02. Reference: Airflow DAGs.
Frequently Asked Questions (FAQ)
When should I pick Cloud Composer over Cloud Workflows?
Pick Composer when you need the Airflow ecosystem — operators, sensors, plugins, custom Python, backfilling, the rich UI, and connections to non-GCP systems. Pick Workflows when your orchestration is a sequence of GCP API calls with simple branching, no Python, and you want to pay per execution rather than for an always-on environment. A useful heuristic: if the team already knows Airflow or the pipeline has more than ten tasks with cross-service dependencies, Composer earns its keep.
What is the right way to pass data between Airflow tasks on Composer?
For small handles — job IDs, file paths, row counts, model versions — use XCom. The default backend stores XComs in the metadata Cloud SQL database, capped at roughly 48 KB per value on Composer. For anything larger, write the payload to Cloud Storage from the upstream task and push only the GCS URI through XCom; the downstream task reads from GCS. Bloating XCom with large payloads degrades scheduler performance for every DAG in the environment.
How do I migrate from Composer 1 or Composer 2 to Composer 3?
There is no in-place upgrade between major versions. The migration pattern is: provision a new Composer 3 environment alongside the existing one, copy DAG files to the new bucket, recreate Variables and Connections (ideally via the Secret Manager backend so this step is one-time), test in parallel, then cut traffic over by pausing DAGs in the old environment and unpausing in the new. Composer 1 is out of support entirely as of September 2025; treat that migration as urgent rather than optional.
Why does my DAG fail with "DAG not found" right after I upload it?
The Airflow scheduler parses the DAG bucket on a cadence (default 30 seconds) and the file goes through Cloud Storage FUSE before the scheduler sees it. Wait one or two parse cycles. If the DAG still does not appear, run airflow dags list-import-errors from the Composer CLI — usually a syntax error, a missing import, or a top-level call that times out is preventing the file from importing cleanly.
How should I store secrets used by my DAGs?
Use Secret Manager via the Airflow secrets backend. Set the configuration override [secrets] backend = airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend. Connections become airflow-connections-<conn_id> secrets and Variables become airflow-variables-<var_name> secrets. This gives you IAM-controlled access, audit logs, and rotation, instead of plaintext rows in the metadata database. The Composer worker service account needs roles/secretmanager.secretAccessor on the relevant secrets.
Can I run Composer in a private network with no public IPs?
Yes. Composer 2 and Composer 3 both support private IP environments, where the GKE workloads, Cloud SQL, and the Airflow web server are reachable only through your VPC. Composer 3 simplifies this further by exposing the web server through a Google-managed private endpoint without requiring a separate Cloud NAT for outbound traffic to public package registries (PyPI mirrors are configured separately). Plan IP ranges carefully — the GKE pod and service ranges are non-trivial to change after creation.
Related Topics
- Dataflow Architecture Selection: most Composer DAGs end up launching Dataflow jobs; understanding template types directly shapes your operator choice.
- BigQuery Data Modeling and Clustering: partitioning strategy determines whether your BigQueryInsertJobOperator MERGEs scan gigabytes or terabytes.
- Batch vs Streaming Design: Composer is a batch-first tool; knowing when to switch to a streaming runtime saves you from forcing every problem into a DAG.
Further Reading
- Cloud Composer overview — official architecture, components, and version comparison.
- Composer versioning overview — version support windows, end-of-life dates, and migration guidance.
- Apache Airflow DAGs — upstream documentation for DAG mechanics, scheduling, and task dependencies.