Flink中是怎么处理乱序数据的

作者:Administrator 发布时间: 2026-02-27 阅读量:6 评论数:0

Flink中是怎么处理乱序数据的

在 Flink 里,“乱序”本质是 事件时间(event time) 先后顺序和 到达时间(processing time) 不一致。Flink 处理乱序数据的核心机制主要围绕:事件时间语义 + Watermark + 窗口触发/延迟 + 迟到数据处理 + 状态与一致性

一、时间语义

Flink中有三种时间语义:

  • Processing Time:按机器当前时间处理,最简单、吞吐高,但对乱序不敏感(乱序会直接导致窗口统计不准)。
  • Event Time(最常用) ​:按事件自带时间戳处理,Flink 用 Watermark 推进“事件时间进度”,从而容忍一定乱序。
  • Ingestion Time:介于两者之间(进入 Flink 时打时间戳),现在用得较少。

乱序处理几乎都建立在 Event Time 上。

二、Watermark

Watermark是乱序容忍与“事件时间进度”的关键。可以理解为:“我认为未来不会再来 时间戳 ≤ watermark 的事件了”(近似判断,允许误差)。

常见生成策略

  • 有界乱序(Bounded Out-of-Orderness) ​:最常用
    设定最大乱序程度 maxOutOfOrderness,例如 5s:
    watermark ≈ 当前观测到的最大事件时间 - 5s
    → 能容忍 5 秒内乱序,超过就可能变成迟到数据。
  • 单调递增(Monotonous Timestamps) :适合源数据严格按时间递增(基本无乱序)。
  • 自定义 WatermarkGenerator:适合多分区、多来源、需要特殊规则(比如按业务字段分组、按分区对齐等)。

关键点

  • Watermark 不是“等数据到齐”,而是“推进时间并触发计算”的机制。
  • 对于 并行 source,下游算子的 watermark 通常取 各输入分区 watermark 的最小值,因此某个分区卡住会拖慢整体事件时间推进。

三、窗口(Window)与触发(Trigger)

乱序数据最典型场景是做窗口聚合(滚动、滑动、会话窗口)。

Flink 的窗口何时“关窗”

  • 默认基于事件时间时:当 watermark >= window_end 时触发窗口计算并输出。

允许迟到:Allowed Lateness(延迟关窗)

即使窗口第一次输出了,也可以设置:

  • allowedLateness = X​:窗口在 window_end​ 后再额外等 X 时间
    在这段时间内到来的迟到事件仍会进入窗口并触发 ​更新输出(取决于输出模式/下游算子)。
  • 超过 allowedLateness 的事件才会被视为“最终迟到”。

Trigger / Evictor(高级)

  • Trigger:自定义触发逻辑(例如每来一条就触发、每 N 秒触发一次、同时满足事件数/时间等)。
  • Evictor:触发前/后剔除窗口内元素(较少用,成本高,通常能用聚合/ProcessFunction替代)。

四、迟到数据(Late Events)处理

当事件到达时,它的事件时间戳已经 ​落后于 watermark(以及超过 allowed lateness),就会变成迟到数据。常见处理策略:

  1. 直接丢弃(默认)
    简单但会损失数据(很多实时大盘能接受)。
  2. 侧输出(Side Output)收集迟到数据
    把迟到数据打到一个旁路流,做补偿计算、落库、离线回补或告警。
  3. 允许迟到并更新结果(Allowed Lateness)
    窗口结果会被修正(需要下游能接收“更新/撤回”语义,或你用 upsert sink)。
  4. 用更长乱序容忍(更大的 watermark 延迟)
    减少迟到,但会增加延迟(latency)——典型的准确性 vs 延迟权衡。

五、有序输出与排序

如果你要的是“​按事件时间严格有序输出”,Flink 不会全局帮你排序(成本太高),但你可以:

  • Keyed 后用 KeyedProcessFunction + 状态 + 定时器
    将事件暂存一段时间(比如 5s),等 watermark/定时器到了再按时间戳输出。
  • 代价:更多状态、更高延迟、可能出现内存/状态膨胀,需要 TTL、容量控制。

六、与乱序紧密相关的运行时机制

乱序本身还会影响系统行为:

  • 状态(State) :窗口/乱序缓存都依赖 state,乱序越大、allowed lateness 越大,state 留存越久。
  • State TTL:防止“永不关闭”的 key 造成状态无限增长。
  • Checkpoint & Exactly-Once:保证乱序场景下也能在故障恢复后维持一致结果(尤其窗口更新、迟到补偿更依赖一致性)。
  • 反压(Backpressure) :watermark 推进慢、窗口堆积、state 变大,都可能导致下游慢→反压。

七、常见的选型建议

  • 目标:低延迟优先,允许少量误差
    用较小乱序 watermark(如 1-3s),不设置/少设置 allowed lateness,迟到侧输出做补偿。
  • 目标:结果尽量准确,延迟可接受
    watermark 延迟设大一些(如 10-60s),再配 allowed lateness;sink 用 upsert/幂等写支持更新。
  • 目标:必须严格按事件时间有序输出
    KeyedProcessFunction 缓存排序 + watermark/定时器释放,但要严格控制状态与延迟。

八、代码示例

8.1 DataStream API

8.1.1 定义Watermark

事件模型 + Watermark(容忍乱序)

public static class Event {
    public String userId;
    public String type;      // e.g. "click"
    public long eventTime;   // epoch millis
    public double amount;    // optional

    public Event() {}
    public Event(String userId, String type, long eventTime, double amount) {
        this.userId = userId;
        this.type = type;
        this.eventTime = eventTime;
        this.amount = amount;
    }
}

WatermarkStrategy:容忍 5 秒乱序 + source 空闲检测避免拖慢 watermark

import org.apache.flink.api.common.eventtime.*;
import java.time.Duration;

WatermarkStrategy<Event> wm = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((e, ts) -> e.eventTime)
    .withIdleness(Duration.ofSeconds(30)); // 可选:分区空闲 30s 视为 idle

8.1.2 窗口聚合


// 典型大盘统计:10 秒滚动窗口,允许再等 10 秒 修正;超过 allowed lateness 的走侧输出。

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class WindowAggDemo {

    private static final OutputTag<Event> LATE_TAG = new OutputTag<Event>("late-events") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Event> source = env.fromElements(
            new Event("u1","click",  1_000L, 1),
            new Event("u1","click",  4_000L, 1),
            new Event("u1","click",  3_000L, 1),  // 乱序
            new Event("u1","click", 12_000L, 1),
            new Event("u1","click",  2_000L, 1)   // 可能迟到
        );

        WatermarkStrategy<Event> wm = WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(5))
            .withTimestampAssigner((e, ts) -> e.eventTime)
            .withIdleness(java.time.Duration.ofSeconds(30));

        SingleOutputStreamOperator<Event> withWm = source.assignTimestampsAndWatermarks(wm);

        SingleOutputStreamOperator<Tuple2<String, Long>> cnt = withWm
            .keyBy(e -> e.userId)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .allowedLateness(Time.seconds(10))     // 窗口结束后再等 10s
            .sideOutputLateData(LATE_TAG)          // 超过 allowed lateness 的进侧输出
            .aggregate(new CountAgg());

        cnt.print("WINDOW");
        cnt.getSideOutput(LATE_TAG).print("LATE");

        env.execute("Window Agg Out-of-Order Demo");
    }

    public static class CountAgg implements AggregateFunction<Event, Long, Tuple2<String, Long>> {
        private String key;
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Event v, Long acc) { key = v.userId; return acc + 1; }
        @Override public Tuple2<String, Long> getResult(Long acc) { return Tuple2.of(key, acc); }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }
}

8.1.3 Join

Interval Join


// 两条流都要 assign watermarks,否则事件时间 join 不会按预期工作。


import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// left stream: orders
DataStream<Event> orders = ...;   // 记得 assignTimestampsAndWatermarks(wm)
// right stream: payments
DataStream<Event> payments = ...; // 记得 assignTimestampsAndWatermarks(wm)

SingleOutputStreamOperator<String> joined =
    orders.assignTimestampsAndWatermarks(wm)
          .keyBy(e -> e.userId)
          .intervalJoin(payments.assignTimestampsAndWatermarks(wm).keyBy(e -> e.userId))
          // 匹配条件:payment.eventTime 在 order.eventTime 之后 0~600s
          .between(Time.seconds(0), Time.minutes(10))
          // 可选:迟到容忍(Interval Join 的迟到处理主要靠 watermark + 这个参数)
          // .withLowerBoundExclusive() / .withUpperBoundExclusive() 也可用
          .process(new ProcessJoinFunction<Event, Event, String>() {
              @Override
              public void processElement(Event left, Event right, Context ctx, Collector<String> out) {
                  out.collect("JOIN user=" + left.userId +
                              " orderTs=" + left.eventTime +
                              " payTs=" + right.eventTime);
              }
          });

joined.print("INTERVAL_JOIN");

Interval Join 的关键风险:状态膨胀

  • join 本质要把一侧数据在 state 里“留一段时间等待匹配”
  • 等待窗口大(between 范围大)+ 乱序大(watermark 延迟大)= state 留得更久

Window Join

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

DataStream<Event> a = ...; // with watermarks
DataStream<Event> b = ...; // with watermarks

DataStream<String> windowJoined =
    a.keyBy(e -> e.userId)
     .join(b.keyBy(e -> e.userId))
     .where(e -> e.userId)
     .equalTo(e -> e.userId)
     .window(TumblingEventTimeWindows.of(Time.seconds(10)))
     .apply((left, right) -> "WIN_JOIN user=" + left.userId +
                             " aTs=" + left.eventTime + " bTs=" + right.eventTime);

windowJoined.print("WINDOW_JOIN");

8.2.1 SQL:声明 watermark + TUMBLE 窗口聚合

CREATE TABLE events (
  user_id STRING,
  type    STRING,
  ts      TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

CREATE TABLE sink_print (
  window_start TIMESTAMP(3),
  window_end   TIMESTAMP(3),
  user_id      STRING,
  cnt          BIGINT
) WITH ('connector'='print');

INSERT INTO sink_print
SELECT window_start, window_end, user_id, COUNT(*) AS cnt
FROM TABLE(
  TUMBLE(TABLE events, DESCRIPTOR(ts), INTERVAL '10' SECOND)
)
GROUP BY window_start, window_end, user_id;

8.2.2 SQL:流-流 Interval Join

-- 写法核心:两张表都要 watermark,并在 ON 条件里写清楚时间范围。

CREATE TABLE orders (
  order_id STRING,
  user_id  STRING,
  ts       TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (...);

CREATE TABLE payments (
  pay_id  STRING,
  user_id STRING,
  ts      TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (...);

CREATE TABLE sink_join (
  order_id STRING,
  pay_id   STRING,
  user_id  STRING,
  order_ts TIMESTAMP(3),
  pay_ts   TIMESTAMP(3)
) WITH ('connector'='print');

INSERT INTO sink_join
SELECT
  o.order_id,
  p.pay_id,
  o.user_id,
  o.ts AS order_ts,
  p.ts AS pay_ts
FROM orders o
JOIN payments p
ON o.user_id = p.user_id
-- payment 在 order 之后 0~10分钟内
AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '10' MINUTE;

强烈建议)SQL 里配状态 TTL,防止 join 状态无限长

-- 让 join / 聚合等算子的 state 有 TTL(示例 2 小时,你按业务改)
SET 'table.exec.state.ttl' = '2 h';

评论