Tumbling Windows (TUMBLE)
Tumbling windows assign each element to a window of a specified duration. These windows have a fixed size and do not overlap. For instance, with a 5-minute tumbling window, Flink creates a new window every 5 minutes, where each record belongs to exactly one window.
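The example below shows how a timestamp maps to its tumbling window. It assumes windows aligned to the epoch with no offset, which is the behavior in the examples below; the 12:03:30 timestamp is illustrative.

```latex
\begin{aligned}
\text{window\_start}(t) &= t - (t \bmod \text{size}) \\
\text{window\_end}(t) &= \text{window\_start}(t) + \text{size} \\
t = 12{:}03{:}30,\ \text{size} = 5\,\text{min} &\;\Rightarrow\; [\,12{:}00{:}00,\ 12{:}05{:}00\,)
\end{aligned}
```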
Use cases: Minute-level aggregations such as page views (PV) and unique visitors (UV). A practical example is calculating concurrent users per minute or total sales per dimension per minute.
As of Flink 1.13, a tumbling window can be written in two ways:
- Group Window Aggregation (deprecated in 1.13+, not recommended)
- Windowing TVF (recommended for 1.13+)
Group Window Aggregation (Batch/Streaming)
```sql
CREATE TABLE source_orders (
  dimension STRING,
  customer_id BIGINT,
  amount BIGINT,
  -- Computed column: each row is stamped with the current time,
  -- so in this demo event time closely follows the wall clock
  event_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
  -- The watermark trails the largest event_time seen so far by 5 seconds
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dimension.length' = '1',
  'fields.customer_id.min' = '1',
  'fields.customer_id.max' = '100000',
  'fields.amount.min' = '1',
  'fields.amount.max' = '100000'
);

CREATE TABLE output_metrics (
  dimension STRING,
  total_count BIGINT,
  total_amount BIGINT,
  max_amount BIGINT,
  min_amount BIGINT,
  unique_customers BIGINT,
  window_start_ts BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO output_metrics
SELECT
  dimension,
  COUNT(*) AS total_count,
  SUM(amount) AS total_amount,
  MAX(amount) AS max_amount,
  MIN(amount) AS min_amount,
  COUNT(DISTINCT customer_id) AS unique_customers,
  -- TUMBLE_START must repeat the arguments of TUMBLE in GROUP BY;
  -- the window start is converted to epoch milliseconds
  UNIX_TIMESTAMP(CAST(TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS STRING)) * 1000 AS window_start_ts
FROM source_orders
GROUP BY
  dimension,
  TUMBLE(event_time, INTERVAL '1' MINUTE);
```
The syntax places the window function in the GROUP BY clause: TUMBLE(event_time, INTERVAL '1' MINUTE). The first argument is the event time timestamp, and the second specifies the window duration.
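The Group Window syntax also provides TUMBLE_END and TUMBLE_ROWTIME alongside TUMBLE_START. The sketch below assumes the source_orders table defined above. The aliases w_start, w_end, and w_rowtime are illustrative names and are not part of any API.

```sql
-- Ad-hoc query against source_orders (defined above).
-- Every auxiliary function must repeat the exact arguments of TUMBLE in GROUP BY.
SELECT
  dimension,
  TUMBLE_START(event_time, INTERVAL '1' MINUTE)   AS w_start,    -- TIMESTAMP(3), inclusive
  TUMBLE_END(event_time, INTERVAL '1' MINUTE)     AS w_end,      -- TIMESTAMP(3), exclusive
  TUMBLE_ROWTIME(event_time, INTERVAL '1' MINUTE) AS w_rowtime,  -- event-time attribute (end - 1 ms)
  COUNT(*) AS total_count
FROM source_orders
GROUP BY
  dimension,
  TUMBLE(event_time, INTERVAL '1' MINUTE);
```

TUMBLE_ROWTIME is mainly useful when the windowed result feeds another event-time window downstream.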
Windowing TVF (Streaming Only)
```sql
CREATE TABLE source_orders (
  dimension STRING,
  customer_id BIGINT,
  amount BIGINT,
  -- Computed column: each row is stamped with the current time
  event_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dimension.length' = '1',
  'fields.customer_id.min' = '1',
  'fields.customer_id.max' = '100000',
  'fields.amount.min' = '1',
  'fields.amount.max' = '100000'
);

CREATE TABLE output_metrics (
  dimension STRING,
  window_start_ts BIGINT,
  total_count BIGINT,
  total_amount BIGINT,
  max_amount BIGINT,
  min_amount BIGINT,
  unique_customers BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO output_metrics
SELECT
  dimension,
  UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 AS window_start_ts,
  COUNT(*) AS total_count,
  SUM(amount) AS total_amount,
  MAX(amount) AS max_amount,
  MIN(amount) AS min_amount,
  COUNT(DISTINCT customer_id) AS unique_customers
-- The TVF appends window_start, window_end and window_time to each row
FROM TABLE(TUMBLE(TABLE source_orders, DESCRIPTOR(event_time), INTERVAL '60' SECOND))
-- Window TVF aggregation requires both window_start and window_end in GROUP BY
GROUP BY
  window_start,
  window_end,
  dimension;
```
The window function appears in the FROM clause: TABLE(TUMBLE(TABLE source_orders, DESCRIPTOR(event_time), INTERVAL '60' SECOND)). The three parameters are: the source table, the time attribute descriptor, and the window size.
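A window TVF does not aggregate on its own. It returns every input row plus three extra columns. The sketch below inspects those columns. It assumes the source_orders table above and a Flink version that accepts a window TVF queried on its own; early releases may require the TVF to feed an aggregation.

```sql
-- Inspect the TVF output without aggregating.
-- Every source column is kept, and three columns are appended:
-- window_start, window_end, and window_time (window_end minus 1 ms,
-- usable as a time attribute downstream).
SELECT dimension, amount, event_time, window_start, window_end, window_time
FROM TABLE(TUMBLE(TABLE source_orders, DESCRIPTOR(event_time), INTERVAL '60' SECOND));
```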
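For event-time processing, the tumbling window operator emits a window's result once the watermark passes the window's end time. In this example, the watermark trails the largest observed event_time by 5 seconds. Records that arrive up to 5 seconds out of order are therefore still counted, while records arriving after their window has fired are dropped by default.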
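The timeline below works through the 1-minute window and the 5-second watermark delay used above; the 12:00 timestamps are illustrative. It ignores the 1 ms offset Flink applies internally: strictly, a window fires once the watermark reaches window_end minus 1 ms.

```latex
\begin{aligned}
W &= \max(\texttt{event\_time}) - 5\,\mathrm{s} \\
\text{window } [12{:}00{:}00,\ 12{:}01{:}00) \text{ fires when } W &\ge 12{:}01{:}00 \\
\iff \max(\texttt{event\_time}) &\ge 12{:}01{:}05
\end{aligned}
```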
Sliding Windows (HOP)
Sliding windows also have a fixed size but include a second parameter controlling the slide interval. When the slide interval is smaller than the window size, windows overlap. A record can belong to multiple windows in this case.
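When the window size is an integral multiple of the slide, each record falls into size / slide windows. The example below uses the 5-minute window sliding every minute; the 12:03:30 timestamp is illustrative.

```latex
\begin{aligned}
\text{windows per record} &= \frac{\text{size}}{\text{slide}} = \frac{5\,\text{min}}{1\,\text{min}} = 5 \\
t = 12{:}03{:}30 &\in [11{:}59, 12{:}04),\ [12{:}00, 12{:}05),\ [12{:}01, 12{:}06),\ [12{:}02, 12{:}07),\ [12{:}03, 12{:}08)
\end{aligned}
```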
Use case: Computing concurrent online users with results emitted every minute, where each result covers the past 5 minutes of data.
Group Window Aggregation (Batch/Streaming)
```sql
CREATE TABLE source_orders (
  dimension STRING,
  customer_id BIGINT,
  amount BIGINT,
  event_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dimension.length' = '1',
  'fields.customer_id.min' = '1',
  'fields.customer_id.max' = '100000',
  'fields.amount.min' = '1',
  'fields.amount.max' = '100000'
);

CREATE TABLE output_metrics (
  dimension STRING,
  unique_customers BIGINT,
  window_start_ts BIGINT
) WITH (
  'connector' = 'print'
);

-- INSERT INTO maps columns by position, so the SELECT order must match output_metrics
INSERT INTO output_metrics
SELECT
  dimension,
  COUNT(DISTINCT customer_id) AS unique_customers,
  UNIX_TIMESTAMP(CAST(HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS STRING)) * 1000 AS window_start_ts
FROM source_orders
GROUP BY
  dimension,
  -- HOP(time attribute, slide, size): a 5-minute window every 1 minute
  HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE);
```
The syntax is HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE), where the parameters are: event time timestamp, slide interval, and window size.
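The Group Window HOP also has auxiliary functions. HOP_END is handy here because a "past 5 minutes" result is naturally labeled by its end time. The sketch below assumes the source_orders table above; w_start and w_end are illustrative aliases.

```sql
-- HOP_START/HOP_END must repeat the exact arguments of HOP in GROUP BY:
-- (time attribute, slide, size).
SELECT
  dimension,
  HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS w_start,
  HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)   AS w_end,
  COUNT(DISTINCT customer_id) AS unique_customers
FROM source_orders
GROUP BY
  dimension,
  HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE);
```

For a given dimension, consecutive result rows have w_end values 1 minute apart, and each row covers 5 minutes of data.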
Windowing TVF (Streaming Only)
```sql
CREATE TABLE source_orders (
  dimension STRING,
  customer_id BIGINT,
  amount BIGINT,
  event_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dimension.length' = '1',
  'fields.customer_id.min' = '1',
  'fields.customer_id.max' = '100000',
  'fields.amount.min' = '1',
  'fields.amount.max' = '100000'
);

CREATE TABLE output_metrics (
  dimension STRING,
  unique_customers BIGINT,
  window_start_ts BIGINT
) WITH (
  'connector' = 'print'
);

-- INSERT INTO maps columns by position, so the SELECT order must match output_metrics
INSERT INTO output_metrics
SELECT
  dimension,
  COUNT(DISTINCT customer_id) AS unique_customers,
  UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 AS window_start_ts
FROM TABLE(HOP(TABLE source_orders, DESCRIPTOR(event_time), INTERVAL '1' MINUTES, INTERVAL '5' MINUTES))
GROUP BY
  window_start,
  window_end,
  dimension;
```
The window function is TABLE(HOP(TABLE source_orders, DESCRIPTOR(event_time), INTERVAL '1' MINUTES, INTERVAL '5' MINUTES)) with four parameters: source table, time attribute, slide interval, and window size.
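To our understanding, Flink's window TVF aggregation requires the HOP size to be an integral multiple of the slide. Likewise, the CUMULATE maximum size must be a multiple of the step. Other combinations are rejected when the job is planned. Treat this as an assumption and verify it against your Flink version. The sketch below reuses source_orders and keeps the questionable variant commented out.

```sql
-- Accepted: 5 MINUTE is an integral multiple of 1 MINUTE,
-- so each row lands in exactly 5 / 1 = 5 windows.
SELECT window_start, window_end, COUNT(DISTINCT customer_id) AS unique_customers
FROM TABLE(HOP(TABLE source_orders, DESCRIPTOR(event_time), INTERVAL '1' MINUTE, INTERVAL '5' MINUTE))
GROUP BY window_start, window_end;

-- Expected to be rejected (assumption, see above): 5 MINUTE is not
-- a multiple of a 2 MINUTE slide.
-- SELECT window_start, window_end, COUNT(*)
-- FROM TABLE(HOP(TABLE source_orders, DESCRIPTOR(event_time), INTERVAL '2' MINUTE, INTERVAL '5' MINUTE))
-- GROUP BY window_start, window_end;
```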
Session Windows (SESSION)
Session windows differ from tumbling and sliding windows because they have no fixed duration. A session window closes when no new data arrives for a specified gap period (the session gap).
Use case: Calculating the total number of items purchased by each user during an active session. If a user remains inactive for 5 minutes, the session is considered closed.
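The example below shows how a 5-minute gap splits events into sessions; the event times are illustrative. Each session ends one gap after its last event.

```latex
\begin{aligned}
&\text{gap} = 5\,\text{min},\quad \text{events at } 10{:}00,\ 10{:}02,\ 10{:}06,\ 10{:}20 \\
&10{:}02 - 10{:}00 = 2\,\text{min} < 5,\quad 10{:}06 - 10{:}02 = 4\,\text{min} < 5 \;\Rightarrow\; \text{same session} \\
&10{:}20 - 10{:}06 = 14\,\text{min} \ge 5 \;\Rightarrow\; \text{new session} \\
&\text{sessions: } [10{:}00,\ 10{:}11) \text{ (3 events)},\quad [10{:}20,\ 10{:}25) \text{ (1 event)}
\end{aligned}
```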
Flink 1.13 does not support Session windows with Window TVF, so only Group Window Aggregation is available.
Group Window Aggregation (Batch/Streaming)
```sql
CREATE TABLE user_purchases (
  dimension STRING,
  customer_id BIGINT,
  amount BIGINT,
  event_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.dimension.length' = '1',
  'fields.customer_id.min' = '1',
  'fields.customer_id.max' = '100000',
  'fields.amount.min' = '1',
  'fields.amount.max' = '100000'
);

CREATE TABLE output_metrics (
  dimension STRING,
  purchase_count BIGINT,
  window_start_ts BIGINT
) WITH (
  'connector' = 'print'
);

-- INSERT INTO maps columns by position, so the SELECT order must match output_metrics
INSERT INTO output_metrics
SELECT
  dimension,
  COUNT(1) AS purchase_count,
  UNIX_TIMESTAMP(CAST(SESSION_START(event_time, INTERVAL '5' MINUTE) AS STRING)) * 1000 AS window_start_ts
FROM user_purchases
GROUP BY
  dimension,
  -- SESSION(time attribute, session gap)
  SESSION(event_time, INTERVAL '5' MINUTE);
```
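The syntax is SESSION(event_time, INTERVAL '5' MINUTE) in the GROUP BY clause. The parameters are the time attribute and the session gap. Session windows support both processing time and event time semantics, but processing time is supported only in streaming jobs. This datagen source emits 10 rows per second spread over random one-character dimension values. As a result, a given dimension rarely goes 5 minutes without data, so its session may stay open and print nothing for a long time. Grouping by a higher-cardinality key such as customer_id produces output sooner and also matches the per-user use case above.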
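The sketch below is a processing-time variant, built under some assumptions. The table name user_purchases_proc and its proc_time column are hypothetical, and the grouping key is customer_id to match the per-user use case. PROCTIME() declares a processing-time attribute, so no watermark is needed, and the job must run in streaming mode.

```sql
-- Hypothetical processing-time source; PROCTIME() marks proc_time as a
-- processing-time attribute, so no WATERMARK clause is declared
CREATE TABLE user_purchases_proc (
  customer_id BIGINT,
  amount BIGINT,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.customer_id.min' = '1',
  'fields.customer_id.max' = '100000',
  'fields.amount.min' = '1',
  'fields.amount.max' = '100000'
);

-- One row per closed session: a customer's session ends after
-- 5 minutes without a new purchase
SELECT
  customer_id,
  SESSION_START(proc_time, INTERVAL '5' MINUTE) AS session_start,
  SESSION_END(proc_time, INTERVAL '5' MINUTE) AS session_end,
  COUNT(1) AS purchase_count
FROM user_purchases_proc
GROUP BY
  customer_id,
  SESSION(proc_time, INTERVAL '5' MINUTE);
```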
Cumulative Windows (CUMULATE)
Cumulative windows behave like a tumbling window of the maximum size that also fires early at a fixed step. They are particularly useful for metrics that accumulate over time, such as daily cumulative PV/UV, where the result at 10:00 represents the cumulative total from 00:00 to 10:00.
Use case: Computing cumulative metrics within a period, such as daily cumulative revenue and unique user counts up to the current minute.
Sample input:
| time | id | money |
|---|---|---|
| 2021-11-01 00:01:00 | A | 3 |
| 2021-11-01 00:01:00 | B | 5 |
| 2021-11-01 00:01:00 | A | 7 |
| 2021-11-01 00:02:00 | C | 3 |
| 2021-11-01 00:03:00 | C | 10 |
Expected output:
| time | distinct_id_count | sum_money |
|---|---|---|
| 2021-11-01 00:01:00 | 2 | 15 |
| 2021-11-01 00:02:00 | 3 | 18 |
| 2021-11-01 00:03:00 | 3 | 28 |
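All cumulative windows within a period share the same start. Each window's end advances by one step until it reaches the maximum size, after which a new period begins. Only the Windowing TVF syntax supports cumulative windows; Group Window Aggregation has no equivalent.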
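For the sample data above, a 1-minute step and a 1-day maximum size produce the following series of windows. The running totals in the expected output follow from simple accumulation.

```latex
\begin{aligned}
&\text{step} = 1\,\text{min},\ \text{max size} = 1\,\text{day} \Rightarrow \frac{1440\,\text{min}}{1\,\text{min}} = 1440 \text{ windows per day} \\
&[00{:}00, 00{:}01),\ [00{:}00, 00{:}02),\ \ldots,\ [00{:}00, 24{:}00) \\
&\text{sum\_money: } 3 + 5 + 7 = 15,\quad 15 + 3 = 18,\quad 18 + 10 = 28 \\
&\text{distinct ids: } \{A, B\} \to 2,\quad \{A, B, C\} \to 3,\quad \{A, B, C\} \to 3
\end{aligned}
```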
Windowing TVF (Streaming Only)
```sql
CREATE TABLE transaction_log (
  customer_id BIGINT,
  revenue BIGINT,
  event_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.customer_id.min' = '1',
  'fields.customer_id.max' = '100000',
  'fields.revenue.min' = '1',
  'fields.revenue.max' = '100000'
);

CREATE TABLE daily_cumulative_metrics (
  window_end_ts BIGINT,
  window_start_ts BIGINT,
  total_revenue BIGINT,
  distinct_customers BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO daily_cumulative_metrics
SELECT
  UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 AS window_end_ts,
  -- window_start is TIMESTAMP(3); convert it to match the BIGINT sink column
  UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 AS window_start_ts,
  SUM(revenue) AS total_revenue,
  COUNT(DISTINCT customer_id) AS distinct_customers
-- CUMULATE(table, time attribute, step, max size): fire every minute, reset daily
FROM TABLE(CUMULATE(TABLE transaction_log, DESCRIPTOR(event_time), INTERVAL '60' SECOND, INTERVAL '1' DAY))
GROUP BY
  window_start,
  window_end;
```
The window function is TABLE(CUMULATE(TABLE transaction_log, DESCRIPTOR(event_time), INTERVAL '60' SECOND, INTERVAL '1' DAY)) with four parameters: source table, time attribute, step granularity (trigger interval), and maximum window size.
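UNIX_TIMESTAMP parses its string argument in the session time zone (table.local-time-zone), so the epoch values above depend on that setting. If the sink can store timestamps, a simpler option is to keep window_start and window_end as TIMESTAMP(3). The sketch below reuses transaction_log; the table name daily_cumulative_metrics_ts is hypothetical.

```sql
-- Hypothetical sink that stores the window bounds as timestamps
CREATE TABLE daily_cumulative_metrics_ts (
  window_start TIMESTAMP(3),
  window_end TIMESTAMP(3),
  total_revenue BIGINT,
  distinct_customers BIGINT
) WITH (
  'connector' = 'print'
);

-- Column order matches the sink, since INSERT INTO maps columns by position
INSERT INTO daily_cumulative_metrics_ts
SELECT
  window_start,
  window_end,
  SUM(revenue) AS total_revenue,
  COUNT(DISTINCT customer_id) AS distinct_customers
FROM TABLE(CUMULATE(TABLE transaction_log, DESCRIPTOR(event_time), INTERVAL '60' SECOND, INTERVAL '1' DAY))
GROUP BY
  window_start,
  window_end;
```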
Grouping Sets with Window TVF
Real-world scenarios often require computing metrics across multiple dimensional combinations. While writing separate queries and unioning them works, it is inefficient and verbose. Flink SQL supports GROUPING SETS within Window TVF to handle this elegantly.
Example: Computing daily cumulative unique users across different dimension combinations (overall, by age, by gender, by age+gender).
```sql
CREATE TABLE user_activity (
  age_group STRING,
  gender STRING,
  user_id BIGINT,
  event_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.age_group.length' = '1',
  'fields.gender.length' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100000'
);

CREATE TABLE multi_dim_output (
  age_group STRING,
  gender STRING,
  unique_users BIGINT,
  window_end_ts BIGINT
) WITH (
  'connector' = 'print'
);

-- INSERT INTO maps columns by position, so the SELECT order must match multi_dim_output
INSERT INTO multi_dim_output
SELECT
  -- A rolled-up dimension is NULL in its grouping set; label it 'ALL'
  IF(age_group IS NULL, 'ALL', age_group) AS age_group,
  IF(gender IS NULL, 'ALL', gender) AS gender,
  COUNT(DISTINCT user_id) AS unique_users,
  UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 AS window_end_ts
FROM TABLE(CUMULATE(TABLE user_activity, DESCRIPTOR(event_time), INTERVAL '5' SECOND, INTERVAL '1' DAY))
GROUP BY
  -- window_start and window_end stay outside GROUPING SETS
  window_start,
  window_end,
  GROUPING SETS (
    (),
    (age_group),
    (gender),
    (age_group, gender)
  );
```
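Note that Flink SQL's GROUPING SETS syntax differs from Hive SQL's. Flink follows the standard SQL form, in which GROUPING SETS (...) is itself an element of the GROUP BY clause, whereas Hive lists the grouping columns first (GROUP BY a, b GROUPING SETS (...)) and applies it directly to the raw table. With a Window TVF, window_start and window_end are listed in GROUP BY outside the GROUPING SETS clause, so every grouping set is computed per window.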
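For this particular set of combinations, CUBE is shorthand: CUBE (age_group, gender) expands to the grouping sets (age_group, gender), (age_group), (gender), and (). Flink's window aggregation documentation lists ROLLUP and CUBE as supported alongside GROUPING SETS. The sketch below shows the equivalent query and reuses the user_activity and multi_dim_output tables above.

```sql
-- Same result as the four explicit GROUPING SETS:
-- CUBE (age_group, gender) = (age_group, gender), (age_group), (gender), ()
INSERT INTO multi_dim_output
SELECT
  IF(age_group IS NULL, 'ALL', age_group) AS age_group,
  IF(gender IS NULL, 'ALL', gender) AS gender,
  COUNT(DISTINCT user_id) AS unique_users,
  UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 AS window_end_ts
FROM TABLE(CUMULATE(TABLE user_activity, DESCRIPTOR(event_time), INTERVAL '5' SECOND, INTERVAL '1' DAY))
GROUP BY
  window_start,
  window_end,
  CUBE (age_group, gender);
```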