在FLink中,经常会遇到双流关联的的场景,比如订单流和支付流的关联,用户行为流和用户信息流等。那么,在Flink中都提供了哪些双流的关联呢?下面主要从DataStream和Flink SQL角度入手说明。
DataStream
Window Join(窗口Join)
两条流在同一个时间窗口内进行Join,窗口结束时出触发计算。
// 设置10秒的窗口期
stream1.join(stream2)
.where(r -> r.id)
.equalTo(r -> r.id)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply((e1, e2) -> e1 + " " + e2);
Window Join的特点:
- 只能做 inner join
- 两条流的数据必须落在同一个窗口内才能匹配
- 支持 Tumbling / Sliding / Session Window
Window Join只能做 inner join,join不上的数据会直接丢弃,没有机制输出到侧输出流,所以谨慎使用。
Interval Join(区间Join)
基于事件时间,允许一条流的数据在另一条流数据的时间范围内进行匹配。
// stream1流的一个记录可以在stream2前后5秒的这个区间匹配
stream1.keyBy(r -> r.id)
.intervalJoin(stream2.keyBy(r -> r.id))
.between(Time.seconds(-5), Time.seconds(5))
.process(new ProcessJoinFunction<>() {
@Override
public void processElement(Left l, Right r, Context ctx, Collector<String> out) {
out.collect(l + " " + r);
}
});
Interval Join的特点:
- 只支持 inner join
- 只支持 event time
- 比 Window Join 更灵活,适合时间偏差场景
同样,Interval Join只能做 inner join,join不上的数据会直接丢弃,没有机制输出到侧输出流,所以谨慎使用。
CoProcessFunction(底层 API)
CoprocessFunction是最灵活的方式,可以实现inner / left / right / full outer join,以及各种自定义逻辑。
stream1.keyBy(r -> r.id)
.connect(stream2.keyBy(r -> r.id))
.process(new CoProcessFunction<Left, Right, String>() {
// 用 State 缓存两侧数据,自行实现匹配逻辑
MapState<String, Left> leftState;
MapState<String, Right> rightState;
@Override
public void processElement1(Left l, Context ctx, Collector<String> out) {
// 查 rightState,匹配则输出,否则存入 leftState
}
@Override
public void processElement2(Right r, Context ctx, Collector<String> out) {
// 查 leftState,匹配则输出,否则存入 rightState
}
});
CoProcessFunction的特点:
- 完全自定义,支持所有 join 类型
- 需要手动管理 State 和 TTL,防止状态无限增长
- 适合复杂业务逻辑
使用CoProcessFunction实现Left Join
下面使用CoProcessFunction实现Left Join。
public class LeftJoinExample {
// 左流数据
@Data
@AllArgsConstructor
public static class Order {
public String orderId;
public String userId;
public long timestamp;
}
// 右流数据
@Data
@AllArgsConstructor
public static class Payment {
public String orderId;
public double amount;
public long timestamp;
}
// 输出结果
@Data
@AllArgsConstructor
public static class JoinResult {
public String orderId;
public String userId;
public Double amount; // 右流未匹配时为 null
}
public static class LeftJoinFunction extends CoProcessFunction<Order, Payment, JoinResult> {
// 未匹配的左流数据
private MapState<String, Order> orderState;
// 右流数据缓存
private MapState<String, Payment> paymentState;
// 等待匹配的超时时间
private static final long JOIN_TIMEOUT = 10_000L;
@Override
public void open(Configuration parameters) {
orderState = getRuntimeContext().getMapState(
new MapStateDescriptor<>("order-state", String.class, Order.class)
);
paymentState = getRuntimeContext().getMapState(
new MapStateDescriptor<>("payment-state", String.class, Payment.class)
);
}
@Override
public void processElement1(Order order, Context ctx, Collector<JoinResult> out) throws Exception {
Payment payment = paymentState.get(order.orderId);
if (payment != null) {
// 右流已有数据,直接 join 输出
out.collect(new JoinResult(order.orderId, order.userId, payment.amount));
paymentState.remove(order.orderId);
} else {
// 右流暂无数据,缓存左流,注册超时定时器
orderState.put(order.orderId, order);
ctx.timerService().registerEventTimeTimer(order.timestamp + JOIN_TIMEOUT);
}
}
@Override
public void processElement2(Payment payment, Context ctx, Collector<JoinResult> out) throws Exception {
Order order = orderState.get(payment.orderId);
if (order != null) {
// 左流已有数据,直接 join 输出
out.collect(new JoinResult(order.orderId, order.userId, payment.amount));
orderState.remove(payment.orderId);
} else {
// 左流暂无数据,缓存右流等待
paymentState.put(payment.orderId, payment);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<JoinResult> out) throws Exception {
// 超时触发,左流数据仍未匹配,输出 null 右侧(left join 语义)
for (Map.Entry<String, Order> entry : orderState.entries()) {
Order order = entry.getValue();
if (order.timestamp + JOIN_TIMEOUT <= timestamp) {
out.collect(new JoinResult(order.orderId, order.userId, null));
orderState.remove(entry.getKey());
}
}
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Order> orderStream = env.fromElements(
new Order("o1", "u1", 1000L),
new Order("o2", "u2", 2000L)
).assignTimestampsAndWatermarks(
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((e, t) -> e.timestamp)
);
DataStream<Payment> paymentStream = env.fromElements(
new Payment("o1", 99.9, 3000L)
// o2 没有对应支付,left join 应输出 null
).assignTimestampsAndWatermarks(
WatermarkStrategy.<Payment>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((e, t) -> e.timestamp)
);
DataStream<JoinResult> result = orderStream.keyBy(o -> o.orderId)
.connect(paymentStream.keyBy(p -> p.orderId))
.process(new LeftJoinFunction());
result.print();
env.execute();
}
}
上面是实现Left Join的简单Demo,在生产环境中建议给状态设置TTL,并使用RocksDB state backend。
选型
| 场景 | 推荐方式 |
|---|---|
| 简单等值 join,时间对齐 | Window Join |
| 有时间偏差的 inner join | Interval Join |
| 需要 outer join 或复杂逻辑 | CoProcessFunction |
Flink SQL
如果使用Table API或者SQL,支持更丰富的语义:
| 类型 | 说明 |
|---|---|
| Regular Join | 全量 join,状态持续保留 |
| Interval Join | 基于时间区间 |
| Temporal Join | 流与版本表/维表 join |
| Lookup Join | 流关联外部维表(如 MySQL、HBase) |
Regular Join
全量 join,两侧数据都会永久保留在 state 中,适合数据量不大的场景。
-- inner join
SELECT o.order_id, p.amount
FROM orders o
JOIN payments p ON o.order_id = p.order_id
-- left join
SELECT o.order_id, p.amount
FROM orders o
LEFT JOIN payments p ON o.order_id = p.order_id
-- full outer join
SELECT o.order_id, p.amount
FROM orders o
FULL OUTER JOIN payments p ON o.order_id = p.order_id
Regular Join的特点:
- state 无限增长,生产环境需要配置 state TTL:
table.exec.state.ttl: 86400000 # 1天,单位毫秒
Interval Join
基于事件时间区间的 join,state 可以自动清理,比 Regular Join 更适合生产。
SELECT o.order_id, p.amount
FROM orders o, payments p
WHERE o.order_id = p.order_id
AND p.pay_time BETWEEN o.order_time - INTERVAL '5' MINUTE
AND o.order_time + INTERVAL '10' MINUTE
Interval Join的特点:
- 只支持 inner join
- 两张表都必须有事件时间属性
- 区间范围要合理,太大会导致 state 积压
Temporal Join(时态表Join)
流关联一张有版本概念的表,获取事件发生时刻对应的快照数据,典型场景是汇率、价格表。
-- 定义版本表(有主键 + 事件时间)
CREATE TABLE exchange_rates (
currency STRING,
rate DOUBLE,
update_time TIMESTAMP(3),
WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (...);
-- 流关联版本表,取事件发生时的汇率
SELECT o.order_id, o.amount * r.rate AS amount_usd
FROM orders o
LEFT JOIN exchange_rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency
Temporal Join的特点:
- 获取的是事件时间对应的历史快照,不是最新值
- state 可以自动清理
Lookup Join
流关联外部维表(MySQL、HBase、Redis 等),实时查询外部系统补充字段。
-- 定义外部维表
CREATE TABLE user_info (
user_id STRING,
user_name STRING,
age INT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/db',
'table-name' = 'user_info'
);
-- 使用处理时间做 lookup join
SELECT o.order_id, u.user_name, o.amount
FROM orders o
LEFT JOIN user_info FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.user_id
Lookup Join的特点:
- 只支持处理时间(Processing Time)
- 每条左流数据触发一次外部查询
如果不想每条左流数据触发一次外部查询,可以使用缓存机制,但对于变动频繁的维表不合适。
CREATE TABLE user_info (
user_id STRING,
user_name STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/db',
'table-name' = 'user_info',
-- 开启缓存
'lookup.cache' = 'PARTIAL',
-- 最大缓存行数
'lookup.partial-cache.max-rows' = '10000',
-- 写入后过期时间
'lookup.partial-cache.expire-after-write' = '60s',
-- 读取后过期时间
'lookup.partial-cache.expire-after-access' = '30s',
-- 是否缓存空值
'lookup.partial-cache.cache-missing-key' = 'true'
);
Window Join
基于窗口的 join,窗口内数据匹配,窗口结束后 state 自动清理。
SELECT o.order_id, p.amount
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) o
JOIN TABLE(TUMBLE(TABLE payments, DESCRIPTOR(pay_time), INTERVAL '10' MINUTES)) p
ON o.order_id = p.order_id
AND o.window_start = p.window_start
AND o.window_end = p.window_end
Window Join的特点:
- 支持 Tumbling / Hopping / Cumulate 三种窗口。
- 只支持 inner join。
选型
| 类型 | outer join | state 自动清理 | 适用场景 |
|---|---|---|---|
| Regular Join | ✅ | ❌ | 数据量小,逻辑简单 |
| Interval Join | ❌ | ✅ | 有时间关联的双流 |
| Temporal Join | ✅ | ✅ | 关联历史版本快照 |
| Lookup Join | ✅ | ✅ | 关联外部维表 |
| Window Join | ❌ | ✅ | 窗口内聚合后关联 |
适用场景
1. 订单 + 支付流
订单产生后,等待支付事件到来进行关联,判断是否支付成功。时间有偏差,适合 Interval Join 或 CoProcessFunction。
2. 行为流 + 用户信息流
用户点击/浏览行为流,关联用户画像流做实时enrichment。用户信息变化不频繁,适合 Temporal Join 或 Lookup Join。
3. 广告曝光 + 点击流
曝光事件和点击事件分属两条流,需要关联计算点击率。时间窗口内匹配,适合 Window Join 或 Interval Join。
4. 日志流 + 告警规则流
实时日志流关联动态告警规则流,规则会实时更新。适合 CoProcessFunction,规则侧用 broadcast state。
5. 交易流 + 风控事件流
交易发生后,关联风控系统产生的风险事件,判断是否拦截。对延迟敏感,适合 Interval Join。
6. 多源数据合并
同一业务实体的数据来自不同系统(如 MySQL binlog + Kafka 业务流),需要合并成完整记录。适合 CoProcessFunction 自定义 state 管理。
7. 实时对账
银行/支付场景,两条流分别来自不同系统的流水记录,需要实时核对是否一致。适合 CoProcessFunction 实现 full outer join。