examlab.net · The most efficient path to the most valuable certifications.

Advanced Dataflow: Windowing and Watermarks

4,180 words · ≈ 21 min read

Master Dataflow windowing, watermarks, triggers, allowed lateness, and late data handling for the GCP Professional Data Engineer exam with practical patterns.


Introduction to Dataflow Windowing, Watermarks and Late Data

Streaming pipelines on Google Cloud rarely fail because the code is wrong. They fail because the engineer treated time as a single number. Dataflow windowing, watermarks, and late data are the three primitives that force you to think about time the way reality actually behaves: messy, out of order, and occasionally hours behind. This note walks through every knob the Apache Beam model gives you on the Dataflow runner, with the kind of scenarios the PDE exam loves to set as traps.

By the end of this guide you should be able to pick a window type without hesitation, explain why your watermark is stuck, and decide whether to drain or cancel a job at 3 a.m. without paging anyone.

Plain English Explanation

Think of Windowing Like a Restaurant Splitting the Day Into Shifts

A restaurant does not count revenue per individual customer. It counts per shift: breakfast, lunch, dinner. That grouping is a fixed window. If the manager wants a rolling "how busy were we in the last two hours, refreshed every thirty minutes" report, that is a sliding window. If the question is "how long did this single table stay and what did they order," the boundary is defined by the customer's own activity, which makes it a session window. Dataflow lets you pick whichever shift definition matches the question you are asking, and you can even apply several shift definitions to the same data at once.

Think of Watermarks Like the Last Postcard From a Slow Mail Route

Imagine you run a mailroom and postcards keep arriving stamped with the date they were written. You want to close the books on "all postcards written before noon today" — but the postal service is slow and unreliable. The watermark is your best guess of "I am now confident that everything written before time T has arrived." It is not a guarantee. It is a heuristic the system advances based on how the data is actually flowing in. When the watermark passes the end of a window, that window is ready to fire. Postcards that show up after that point are late, and you decide whether to reopen the books or just toss them.

Think of Triggers Like a Pizza Oven Timer

The window says "this batch of dough is for the 7 p.m. pizza." The trigger says "but I want a sneak peek every five minutes anyway, plus the final result when the timer dings, plus a corrected result if someone slides in extra toppings ten minutes late." Early triggers give you speculative results. The on-time trigger fires when the watermark crosses the window end. Late triggers handle stragglers within your allowed lateness. Accumulation mode decides whether each firing shows the new toppings only (DISCARDING) or the entire pizza so far (ACCUMULATING).

Think of Late Data Like Receipts Found in an Old Jacket

You already filed last quarter's expenses. Then you find a crumpled receipt in a coat pocket. Allowed lateness is your company policy on how long you will accept old receipts before refusing to amend the books. Side outputs for late data are the drawer where you stash the ones you refuse to file but still want to keep around for forensic review.

Core Concepts of Dataflow Windowing, Watermarks and Late Data

Beam treats every element in a streaming pipeline as a tuple of value, event time, window, and pane info. Every concept below is a way of deciding when to compute aggregates over groups of those tuples.

Event time: the timestamp of when the event actually happened in the real world, attached to the element by the source. For Pub/Sub it can be the publish time or a custom attribute extracted with withTimestampAttribute.

Processing time: the wall clock time on the worker when the element is being processed. This is what naive systems use, and it is almost always wrong for analytics on real-world events.

Watermark: a monotonically advancing estimate of "no more event-time data older than this should arrive." Each PCollection has its own watermark; the runner propagates it through transforms.

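The four window types you must know for the PDE exam are fixed, sliding, session, and global. Each answers a different shape of question, but they all share the same default trigger, which fires when the watermark passes the end of the window. For the global window that point never arrives in an unbounded stream, which is why it needs an explicit trigger in streaming.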

A fixed window of one minute partitions the timeline into non-overlapping one-minute buckets. A sliding window of five minutes with a one-minute period produces overlapping five-minute windows, a new one starting every minute. A session window has no fixed duration; it is computed per key, and a key's elements stay in the same session as long as consecutive elements are less than the configured gap duration apart. The global window holds everything in a single window forever, which is fine for batch but requires a non-default trigger to be useful in streaming; Beam rejects a GroupByKey on an unbounded PCollection in the global window that still uses the default trigger.
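To make the fixed and sliding assignment rules concrete, here is a minimal plain-Java model (not Beam's own code) that computes which windows a single timestamp lands in. It assumes epoch-millisecond timestamps and zero window offsets; the class and method names are hypothetical. The sliding case walks back from the latest window start that is not after the timestamp, which is where the size/period multiplier comes from.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Beam's implementation) of event-time window assignment.
// Timestamps and durations are epoch milliseconds; window offsets are assumed to be zero.
final class WindowAssignmentModel {

    // Fixed windows: each timestamp falls in exactly one window [start, start + size).
    static long fixedWindowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size); // round down to a multiple of size
    }

    // Sliding windows: every window [start, start + size) whose start is a multiple
    // of period and which contains ts. Returned newest-first.
    static List<Long> slidingWindowStarts(long ts, long size, long period) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, period); // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }
}
```

For a 5-minute window with a 1-minute period, slidingWindowStarts(90_000, 300_000, 60_000) returns five starts (60 s, 0, -60 s, -120 s, -180 s), one per overlapping window, while fixedWindowStart always yields exactly one window.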

The default trigger in streaming is AfterWatermark.pastEndOfWindow() with no allowed lateness and DISCARDING mode. If you do not configure anything else, late data is silently dropped. This is the single most common cause of "my pipeline lost data" tickets. See: https://beam.apache.org/documentation/programming-guide/#triggers

Architecture and Design Patterns

A streaming Dataflow pipeline almost always has the same skeleton: a source that assigns event timestamps and emits a watermark, one or more Window.into(...) transforms that group elements by time, a GroupByKey or Combine that produces aggregates per window per key, and a sink that writes either per-pane outputs or final results.

The architectural choice that bites people is where to apply the window. If you window before a join, both sides need compatible windowing or Beam will reject the pipeline. If you window after a stateful transform, the timestamps may have been advanced and you will lose the tight grouping you wanted. The typical pattern is: read from Pub/Sub with a custom timestamp attribute, immediately window into the analysis grain, then perform Combine.perKey and write to BigQuery using the streaming inserts API or to Bigtable for low-latency lookups.

For dashboards that need both freshness and correctness, the lambda-style answer is two pipelines: one streaming with early triggers feeding a low-latency table, one batch reprocessing the same data daily and overwriting a "canonical" table. The kappa-style answer, which Dataflow makes practical, is one streaming pipeline with accumulatingFiredPanes() and a meaningfully large allowed lateness, writing to a BigQuery table partitioned by event time so corrections naturally land in the right partition.

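When the sink is BigQuery, write the pane info (PaneInfo.getTiming(), PaneInfo.getIndex()) into the table as columns. Downstream queries can then exclude speculative early panes with WHERE pane_timing != 'EARLY' when accuracy matters, or keep only the highest pane_index per window to read the latest correction; filtering to pane_timing = 'ON_TIME' alone would also throw away late corrections. See: https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/windowing/PaneInfo.html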

GCP Service Deep Dive: Dataflow Runner Behavior

Dataflow is the managed runner for Apache Beam on Google Cloud, and several of its windowing behaviors are runner-specific and worth memorizing.

Watermark Sources

For Pub/Sub, the watermark is computed by the Pub/Sub I/O connector from either the publish timestamp or the custom timestamp attribute you provided to PubsubIO.readMessagesWithAttributes().withTimestampAttribute("event_ts"). The watermark is tied to the oldest unacknowledged message in the subscription backlog: it advances as old messages are acknowledged, and a message that stays unacknowledged holds it back. Be aware that if you supply a custom timestamp attribute and a single old message gets stuck, the watermark will refuse to move past it for the entire pipeline. In production this shows up as windows that never fire.

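For Kafka via KafkaIO, the watermark is tracked per partition and the runner takes the minimum across all partitions. An idle partition will hold the watermark back unless you use a timestamp policy that handles idleness, such as withCreateTime(maxDelay), which advances an idle partition's watermark to the current time minus maxDelay. The older withWatermarkFn hook is deprecated in favor of withTimestampPolicyFactory.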
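The minimum-across-partitions rule is easy to see in a toy model. The sketch below is plain Java, loosely following the limited-delay idea behind withCreateTime(maxDelay): an active partition's watermark is its largest event time (capped at the wall clock) minus maxDelay, and an idle partition can optionally be advanced to now minus maxDelay. PartitionState, idleAdvance, and the method names are hypothetical, and KafkaIO's real timestamp policies also keep each watermark monotonic, which this model skips.

```java
import java.util.Map;

// Plain-Java model of per-partition watermarks combined by taking the minimum.
// Not KafkaIO code; all names here are hypothetical.
final class PartitionWatermarkModel {

    // What a reader might track for one partition.
    static final class PartitionState {
        final long maxEventTs; // largest event timestamp seen so far (epoch millis)
        final boolean idle;    // true if the partition currently has no pending records

        PartitionState(long maxEventTs, boolean idle) {
            this.maxEventTs = maxEventTs;
            this.idle = idle;
        }
    }

    static long partitionWatermark(PartitionState p, long now, long maxDelay, boolean idleAdvance) {
        if (p.idle && idleAdvance) {
            return now - maxDelay;                     // idle partition stops holding things back
        }
        return Math.min(now, p.maxEventTs) - maxDelay; // never ahead of the wall clock
    }

    // The source watermark is the minimum over all partitions.
    static long sourceWatermark(Map<Integer, PartitionState> partitions, long now,
                                long maxDelay, boolean idleAdvance) {
        long wm = Long.MAX_VALUE;
        for (PartitionState p : partitions.values()) {
            wm = Math.min(wm, partitionWatermark(p, now, maxDelay, idleAdvance));
        }
        return wm;
    }
}
```

With one active partition whose latest event is at 1,000,000 ms and one idle partition whose last event was at 100,000 ms (maxDelay of 60,000 ms), the source watermark stays pinned at 40,000 ms until idle handling is enabled, at which point it moves to 940,000 ms.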

For files (TextIO.read().watchForNewFiles(...)), the watermark advances based on file modification times.

Triggers in Practice

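import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Assumes events is a PCollection<String> (for example page IDs) whose elements already
// carry event-time timestamps; Count.perElement() then yields KV<String, Long>.
PCollection<KV<String, Long>> hits = events
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(30)))
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardMinutes(30))
        .accumulatingFiredPanes())
    .apply(Count.perElement());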

This snippet says: bucket events into 5-minute windows by event time; while a window is still open, emit a speculative pane 30 seconds of processing time after the first element of each new pane; emit the on-time pane when the watermark passes the end of the window; after that, emit a corrected pane for each late element that arrives before the watermark passes the window end plus 30 minutes; and, because of accumulatingFiredPanes(), make each pane carry the cumulative count rather than just the delta.
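The allowed-lateness part of that description boils down to one comparison against the watermark. The plain-Java sketch below models it under the assumption, taken from Beam's documented behavior, that a window's state is discarded once the watermark passes the window's last timestamp plus the allowed lateness, after which new elements for that window are dropped. The Fate enum and classify method are hypothetical names, and millisecond edge cases are simplified.

```java
// Plain-Java model (not Beam code) of what happens to an element for window [start, windowEnd)
// given the input watermark at the moment the element arrives. Times are epoch millis.
final class LatenessModel {

    enum Fate { ON_TIME_OR_EARLY, LATE_ACCEPTED, DROPPED }

    static Fate classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1; // last millisecond inside the half-open window
        if (maxTimestamp + allowedLateness < watermark) {
            return Fate.DROPPED;           // window state already garbage-collected
        }
        if (maxTimestamp < watermark) {
            return Fate.LATE_ACCEPTED;     // past the window end, but within allowed lateness
        }
        return Fate.ON_TIME_OR_EARLY;      // window has not reached its on-time firing yet
    }
}
```

Treating 12:00 as time zero, the snippet's window [12:00, 12:05) with 30 minutes of lateness classifies an element arriving at a watermark of 12:10 as LATE_ACCEPTED (it re-fires the window), and one arriving at 12:40 as DROPPED. With the default of zero lateness, an element arriving at 12:06 is already dropped.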

Window + trigger cheat sheet for the PDE exam:
  • Fixed (FixedWindows.of): non-overlapping and clock-aligned; the natural choice for "per hour / per minute" aggregates.
  • Sliding (SlidingWindows.of(...).every(...)): overlapping; each element lands in size/period windows, so a 5-min size with a 1-min period puts each element in 5 windows.
  • Session (Sessions.withGapDuration): per-key, gap-based, no fixed length.
  • Trigger composition: AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(...)).withLateFirings(...), paired with .withAllowedLateness(Duration) and either .accumulatingFiredPanes() (full aggregate per pane; use with MERGE/upsert sinks) or .discardingFiredPanes() (delta only; use with append-only or increment-based sinks such as Bigtable counters).
See: https://beam.apache.org/documentation/programming-guide/#windowing

Accumulation Modes

DISCARDING mode means each pane contains only the elements that arrived since the previous firing. Downstream consumers must add the deltas to get a running total. This is cheaper because each pane is smaller, and it is the right choice when writing to systems that natively support increments, such as Bigtable counters.

ACCUMULATING mode means each pane contains the full aggregate so far. Downstream consumers should overwrite previous values. This is the right choice for BigQuery tables that you MERGE or upsert on a window key, or for dashboards that re-display the latest pane.

Mixing ACCUMULATING with a sink that does append-only inserts produces double-counted rows. Every pane re-emits everything, so window 12:00–12:05 might land in BigQuery five times: speculative count 100, 250, 400, on-time count 500, late count 510. If you SUM those, you get 1760 instead of 510. Either use DISCARDING with append, or use ACCUMULATING with MERGE keyed on (window_start, window_end, group_key). See: https://beam.apache.org/documentation/programming-guide/#window-accumulation-modes
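The arithmetic above is easy to reproduce. This plain-Java sketch (hypothetical class and method names, not a Beam or BigQuery API) feeds the same five pane values into three sink behaviors: appending accumulating panes and summing them, converting them to DISCARDING-style deltas first, and upserting so only the newest pane per window survives. Keying the upsert on the pane index rather than on arrival order is a design assumption that protects against panes landing out of order.

```java
// Plain-Java model of what different sinks end up with for ONE window's panes.
final class AccumulationModel {

    // ACCUMULATING panes -> DISCARDING-style deltas (each pane minus the previous pane).
    static long[] toDeltas(long[] accumulatingPanes) {
        long[] deltas = new long[accumulatingPanes.length];
        long previous = 0;
        for (int i = 0; i < accumulatingPanes.length; i++) {
            deltas[i] = accumulatingPanes[i] - previous;
            previous = accumulatingPanes[i];
        }
        return deltas;
    }

    // Append-only sink followed by SUM: correct for deltas, wrong for accumulating panes.
    static long appendThenSum(long[] panes) {
        long total = 0;
        for (long pane : panes) {
            total += pane;
        }
        return total;
    }

    // Upsert/MERGE sink keyed on the window: each entry is {paneIndex, value}; keep the
    // value with the highest pane index so an out-of-order pane cannot overwrite a newer one.
    static long upsertLatest(long[][] indexedPanes) {
        long bestIndex = -1;
        long bestValue = 0;
        for (long[] pane : indexedPanes) {
            if (pane[0] > bestIndex) {
                bestIndex = pane[0];
                bestValue = pane[1];
            }
        }
        return bestValue;
    }
}
```

appendThenSum over the accumulating panes {100, 250, 400, 500, 510} gives 1760, while the deltas {100, 150, 150, 100, 10} sum to 510, and the upsert also keeps 510 even when pane 4 arrives before pane 3.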

Side Outputs for Late Data

Beam does not have a built-in "late data side output" for windowing: elements either fall within allowed lateness and re-fire the window, or they are dropped. One way to capture elements before they are dropped is to insert a DoFn before the window that compares each element's timestamp (c.timestamp()) against an estimate of the current watermark, such as processing time minus the expected skew, because a DoFn cannot read the watermark directly; elements that look too old are tagged with a TupleTag and routed elsewhere. A more robust approach is to widen the allowed lateness and explicitly route late panes to a "late audit" table by checking PaneInfo.getTiming() == LATE in a transform downstream of the windowed aggregation.

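// windowed: output of a windowed aggregation such as Count.perKey(); pane timing is only meaningful there.
final TupleTag<KV<String, Long>> MAIN_TAG = new TupleTag<KV<String, Long>>() {};
final TupleTag<KV<String, Long>> LATE_TAG = new TupleTag<KV<String, Long>>() {};

PCollectionTuple split = windowed.apply(ParDo.of(new DoFn<KV<String, Long>, KV<String, Long>>() {
    @ProcessElement
    public void process(ProcessContext c) {
        if (c.pane().getTiming() == PaneInfo.Timing.LATE) { // late panes go to the audit output
            c.output(LATE_TAG, c.element());
        } else {
            c.output(c.element());
        }
    }
}).withOutputTags(MAIN_TAG, TupleTagList.of(LATE_TAG)));

// BigQueryIO.writeTableRows() consumes TableRow; the column names here are illustrative.
SerializableFunction<KV<String, Long>, TableRow> toRow =
    kv -> new TableRow().set("group_key", kv.getKey()).set("value", kv.getValue());

split.get(MAIN_TAG)
    .apply(MapElements.into(TypeDescriptor.of(TableRow.class)).via(toRow))
    .apply(BigQueryIO.writeTableRows()...);
split.get(LATE_TAG)
    .apply(MapElements.into(TypeDescriptor.of(TableRow.class)).via(toRow))
    .apply(BigQueryIO.writeTableRows().to("project:ds.late_audit"));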

Drain vs Cancel

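When you stop a streaming Dataflow job you have two choices. Cancel terminates immediately. In-flight bundles are abandoned, and data the job had already acknowledged from Pub/Sub but not yet written out, including anything buffered in window state, is lost; only messages that were never acknowledged are redelivered to the next job reading the same subscription. This is fast but unsafe for stateful pipelines because timers and state are discarded.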

Drain stops accepting new input from sources, advances the watermark to infinity so all windows fire their final panes, finishes processing in-flight data, and only then shuts down. Drain is the correct choice when retiring a job you intend to replace, because every open aggregation emits a final result instead of being thrown away. The trade-off is that windows still open at drain time close early, so their final panes may be incomplete. If the replacement job must pick up that in-flight state instead, use --update on the running job, or take a Dataflow snapshot of it and start the new job from the snapshot.

You cannot resume a drained job. Drain is one-way: the pipeline is finalized and the watermark is advanced past everything. If you need to change code while keeping in-flight state, use --update to swap the running job in place instead. See: https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline

Common Pitfalls and Trade-offs

The Pub/Sub stuck watermark is the number-one production issue. It happens when you set a custom timestamp attribute and a single old message sits unacknowledged. The subscription's oldest unacked timestamp drives the watermark, which then refuses to advance, which then prevents any window from closing. Symptoms are healthy throughput metrics but no output landing in BigQuery, and a data_watermark_age metric in Cloud Monitoring that keeps growing. Resolution: route the poison message to a dead-letter output so it is acknowledged and skipped, and make sure producers set the timestamp attribute close to the real event time rather than stamping messages with arbitrarily old values.
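A toy model shows why one stuck message freezes everything. The plain-Java sketch below assumes, as this note describes, that the source watermark equals the oldest event timestamp still unacknowledged; Dataflow's real Pub/Sub watermark estimate is more involved, and BacklogWatermarkModel and its methods are hypothetical. Acknowledging the poison message, which is what dead-letter routing achieves, is the only thing that lets the watermark move.

```java
import java.util.TreeMap;

// Plain-Java model: the source watermark tracks the oldest event timestamp still unacknowledged.
final class BacklogWatermarkModel {
    private final TreeMap<Long, Integer> unacked = new TreeMap<>(); // event ts -> message count

    void receive(long eventTs) {
        unacked.merge(eventTs, 1, Integer::sum);
    }

    // Called when processing succeeds, or when a poison message is routed to a dead-letter output.
    void ack(long eventTs) {
        unacked.computeIfPresent(eventTs, (ts, n) -> n == 1 ? null : n - 1); // null removes the key
    }

    // Oldest unacked timestamp; if the backlog is empty, assume the watermark catches up to now.
    long watermark(long now) {
        return unacked.isEmpty() ? now : unacked.firstKey();
    }
}
```

After receiving a poison message stamped 1,000 ms plus two healthy messages that are acknowledged, watermark(700_000) still returns 1,000; once the poison message is acknowledged through the dead-letter path, it returns 700,000.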

Session windows can be expensive on hot keys. If a single user has continuous activity, the session window for that key never closes and state accumulates indefinitely. Use a Combine.perKey with a custom merging accumulator rather than GroupByKey to keep memory bounded, and consider a maximum session length cap implemented via processing-time triggers.
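The reason a combining accumulator keeps session memory bounded is that merging two sessions only merges two small summaries. The plain-Java sketch below models session windows for a single key the way this note describes them: each element opens [ts, ts + gap), overlapping windows merge, and every session carries a (start, end, count) summary instead of its raw elements, in the spirit of a CombineFn's mergeAccumulators. SessionModel and Session are hypothetical names, and the model assumes all of a key's timestamps are available up front so it can sort them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of session windows for ONE key (not Beam code).
final class SessionModel {

    // Constant-size accumulator per session instead of a buffer of elements.
    static final class Session {
        long start;
        long end;
        long count;

        Session(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    static List<Session> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<Session> result = new ArrayList<>();
        for (long ts : sorted) {
            Session last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts < last.end) {
                // [ts, ts + gap) overlaps the current session: merge the accumulators.
                last.end = Math.max(last.end, ts + gap);
                last.count += 1;
            } else {
                result.add(new Session(ts, ts + gap, 1));
            }
        }
        return result;
    }
}
```

With a 10-minute gap (600,000 ms), events at 0, 60,000, 120,000, 1,000,000 and 1,100,000 ms form two sessions: [0, 720,000) with 3 events and [1,000,000, 1,700,000) with 2. Two events exactly one gap apart stay in separate sessions, because touching windows do not overlap.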

Sliding windows multiply your data. A five-minute sliding window with a one-minute period assigns each element to five windows. Storage and CPU scale linearly with the multiplier, so a thirty-minute sliding window with a one-minute period is thirty times the work of a non-overlapping fixed window. The PDE exam loves to slip a "we want a one-hour rolling average updated every second" requirement into a question; the correct answer is usually "do not use sliding windows for that, use a stateful DoFn with a queue."
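For the "one-hour rolling average updated every second" case, the per-key logic the stateful-DoFn answer implies can be modeled as a queue plus a running sum. The plain-Java class below is a sketch of what such a DoFn might keep in its state, not Beam code; RollingAverage is a hypothetical name, and it assumes readings arrive in event-time order, so a real pipeline would need buffering or a sorted structure to cope with out-of-order data. Each element is stored once, instead of being copied into 3,600 overlapping sliding windows.

```java
import java.util.ArrayDeque;

// Plain-Java model of a per-key rolling average over a fixed horizon (epoch millis).
final class RollingAverage {
    private final long horizonMillis;
    private final ArrayDeque<long[]> readings = new ArrayDeque<>(); // {eventTs, value}, oldest first
    private long sum = 0;

    RollingAverage(long horizonMillis) {
        this.horizonMillis = horizonMillis;
    }

    // Assumes readings are added in event-time order.
    void add(long eventTs, long value) {
        readings.addLast(new long[] {eventTs, value});
        sum += value;
    }

    // Evict readings at or before now - horizon, then average what remains in (now - horizon, now].
    double averageAt(long now) {
        while (!readings.isEmpty() && readings.peekFirst()[0] <= now - horizonMillis) {
            sum -= readings.pollFirst()[1];
        }
        return readings.isEmpty() ? 0.0 : (double) sum / readings.size();
    }
}
```

With a one-hour horizon and readings of 10, 20 and 30 at 0, 30 and 50 minutes, averageAt(50 min) returns 20.0; at 60 minutes the first reading is evicted and the average becomes 25.0. Each refresh costs only the evictions since the previous call.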

Allowed lateness consumes worker memory because state for each window is retained until the watermark passes window-end plus allowed lateness. A 24-hour allowed lateness on per-user session windows can require gigabytes of state per worker. Choose allowed lateness based on actual measured event-time skew at the 99th percentile, not on a "just in case" round number.
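A rough way to size that retention cost for fixed and sliding windows: each window's state lives from the window's start until the watermark passes its end plus the allowed lateness, so at any moment roughly (size + allowed lateness) / period windows per key are alive. The helper below is a back-of-envelope approximation under that assumption, not a Dataflow formula; the class name is hypothetical, and it ignores session windows and data running ahead of the watermark.

```java
// Back-of-envelope estimate of concurrently retained windows per key (fixed or sliding windows).
final class StateRetentionEstimate {

    // windowSize, period and allowedLateness in the same unit; for fixed windows period == windowSize.
    static long liveWindowsPerKey(long windowSize, long period, long allowedLateness) {
        // Window starts spaced `period` apart that fall within a span of size + lateness (ceiling).
        return (windowSize + allowedLateness + period - 1) / period;
    }
}
```

By this estimate, 5-minute fixed windows keep about 1 window per key alive with zero lateness, about 7 with 30 minutes of lateness, and about 289 with 24 hours. A 5-minute sliding window with a 30-second period and 10 minutes of lateness keeps about 30 windows per key, and raising that lateness to 30 minutes pushes it to about 70.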

Setting withAllowedLateness(Duration.ZERO) behaves the same as the default, and it has a consequence that is easy to miss: with zero lateness, late elements are dropped before they reach the trigger, so any withLateFirings you add can never fire. Beam also rejects a custom trigger on non-global windows unless allowed lateness is set explicitly with withAllowedLateness. Always set allowed lateness explicitly to the value you want and document why. See: https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/windowing/Window.html

Best Practices

  • Always assign event timestamps at the source. Do not rely on processing time for anything that will be analyzed by humans later.
  • Measure event-time skew in production for two weeks before picking allowed lateness. Use the data_watermark_age and system_lag metrics in Cloud Monitoring.
  • Prefer Combine.perKey over GroupByKey for windowed aggregations. The combiner runs partial aggregates on the worker before the shuffle and dramatically reduces network traffic.
  • For BigQuery sinks, partition the output table by event time, not by ingestion time. Late corrections then land in the historical partition where they belong.
  • Write PaneInfo.timing and PaneInfo.index into your output rows. Future-you will need them when debugging "why does this dashboard show different numbers than yesterday."
  • Use --enableStreamingEngine for production streaming pipelines. It moves state and shuffle off the workers into a managed service, reducing cost and making --update operations more reliable.
  • For session windows on high-cardinality keys, monitor worker memory and per-stage state metrics, and set up alerts before workers run out of memory.
  • When changing windowing logic, test the --update path in a staging project first. Windowing changes do not apply to data the running job has already buffered, and large changes such as switching window types can produce unpredictable output; for those, drain the old job and start a new one.

Real-World Use Case: Ride-Hailing Surge Pricing

Picture a mid-sized regional ride-hailing company with about 80,000 active drivers and 1.5 million daily ride requests. The pricing team wants surge multipliers updated every 30 seconds, computed from the ratio of unfulfilled requests to available drivers in each H3 hexagon over the trailing five minutes. Mobile network conditions mean ride-request events sometimes arrive five to ten minutes late from drivers in basements and parking garages.

The pipeline reads from a Pub/Sub topic with a custom request_event_time attribute. Events flow into a sliding window of five minutes with a 30-second period, giving the team a fresh five-minute view every half minute. The trigger is AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))) with allowed lateness of ten minutes and ACCUMULATING mode. A Combine.perKey over the H3 hex computes the ratio. Output writes to Bigtable keyed on (hex_id, window_end_ts) so the mobile app can do single-row reads.

The team learned three things the hard way. First, sliding windows with a 30-second period create ten overlapping windows per element, so they had to scale up Streaming Engine and add a deduplication step before windowing to drop identical events. Second, drivers in dead zones produced a bimodal distribution of event-time delay, so they tuned allowed lateness from a "just in case" 30 minutes down to a measured 10 minutes and cut state storage by two-thirds. Third, they replaced cancel-then-redeploy with in-place --update for code releases, eliminating the 90-second window of stale prices that previously hit users during deploys.

Exam Tips

The PDE exam tests this topic with scenario questions. The wording usually hides the answer in two places: the description of how often results are needed, and the description of how late events can arrive.

If the scenario says "we need to compute average session duration per user," the answer is session windows with a gap duration matching the inactivity threshold. Fixed windows are the wrong answer because user sessions do not align to clock minutes.

If the scenario mentions "rolling average over the last N minutes refreshed every M minutes," the answer is sliding windows with size N and period M. But watch for "every second" — that is the trap that points you to a stateful DoFn instead.

If the scenario describes "events can arrive up to 4 hours late and we need correct totals eventually," the answer involves allowed lateness of at least 4 hours and ACCUMULATING mode, plus a sink that supports merge or upsert semantics.

If the question mentions stopping a pipeline and asks about data integrity, drain is almost always the correct answer. Cancel is correct only when the question explicitly says "abandon in-flight data" or there is a corruption scenario.

If the question asks about Pub/Sub watermarks not advancing, the answer mentions the oldest unacknowledged message and the withTimestampAttribute configuration. Look for the trap option that suggests changing the window type — windowing has nothing to do with a stuck watermark at the source.

Memorize: streaming default trigger fires once at the watermark with DISCARDING and zero allowed lateness. Any other behavior must be explicitly configured. Half the windowing exam traps come from candidates assuming sensible defaults that do not exist. See: https://beam.apache.org/documentation/programming-guide/#default-trigger

Frequently Asked Questions (FAQ)

What is the difference between event time and processing time in Dataflow?

Event time is the timestamp the source attached to the element, representing when the event actually occurred in the real world. Processing time is the wall clock on the worker when Beam happens to be handling the element. Event time is what you almost always want for analytics, because it produces consistent results regardless of how slow or fast the pipeline runs. Processing time is appropriate only for things like "did my pipeline have a hiccup in the last minute," where you genuinely want a worker-clock answer.

How does the watermark advance for a Pub/Sub source in Dataflow?

The Pub/Sub I/O connector tracks the oldest unacknowledged message in the subscription. If you did not configure a custom timestamp attribute, the watermark is computed from the publish timestamps of those messages. If you configured withTimestampAttribute("your_attr"), the watermark is computed from that attribute on the oldest unacked message. This means a single stuck message with a very old timestamp will pin the watermark in place, preventing windows from closing.

When should I use a session window instead of a fixed window?

Use a session window when the natural boundary of your aggregation is defined by the activity of the entity itself rather than by clock time. User browsing sessions, IoT device activity bursts, and customer support conversations are all session-shaped. Use a fixed window when the question is about a clock-aligned period such as "revenue per hour" or "errors per minute."

What happens to late data if I do not configure allowed lateness?

It is silently dropped. The default streaming trigger fires once when the watermark passes the end of the window with zero allowed lateness, and any element arriving after that point is discarded without warning. The Cloud Monitoring metric dataflow.googleapis.com/job/dropped_late_data_count will tick up but most teams do not alert on it until they notice missing data downstream.

What is the difference between drain and cancel for a Dataflow streaming job?

Cancel terminates the job immediately, abandoning in-flight and buffered data: Pub/Sub messages the job never acknowledged are redelivered, but data it had already acknowledged and not yet written out is lost. Drain stops reading from sources, advances the watermark to infinity so all windows close and emit their final panes, finishes processing in-flight elements, then shuts down. Drain preserves correctness; cancel preserves time. Use drain for planned shutdowns and code releases. Use cancel when the pipeline is corrupted or stuck in a way that drain cannot resolve.

Can I change windowing logic on a running pipeline using --update?

Sometimes, with care. Dataflow lets a replacement job change windowing and trigger strategies, but the change does not apply to data that is already buffered or in flight in the running job. Small changes, such as adjusting the duration of fixed or sliding windows, are the kind Google recommends; major changes, such as switching the windowing algorithm from fixed to sliding or moving to a global window, can produce unpredictable output. Changes that fail Dataflow's update compatibility check are rejected outright. For large or incompatible changes, drain the old job and start a fresh one.

Why are my sliding windows producing duplicate-looking rows in BigQuery?

A sliding window assigns each element to multiple overlapping windows, and each window emits its own pane. If you write panes directly to BigQuery without a window key, you get one row per window per group, which looks like duplicates but is correct. Add window_start and window_end to the row schema so each row is distinguishable, and aggregate downstream by selecting the most recent window or filtering to non-overlapping intervals.
