Flink中的双流关联

作者:Administrator 发布时间: 2026-03-09 阅读量:11 评论数:0

在FLink中,经常会遇到双流关联的的场景,比如订单流和支付流的关联,用户行为流和用户信息流等。那么,在Flink中都提供了哪些双流的关联呢?下面主要从DataStream和Flink SQL角度入手说明。


DataStream

Window Join(窗口Join)

两条流在同一个时间窗口内进行Join,窗口结束时出触发计算。

// 设置10秒的窗口期
stream1.join(stream2)
    .where(r -> r.id)
    .equalTo(r -> r.id)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .apply((e1, e2) -> e1 + " " + e2);

Window Join的特点

  • 只能做 inner join
  • 两条流的数据必须落在同一个窗口内才能匹配
  • 支持 Tumbling / Sliding / Session Window

Window Join只能做 inner join,join不上的数据会直接丢弃,没有机制输出到侧输出流,所以谨慎使用。


Interval Join(区间Join)

基于事件时间,允许一条流的数据在另一条流数据的时间范围内进行匹配。

// stream1流的一个记录可以在stream2前后5秒的这个区间匹配
stream1.keyBy(r -> r.id)
    .intervalJoin(stream2.keyBy(r -> r.id))
    .between(Time.seconds(-5), Time.seconds(5))
    .process(new ProcessJoinFunction<>() {
        @Override
        public void processElement(Left l, Right r, Context ctx, Collector<String> out) {
            out.collect(l + " " + r);
        }
    });

Interval Join的特点:

  • 只支持 inner join
  • 只支持 event time
  • 比 Window Join 更灵活,适合时间偏差场景

同样,Interval Join只能做 inner join,join不上的数据会直接丢弃,没有机制输出到侧输出流,所以谨慎使用。


CoProcessFunction(底层 API)

CoprocessFunction是最灵活的方式,可以实现inner / left / right / full outer join,以及各种自定义逻辑。

stream1.keyBy(r -> r.id)
    .connect(stream2.keyBy(r -> r.id))
    .process(new CoProcessFunction<Left, Right, String>() {
        // 用 State 缓存两侧数据,自行实现匹配逻辑
        MapState<String, Left> leftState;
        MapState<String, Right> rightState;

        @Override
        public void processElement1(Left l, Context ctx, Collector<String> out) {
            // 查 rightState,匹配则输出,否则存入 leftState
        }

        @Override
        public void processElement2(Right r, Context ctx, Collector<String> out) {
            // 查 leftState,匹配则输出,否则存入 rightState
        }
    });

CoProcessFunction的特点:

  • 完全自定义,支持所有 join 类型
  • 需要手动管理 State 和 TTL,防止状态无限增长
  • 适合复杂业务逻辑

使用CoProcessFunction实现Left Join

下面使用CoProcessFunction实现Left Join。


public class LeftJoinExample {

    // 左流数据
    @Data
    @AllArgsConstructor
    public static class Order {
        public String orderId;
        public String userId;
        public long timestamp;
    }

    // 右流数据
    @Data
    @AllArgsConstructor
    public static class Payment {
        public String orderId;
        public double amount;
        public long timestamp;
    }

    // 输出结果
    @Data
    @AllArgsConstructor
    public static class JoinResult {
        public String orderId;
        public String userId;
        public Double amount; // 右流未匹配时为 null
    }

    public static class LeftJoinFunction extends CoProcessFunction<Order, Payment, JoinResult> {

        // 未匹配的左流数据
        private MapState<String, Order> orderState;
        // 右流数据缓存
        private MapState<String, Payment> paymentState;

        // 等待匹配的超时时间
        private static final long JOIN_TIMEOUT = 10_000L;

        @Override
        public void open(Configuration parameters) {
            orderState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("order-state", String.class, Order.class)
            );
            paymentState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("payment-state", String.class, Payment.class)
            );
        }

        @Override
        public void processElement1(Order order, Context ctx, Collector<JoinResult> out) throws Exception {
            Payment payment = paymentState.get(order.orderId);
            if (payment != null) {
                // 右流已有数据,直接 join 输出
                out.collect(new JoinResult(order.orderId, order.userId, payment.amount));
                paymentState.remove(order.orderId);
            } else {
                // 右流暂无数据,缓存左流,注册超时定时器
                orderState.put(order.orderId, order);
                ctx.timerService().registerEventTimeTimer(order.timestamp + JOIN_TIMEOUT);
            }
        }

        @Override
        public void processElement2(Payment payment, Context ctx, Collector<JoinResult> out) throws Exception {
            Order order = orderState.get(payment.orderId);
            if (order != null) {
                // 左流已有数据,直接 join 输出
                out.collect(new JoinResult(order.orderId, order.userId, payment.amount));
                orderState.remove(payment.orderId);
            } else {
                // 左流暂无数据,缓存右流等待
                paymentState.put(payment.orderId, payment);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<JoinResult> out) throws Exception {
            // 超时触发,左流数据仍未匹配,输出 null 右侧(left join 语义)
            for (Map.Entry<String, Order> entry : orderState.entries()) {
                Order order = entry.getValue();
                if (order.timestamp + JOIN_TIMEOUT <= timestamp) {
                    out.collect(new JoinResult(order.orderId, order.userId, null));
                    orderState.remove(entry.getKey());
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Order> orderStream = env.fromElements(
            new Order("o1", "u1", 1000L),
            new Order("o2", "u2", 2000L)
        ).assignTimestampsAndWatermarks(
            WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((e, t) -> e.timestamp)
        );

        DataStream<Payment> paymentStream = env.fromElements(
            new Payment("o1", 99.9, 3000L)
            // o2 没有对应支付,left join 应输出 null
        ).assignTimestampsAndWatermarks(
            WatermarkStrategy.<Payment>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((e, t) -> e.timestamp)
        );

        DataStream<JoinResult> result = orderStream.keyBy(o -> o.orderId)
            .connect(paymentStream.keyBy(p -> p.orderId))
            .process(new LeftJoinFunction());

        result.print();
        env.execute();
    }
}

上面是实现Left Join的简单Demo,在生产环境中建议给状态设置TTL,并使用RocksDB state backend。


选型

场景推荐方式
简单等值 join,时间对齐Window Join
有时间偏差的 inner joinInterval Join
需要 outer join 或复杂逻辑CoProcessFunction

Flink SQL

如果使用Table API或者SQL,支持更丰富的语义:

类型说明
Regular Join全量 join,状态持续保留
Interval Join基于时间区间
Temporal Join流与版本表/维表 join
Lookup Join流关联外部维表(如 MySQL、HBase)

Regular Join

全量 join,两侧数据都会永久保留在 state 中,适合数据量不大的场景。

-- inner join
SELECT o.order_id, p.amount
FROM orders o
JOIN payments p ON o.order_id = p.order_id

-- left join
SELECT o.order_id, p.amount
FROM orders o
LEFT JOIN payments p ON o.order_id = p.order_id

-- full outer join
SELECT o.order_id, p.amount
FROM orders o
FULL OUTER JOIN payments p ON o.order_id = p.order_id

Regular Join的特点:

  • state 无限增长,生产环境需要配置 state TTL:
table.exec.state.ttl: 86400000  # 1天,单位毫秒

Interval Join

基于事件时间区间的 join,state 可以自动清理,比 Regular Join 更适合生产。

SELECT o.order_id, p.amount
FROM orders o, payments p
WHERE o.order_id = p.order_id
  AND p.pay_time BETWEEN o.order_time - INTERVAL '5' MINUTE 
                     AND o.order_time + INTERVAL '10' MINUTE

Interval Join的特点:

  • 只支持 inner join
  • 两张表都必须有事件时间属性
  • 区间范围要合理,太大会导致 state 积压

Temporal Join(时态表Join)

流关联一张有版本概念的表,获取事件发生时刻对应的快照数据,典型场景是汇率、价格表。

-- 定义版本表(有主键 + 事件时间)
CREATE TABLE exchange_rates (
    currency     STRING,
    rate         DOUBLE,
    update_time  TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
    PRIMARY KEY (currency) NOT ENFORCED
) WITH (...);

-- 流关联版本表,取事件发生时的汇率
SELECT o.order_id, o.amount * r.rate AS amount_usd
FROM orders o
LEFT JOIN exchange_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency

Temporal Join的特点:

  • 获取的是事件时间对应的历史快照,不是最新值
  • state 可以自动清理

Lookup Join

流关联外部维表(MySQL、HBase、Redis 等),实时查询外部系统补充字段。

-- 定义外部维表
CREATE TABLE user_info (
    user_id   STRING,
    user_name STRING,
    age       INT,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/db',
    'table-name' = 'user_info'
);

-- 使用处理时间做 lookup join
SELECT o.order_id, u.user_name, o.amount
FROM orders o
LEFT JOIN user_info FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.user_id

Lookup Join的特点:

  • 只支持处理时间(Processing Time)
  • 每条左流数据触发一次外部查询

如果不想每条左流数据触发一次外部查询,可以使用缓存机制,但对于变动频繁的维表不合适。

CREATE TABLE user_info (
    user_id   STRING,
    user_name STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/db',
    'table-name' = 'user_info',
    
    -- 开启缓存
    'lookup.cache' = 'PARTIAL',
    -- 最大缓存行数
    'lookup.partial-cache.max-rows' = '10000',
    -- 写入后过期时间
    'lookup.partial-cache.expire-after-write' = '60s',
    -- 读取后过期时间
    'lookup.partial-cache.expire-after-access' = '30s',
    -- 是否缓存空值
    'lookup.partial-cache.cache-missing-key' = 'true'
);


Window Join

基于窗口的 join,窗口内数据匹配,窗口结束后 state 自动清理。

SELECT o.order_id, p.amount
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) o
JOIN TABLE(TUMBLE(TABLE payments, DESCRIPTOR(pay_time), INTERVAL '10' MINUTES)) p
ON o.order_id = p.order_id
AND o.window_start = p.window_start
AND o.window_end = p.window_end

Window Join的特点:

  • 支持 Tumbling / Hopping / Cumulate 三种窗口。
  • 只支持 inner join。

选型

类型outer joinstate 自动清理适用场景
Regular Join数据量小,逻辑简单
Interval Join有时间关联的双流
Temporal Join关联历史版本快照
Lookup Join关联外部维表
Window Join窗口内聚合后关联

适用场景

1. 订单 + 支付流
订单产生后,等待支付事件到来进行关联,判断是否支付成功。时间有偏差,适合 Interval Join 或 CoProcessFunction。

2. 行为流 + 用户信息流
用户点击/浏览行为流,关联用户画像流做实时enrichment。用户信息变化不频繁,适合 Temporal Join 或 Lookup Join。

3. 广告曝光 + 点击流
曝光事件和点击事件分属两条流,需要关联计算点击率。时间窗口内匹配,适合 Window Join 或 Interval Join。

4. 日志流 + 告警规则流
实时日志流关联动态告警规则流,规则会实时更新。适合 CoProcessFunction,规则侧用 broadcast state。

5. 交易流 + 风控事件流
交易发生后,关联风控系统产生的风险事件,判断是否拦截。对延迟敏感,适合 Interval Join。

6. 多源数据合并
同一业务实体的数据来自不同系统(如 MySQL binlog + Kafka 业务流),需要合并成完整记录。适合 CoProcessFunction 自定义 state 管理。

7. 实时对账
银行/支付场景,两条流分别来自不同系统的流水记录,需要实时核对是否一致。适合 CoProcessFunction 实现 full outer join。

评论