kafka-flink-iceberg
kafka-flink-iceberg: generate mock data and send it to Kafka, then read the data back from Kafka and write it into Iceberg with Flink.
1. Kafka data preparation
- Prepare the merge data
nohup java -jar stream-mock-1.0-SNAPSHOT.jar -t test -e merge -r 5 -s 1 &
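For reference, a single mocked merge record should look roughly like the JSON below (field names follow the Kafka source table defined later, values are taken from the example row further down; the exact output of the mock jar may differ):
{"carPlate":"晋B329SL","plateColor":1,"siteNo":"G009233001000910020","snapshotTime":"2021-03-22 17:09:36","snapshotTimestamp":1616404176933,"snapshotDate":"20210322","direction":1,"topic":"mock","extractTime":"2021-03-22 17:09:36"}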
2. SQL client
- Start Flink
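A standalone session cluster is assumed here; Flink's bundled script starts it (adjust for YARN or other deployment modes):
$FLINK_HOME/bin/start-cluster.sh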
- Start the SQL client
sql-client.sh embedded -d $FLINK_HOME/conf/sql-client-hive.yaml -j flink_sql_client_conf/iceberg-flink-runtime-0.11.0.jar -j flink-1.11.1/lib/flink-sql-connector-hive-2.3.6_2.11-1.11.1.jar
- Create the Kafka source table
use catalog myhive;
create database kafka_source;
CREATE TABLE kafka_source.merge_kafka_iceberg (
  carPlate STRING,
  plateColor INT,
  siteNo STRING,
  snapshotTime STRING,
  snapshotTimestamp BIGINT,
  snapshotDate STRING,
  direction INT,
  topic STRING,
  extractTime STRING,
  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(snapshotTimestamp / 1000, 'yyyy-MM-dd HH:mm:ss')),
  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'stream-server:9092',
  'properties.group.id' = 'sql_client_test_1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
SELECT * FROM myhive.kafka_source.merge_kafka_iceberg;
- Create the Iceberg table
-- Create an Iceberg catalog backed by the Hive metastore
CREATE CATALOG iceberg_hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://hadoop-dev-1:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://stream-hdfs/user/hive/warehouse'
);
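Alternatively, the same catalog can be pre-registered in the SQL client environment file (sql-client-hive.yaml) so it is available at startup; a sketch reusing the properties from the DDL above (this assumes the Iceberg Flink catalog factory is on the SQL client classpath):
catalogs:
  - name: iceberg_hive_catalog
    type: iceberg
    catalog-type: hive
    uri: thrift://hadoop-dev-1:9083
    clients: 5
    property-version: 1
    warehouse: hdfs://stream-hdfs/user/hive/warehouse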
-- Switch to the Iceberg catalog
use catalog iceberg_hive_catalog;
use iceberg_hive_db;
-- Create the Iceberg table
-- CREATE TABLE merge_iceberg_partition_1 (
-- carPlate STRING,
-- plateColor INT,
-- siteNo STRING,
-- snapshotTime STRING,
-- snapshotTimestamp BIGINT,
-- snapshotDate STRING,
-- direction INT,
-- topic STRING,
-- extractTime STRING
-- ) PARTITIONED BY (snapshotDate);
-- If the table needs to be readable from Hive as well, create it through the Iceberg Java API instead (see the sketch after this SQL block)
INSERT INTO iceberg_hive_catalog.iceberg_hive_db.merge_iceberg_partition
SELECT carPlate, plateColor, siteNo, snapshotTime, snapshotTimestamp, snapshotDate, direction, topic, extractTime
FROM myhive.kafka_source.merge_kafka_iceberg;
-- insert into merge_iceberg_partition values('晋B329SL',1,'G009233001000910020','2021-03-22 17:09:36',1616404176933,'20210322',1,'mock','2021-03-22 17:09:36');
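A minimal sketch of the Java-API route mentioned above, written against the Iceberg 0.11 HiveCatalog (the constructor and class names are version-dependent; the metastore URI, warehouse path, database, and table name reuse the values from the catalog DDL):
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.types.Types;

public class CreateMergeIcebergTable {
    public static void main(String[] args) {
        // Point the catalog at the same Hive metastore and warehouse the Flink catalog uses
        Configuration conf = new Configuration();
        conf.set("hive.metastore.uris", "thrift://hadoop-dev-1:9083");
        conf.set("hive.metastore.warehouse.dir", "hdfs://stream-hdfs/user/hive/warehouse");
        HiveCatalog catalog = new HiveCatalog(conf);

        // Same columns as the commented-out Flink SQL DDL above
        Schema schema = new Schema(
                Types.NestedField.optional(1, "carPlate", Types.StringType.get()),
                Types.NestedField.optional(2, "plateColor", Types.IntegerType.get()),
                Types.NestedField.optional(3, "siteNo", Types.StringType.get()),
                Types.NestedField.optional(4, "snapshotTime", Types.StringType.get()),
                Types.NestedField.optional(5, "snapshotTimestamp", Types.LongType.get()),
                Types.NestedField.optional(6, "snapshotDate", Types.StringType.get()),
                Types.NestedField.optional(7, "direction", Types.IntegerType.get()),
                Types.NestedField.optional(8, "topic", Types.StringType.get()),
                Types.NestedField.optional(9, "extractTime", Types.StringType.get()));

        // Identity-partition by snapshotDate, matching PARTITIONED BY (snapshotDate)
        PartitionSpec spec = PartitionSpec.builderFor(schema)
                .identity("snapshotDate")
                .build();

        catalog.createTable(
                TableIdentifier.of("iceberg_hive_db", "merge_iceberg_partition"),
                schema, spec);
    }
}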
3. Mock data generator usage
usage: Kafka producer Application [-b <kafkaProp>] [-e <arg>] [-h] [-l]
[-r <arg>] [-s <arg>] -t <arg>
-b,--bootstrap.servers <kafkaProp> kafka bootstrap servers.
<string>
eg:-b 1.1.1.1:9092,1.1.1.2:9092
-e,--entity <arg> mock entity: speed/radar/default.
<string>
eg:-e speed
-h,--help Print help
-l,--logPrint Whether to print logs. eg:-l
-r,--recordSize <arg> record size.
<int>
eg:-r 1
-s,--sleepSecond <arg> sleep second.
<double>
eg:-s 0.01
-t,--topic <arg> send records to kafka topic.
<string>
eg:-t test
<!-- 4 days of data at one record every 0.2 s: 60*60*24*4 / 0.2 = 1,728,000 records -->
[admin@stream-server mock]$ nohup java -jar stream-mock-1.0-SNAPSHOT.jar -t test -e merge -r 1728000 -s 0.2 &
4. Issues encountered
- The Kafka data has already been written into the Iceberg table's data directory, but queries return no results
Checkpointing was not enabled. The Flink Iceberg sink only commits data files when a checkpoint completes, so enable checkpointing in flink-conf.yaml:
vim flink-conf.yaml
# state.backend: filesystem
state.backend: filesystem
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.checkpoints.dir: hdfs://stream-hdfs/sql-client/flink-checkpoints
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.savepoints.dir: hdfs://stream-hdfs/sql-client/flink-checkpoints
# state.backend.incremental: false
state.backend.incremental: false
execution.checkpointing.interval: 5s
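If the pipeline is submitted as a packaged Flink job instead of through the SQL client, the equivalent settings can also be applied in code; a minimal sketch against the Flink 1.11 DataStream API (checkpoint interval and path reuse the values above):
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Iceberg commits data files on checkpoint completion, so checkpoints must be on
        env.enableCheckpointing(5000); // every 5 s, matching execution.checkpointing.interval
        env.setStateBackend(new FsStateBackend("hdfs://stream-hdfs/sql-client/flink-checkpoints"));
        // ... build the Kafka -> Iceberg pipeline here, then call env.execute(...)
    }
}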
