SQL 客户端

Flink 的 Table & SQL API 可以处理 SQL 语言编写的查询语句,但是这些查询需要嵌入用 Java 或 Scala 编写的表程序中。

此外,这些程序在提交到集群前需要用构建工具打包。这或多或少限制了 Java/Scala 程序员对 Flink 的使用。

SQL 客户端 的目的是提供一种简单的方式来编写、调试和提交表程序到 Flink 集群上,而无需写一行 Java 或 Scala 代码。SQL 客户端命令行界面(CLI) 能够在命令行中检索和可视化分布式应用中实时产生的结果。

系统环境

  • MAC OS
  • flink1.12.0

运行环境

  1. 启动集群 start-cluster.sh
  2. 启动SQL-Client sql-client.sh embedded

DataGen连接器

  • DataGen 连接器允许按数据生成规则进行读取。
  • DataGen 连接器可以使用计算列语法。 这使您可以灵活地生成记录。
  • DataGen 连接器是内置的。

注意 不支持复杂类型: Array,Map,Row。 请用计算列构造这些类型。

SQL-Client DataGen建表语句

-- 删除表定义
drop table if exists person_score_datagen;
-- 创建表定义
CREATE TABLE if not exists person_score_datagen (
	id INT, 
	name STRING, 
	age INT,
	score INT,
	ts AS LOCALTIMESTAMP, 
	WATERMARK FOR ts AS ts ) 
WITH (
	'connector' = 'datagen',
-- 	每秒生成的行数:2
	'rows-per-second' = '2',
-- 	字段id选用序列生成器
	'fields.id.kind' = 'sequence',
	'fields.id.start' = '1',
	'fields.id.end' = '20',
-- 	随机生成器生成字符的长度:6
	'fields.name.length' = '6',
	'fields.age.min' = '20',
	'fields.age.max' = '30',
-- 	随机生成器的最小值:1
	'fields.score.min' = '60',
-- 	随机生成器的最大值:100
	'fields.score.max' = '100'
);

DataGen 连接器 参数

参数是否必选默认参数数据类型描述
connector必须(none)String指定要使用的连接器,这里是 ‘datagen’。
rows-per-second可选10000Long每秒生成的行数,用以控制数据发出速率。
fields.#.kind可选randomString指定 ‘#’ 字段的生成器。可以是 ‘sequence’ 或 ‘random’。
fields.#.min可选(Minimum value of type)(Type of field)随机生成器的最小值,适用于数字类型。
fields.#.max可选(Maximum value of type)(Type of field)随机生成器的最大值,适用于数字类型。
fields.#.length可选100Integer随机生成器生成字符的长度,适用于 char、varchar、string。
fields.#.start可选(none)(Type of field)序列生成器的起始值。
fields.#.end可选(none)(Type of field)序列生成器的结束值。

查询结果

select * from person_score_datagen;

MySQL连接器

依赖包

  1. maven依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
  1. 启动Jar包依赖
  • flink-connector-jdbc_2.11-1.12.0.jar
  • mysql-connector-java-5.1.40.jar

MySQL建表语句

DROP TABLE IF EXISTS `person_score`;
CREATE TABLE `person_score` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `pid` bigint(20) NOT NULL COMMENT '一个批次内顺序id',
  `name` varchar(32) NOT NULL COMMENT '姓名',
  `age` int(3) NOT NULL COMMENT '年龄',
  `score` int(5) NOT NULL COMMENT '得分',
  `creator` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '创建者',
  `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='人员分数表';

INSERT INTO person_score( `pid`, `name`, `age`, `score`, `creator`) VALUES ( 1, 'test', 66, 100, 'Init');

SQL-Client JDBC建表语句

-- 删除定义表
drop table if exists person_score_mysql;

CREATE TABLE if not exists person_score_mysql (
	pid BIGINT,
  name STRING, 
	age INT,
	score INT,
	creator STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://stream-dev:3306/dev',
    'table-name' = 'person_score',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = 'mysql@123',
    'lookup.cache.max-rows' = '5',
    'lookup.cache.ttl' = '10s'
);

select * from person_score_mysql;

JDBC 连接器 参数

参数是否必选默认参数数据类型描述
connector必选的(none)String指定要使用的连接器,此处应为’jdbc’。
url必选的(none)StringJDBC数据库URL。
table-name必选的(none)String要连接的JDBC表的名称。
driver可选的(none)String用于连接到该URL的JDBC驱动程序的类名,如果未设置,它将自动从URL派生。
username可选的(none)StringJDBC用户名。如果同时指定了两者’username’,则’password’必须同时指定两者。
password可选的(none)StringJDBC密码。
scan.partition.column可选的(none)String用于对输入进行分区的列名。有关更多详细信息,请参见以下“分区扫描”部分。
scan.partition.num可选的(none)Integer分区数。
scan.partition.lower-bound可选的(none)Integer第一个分区的最小值。
scan.partition.upper-bound可选的(none)Integer最后一个分区的最大值。
scan.fetch-size可选的0Integer每次往返读取时应从数据库中获取的行数。如果指定的值为零,则忽略提示。
scan.auto-commit可选的trueBoolean在JDBC驱动程序上设置自动提交标志,该标志确定是否在事务中自动提交每个语句。一些JDBC驱动程序,特别是 Postgres,可能要求将此设置为false以便流式传输结果。
lookup.cache.max-rows可选的(none)Integer查找缓存的最大行数(超过此值),最旧的行将过期。默认情况下,查找缓存处于禁用状态。有关更多详细信息,请参见下面的“查找缓存”部分。
lookup.cache.ttl可选的(none)Duration查找缓存中每一行的最长生存时间,在这段时间内,最旧的行将过期。默认情况下,查找缓存处于禁用状态。有关更多详细信息,请参见下面的“查找缓存”部分。
lookup.max-retries可选的3Integer查找数据库失败时的最大重试时间。
sink.buffer-flush.max-rows可选的100Integer刷新前缓冲记录的最大大小。可以设置为零以禁用它。
sink.buffer-flush.interval可选的1sDuration刷新间隔不断变化,在这段时间内,异步线程将刷新数据。可以设置'0’为禁用它。注意,‘sink.buffer-flush.max-rows’可以将其设置为'0’刷新间隔设置,以允许对缓冲的动作进行完全异步处理。
sink.max-retries可选的3Integer如果将记录写入数据库失败,则最大重试时间。

SQL-Client 插入数据到MySQL

insert into person_score_mysql
SELECT id as pid, name, age, score,'sql-client' FROM person_score_datagen;

执行结果 Flink UI