-- Template: create a Routine Load job that continuously loads JSON messages
-- from a Kafka topic into a target table. Replace every bracketed placeholder
-- and every sample value (brokers, topic, group id, credentials) before running.
CREATE ROUTINE LOAD [数据库名].[任务名] ON [目标表]
COLUMNS (列1, 列2, ...) -- column mapping (optional)
PROPERTIES
(
    "format" = "json",
    "desired_concurrent_number" = "3", -- concurrency (recommended <= number of Kafka partitions)
    "max_batch_interval" = "20", -- interval of a single import, in seconds
    "max_batch_rows" = "200000", -- maximum number of rows per batch
    "max_batch_size" = "209715200" -- maximum batch size in bytes (200 * 1024 * 1024)
)
FROM KAFKA
(
    "kafka_broker_list" = "kafka1:9092,kafka2:9092", -- Kafka cluster addresses
    "kafka_topic" = "your_topic", -- topic name
    "property.group.id" = "doris_consumer_group", -- consumer group
    "property.security.protocol" = "SASL_PLAINTEXT", -- security protocol (optional)
    "property.sasl.mechanism" = "PLAIN",
    -- Placeholder credentials only: never commit real secrets to this file.
    "property.sasl.username" = "admin",
    "property.sasl.password" = "password"
);
-- ===== Monitoring job status (监控任务状态) =====
-- List the Routine Load jobs of the current database.
-- Per the Doris documentation, without ALL only jobs that are still running
-- are listed; add ALL to include stopped or cancelled jobs as well.
SHOW ROUTINE LOAD;
-- Show the details of one job (including consumption progress and error information).
-- The trailing \G is the mysql command-line client's vertical-output terminator,
-- so this line only works from that client.
SHOW ROUTINE LOAD FOR [任务名]\G
-- List every job under the ods database.
USE ods;
SHOW ALL ROUTINE LOAD;
-- ===== Managing jobs (管理任务) =====
-- Pause the job.
PAUSE ROUTINE LOAD FOR [任务名];
-- Resume the job.
RESUME ROUTINE LOAD FOR [任务名];
-- Delete the job (done by stopping it).
-- NOTE(review): a stopped job presumably cannot be resumed afterwards - confirm
-- against the Doris documentation before running this on a production job.
STOP ROUTINE LOAD FOR [任务名];
-- Routine Load job ods.sync_bs_app_intercept_request_data_kafka:
-- loads JSON messages from the Kafka topic app_intercept_request_data into
-- ods_bs_app_intercept_request_data_rl, consuming from the beginning of the topic.
CREATE ROUTINE LOAD ods.sync_bs_app_intercept_request_data_kafka
ON ods_bs_app_intercept_request_data_rl
COLUMNS (
    imei,
    app_package,
    version,
    model,
    ip,
    province,
    city,
    district,
    platform,
    standard_package,
    intercept_level,
    occur_date,
    week_date,
    month_date,
    create_time
)
PROPERTIES (
    "format" = "json"
)
FROM KAFKA (
    "kafka_broker_list" = "rom-kafka-0:9092,rom-kafka-1:9092,rom-kafka-2:9092",
    "kafka_topic" = "app_intercept_request_data",
    "property.group.id" = "doris_consumer_group",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
-- Job-control commands for ods.sync_bs_app_intercept_request_data_kafka.
-- WARNING: these are alternatives to run one at a time, not a script to run
-- top to bottom - executing them in this order would resume, then stop, then
-- try to pause the same job.
-- Resume the job.
RESUME ROUTINE LOAD FOR ods.sync_bs_app_intercept_request_data_kafka;
-- Stop the job.
-- NOTE(review): stopping is presumably irreversible - confirm before running.
STOP ROUTINE LOAD FOR ods.sync_bs_app_intercept_request_data_kafka;
-- Pause the job.
PAUSE ROUTINE LOAD FOR ods.sync_bs_app_intercept_request_data_kafka;
-- Example Routine Load job: loads JSON messages from the Kafka topic
-- test-routine-load-json into test_routineload_tbl. The jsonpaths list gives
-- one JSON path per entry of COLUMNS, in the same order (user_id, name, age).
CREATE ROUTINE LOAD testdb.example_routine_load_json
ON test_routineload_tbl
COLUMNS (
    user_id,
    name,
    age
)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.user_id\",\"$.name\",\"$.age\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "192.168.88.62:9092",
    "kafka_topic" = "test-routine-load-json",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
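/*
Sample JSON message (presumably one record from the Kafka topic consumed by the
inspect/competitor Routine Load job defined later in this file - confirm):
{
"lv1": "inspect",
"lv2": "competitor",
"lv3": "",
"mt": 1752998857000,
"task_id": "1125072005390201010797",
"site_no": "01010797",
"competitor_feedback": "",
"competitor_proof": "manager/change/20250720/01010797/691272.jpg",
"competitor_type": "无",
"price_diff": "未知"
}
*/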
-- Target table for the inspect/competitor real-time sync.
-- UNIQUE KEY on site_no, hash-distributed on site_no with automatic bucketing,
-- replication allocation "tag.location.default: 3".
CREATE TABLE test.ods_quna_inspect_competitor_last_rl (
    `site_no` VARCHAR(255) NULL COMMENT '机器编号',
    `status`  STRING            COMMENT '审核状态字段(审核通过或待审核的:枚举值)',
    `mt`      DATETIME     NULL COMMENT '更新时间',
    `dt`      DATE         NULL COMMENT '日期'
)
ENGINE = OLAP
UNIQUE KEY (`site_no`)
COMMENT '巡检任务 竞品反馈信息 最新一条信息 实时同步'
DISTRIBUTED BY HASH (`site_no`) BUCKETS AUTO
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3"
);
-- Routine Load job test.ods_quna_inspect_competitor_last_rl:
-- loads JSON messages from the Kafka topic ods_quna_inspect_competitor_kafka
-- into ods_quna_inspect_competitor_last_rl, consuming from the beginning of
-- the topic.
--
-- t_mt is an intermediate column holding an epoch timestamp in milliseconds;
-- mt and dt are derived from it (divided by 1000 to get seconds).
--
-- jsonpaths supplies one JSON path per source column, in COLUMNS order
-- (site_no, status, t_mt). It is needed because t_mt is not a key of the
-- message: the sample message in this file carries the timestamp under "mt".
-- TODO confirm: the sample message has no "status" key, so "$.status" is a
-- best guess - point it at the real field that holds the review status.
CREATE ROUTINE LOAD test.ods_quna_inspect_competitor_last_rl
ON ods_quna_inspect_competitor_last_rl
COLUMNS (
    site_no,
    `status`,
    t_mt,
    mt = FROM_UNIXTIME(t_mt / 1000, 'yyyy-MM-dd HH:mm:ss'),
    dt = FROM_UNIXTIME(t_mt / 1000, 'yyyy-MM-dd')
)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.site_no\",\"$.status\",\"$.mt\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "qzsh-storenode01:9092,qzsh-storenode02:9092,qzsh-storenode03:9092",
    "kafka_topic" = "ods_quna_inspect_competitor_kafka",
    "property.group.id" = "doris_consumer_group_20250730",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);