flink写入clickhouse

flink及clickhouse官方都是没有提供flink写clickhouse的SQL API

ClickHouse 建表

DROP TABLE IF EXISTS dev.person_score;

CREATE TABLE IF NOT EXISTS dev.person_score
(
    `pid` UInt8, 
    `name` String, 
    `age` UInt8,
    `score` UInt8,
    `create_time` datetime DEFAULT now()
)
ENGINE = TinyLog();

insert into person_score(`pid`,`name`,`age`,`score`) VALUES(0,'test',66,100)

select * from person_score;

SQL-Client DataGen建表语句

-- 删除表定义
drop table if exists person_score_datagen;

-- 创建表定义
CREATE TABLE if not exists person_score_datagen (
	id INT, 
	name STRING, 
	age INT,
	score INT,
	ts AS LOCALTIMESTAMP, 
	WATERMARK FOR ts AS ts ) 
WITH (
	'connector' = 'datagen',
-- 	每秒生成的行数:2
	'rows-per-second' = '2',
-- 	字段id选用序列生成器
	'fields.id.kind' = 'sequence',
	'fields.id.start' = '1',
	'fields.id.end' = '20',
-- 	随机生成器生成字符的长度:6
	'fields.name.length' = '6',
	'fields.age.min' = '20',
	'fields.age.max' = '30',
-- 	随机生成器的最小值:1
	'fields.score.min' = '60',
-- 	随机生成器的最大值:100
	'fields.score.max' = '100'
);

select * from person_score_datagen;
drop table if exists person_score_ck;

CREATE TABLE if not exists person_score_ck (
    pid INT,
    name VARCHAR,
    age INT,
    score INT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:clickhouse://hadoop-dev-3:8123/dev',
    'table-name' = 'person_score',
    'driver' = 'ru.yandex.clickhouse.ClickHouseDriver',
    'username' = 'default',
    'password' = 'ck_pwd',
    'lookup.cache.max-rows' = '5',
    'lookup.cache.ttl' = '10s'
);

flink及clickhouse官方都是没有提供flink写clickhouse的SQL API,对于DataStream API是可以通过clickhouse-jdbc,自定义SourceFunction、RichSinkFunction去读写clickhouse,相对比较简单。

DataStream API

  • SourceFunction 自己实现了一个数据模拟生成器的SourceFunction
import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Created by azhe on 2021-03-09 15:42
 */
public class CustomerSource implements SourceFunction<DefaultEntity> {

  private List list;

  public CustomerSource(int recordSize) {
    this.list = MockEntitys.mockDefault(recordSize);
  }

  int index = 0;
  Boolean isRunning = true;

  @Override
  public void run(SourceContext<DefaultEntity> ctx) throws Exception {

    while (isRunning&&index<list.size()) {
      DefaultEntity obj = (DefaultEntity)list.get(index);
      ctx.collect(obj);
      index += 1;
      Thread.sleep(1000);
    }
  }

  @Override
  public void cancel() {

    isRunning = false;
  }
}
  • RichSinkFunction 实现SinkFunction将DataStream数据输出到clickhouse,数据格式没有做通用的,需要根据数据格式定制化编写,不够灵活。
import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Created by azhe on 2021-03-09 17:40
 */
public class ClickHouseSink extends RichSinkFunction<DefaultEntity> {
  Connection connection = null;

  String sql;

  public ClickHouseSink(String sql) {
    this.sql = sql;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    connection = ClickHouseUtil.getConn();
  }

  @Override
  public void close() throws Exception {
    super.close();
    if (connection != null) {
      connection.close();
    }
  }

  @Override
  public void invoke(DefaultEntity defaultEntity, Context context) throws Exception {
    PreparedStatement preparedStatement = connection.prepareStatement(sql);
    preparedStatement.setLong(1, defaultEntity.getId());
    preparedStatement.setString(2, defaultEntity.getName());
    preparedStatement.addBatch();

    long startTime = System.currentTimeMillis();
    int[] ints = preparedStatement.executeBatch();
    connection.commit();
    long endTime = System.currentTimeMillis();
    System.out.println("批量插入完毕用时:" + (endTime - startTime) + " -- 插入数据 = " + ints.length);
  }
}

SQL API

根据官方提供的JDBC连接器改造,修改源码,支持Clickhouse,目前是根据flink_2.11-1.12.0版本重新编译

  • ClickHouseDialect
package org.apache.flink.connector.jdbc.dialect;

import org.apache.flink.connector.jdbc.internal.converter.ClickHouseRowConverter;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** JDBC dialect for ClickHouse Created by azhe on 2021-03-10 20:43 . */
public class ClickHouseDialect extends AbstractDialect {

    private static final long serialVersionUID = 1L;

    // Define MAX/MIN precision of TIMESTAMP type according to Mysql docs:
    // https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
    private static final int MAX_TIMESTAMP_PRECISION = 6;
    private static final int MIN_TIMESTAMP_PRECISION = 1;

    // Define MAX/MIN precision of DECIMAL type according to Mysql docs:
    // https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html
    private static final int MAX_DECIMAL_PRECISION = 65;
    private static final int MIN_DECIMAL_PRECISION = 1;

    @Override
    public int maxDecimalPrecision() {
        return MAX_DECIMAL_PRECISION;
    }

    @Override
    public int minDecimalPrecision() {
        return MIN_DECIMAL_PRECISION;
    }

    @Override
    public int maxTimestampPrecision() {
        return MAX_TIMESTAMP_PRECISION;
    }

    @Override
    public int minTimestampPrecision() {
        return MIN_TIMESTAMP_PRECISION;
    }

    @Override
    public String dialectName() {
        return "ClickHouse";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:clickhouse:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new ClickHouseRowConverter(rowType);
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "`" + identifier + "`";
    }

    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        return Optional.of(getInsertIntoStatement(tableName, fieldNames));
    }

    @Override
    public List<LogicalTypeRoot> unsupportedTypes() {
        // The data types used in Mysql are list at:
        // https://dev.mysql.com/doc/refman/8.0/en/data-types.html

        // TODO: We can't convert BINARY data type to
        //  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
        // LegacyTypeInfoDataTypeConverter.
        return Arrays.asList(
                LogicalTypeRoot.BINARY,
                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
                LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
                LogicalTypeRoot.INTERVAL_YEAR_MONTH,
                LogicalTypeRoot.INTERVAL_DAY_TIME,
                LogicalTypeRoot.ARRAY,
                LogicalTypeRoot.MULTISET,
                LogicalTypeRoot.MAP,
                LogicalTypeRoot.ROW,
                LogicalTypeRoot.DISTINCT_TYPE,
                LogicalTypeRoot.STRUCTURED_TYPE,
                LogicalTypeRoot.NULL,
                LogicalTypeRoot.RAW,
                LogicalTypeRoot.SYMBOL,
                LogicalTypeRoot.UNRESOLVED);
    }
}
  • JdbcDialects
package org.apache.flink.connector.jdbc.dialect;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Default JDBC dialects. */
public final class JdbcDialects {

    private static final List<JdbcDialect> DIALECTS =
            Arrays.asList(
                    new DerbyDialect(),
                    new MySQLDialect(),
                    new PostgresDialect(),
                    //  增加ClickHouseDialect
                    new ClickHouseDialect());

    /** Fetch the JdbcDialect class corresponding to a given database url. */
    public static Optional<JdbcDialect> get(String url) {
        for (JdbcDialect dialect : DIALECTS) {
            if (dialect.canHandle(url)) {
                return Optional.of(dialect);
            }
        }
        return Optional.empty();
    }
}
  • ClickHouseRowConverter
package org.apache.flink.connector.jdbc.internal.converter;

import org.apache.flink.table.types.logical.RowType;

/** Created by azhe on 2021-03-10 21:04 . */
public class ClickHouseRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    @Override
    public String converterName() {
        return "ClickHouse";
    }

    public ClickHouseRowConverter(RowType rowType) {
        super(rowType);
    }
}

然后就是编译运行。

SQL-Client 插入数据到ClickHouse


INSERT INTO person_score_ck SELECT 2,'test2',30,80;

insert into person_score_ck
SELECT id as pid, name, age, score FROM person_score_datagen;