3.Nifi典型案例
NIFI 典型案例 [toc]
课程目标
1、离线同步 Mysql 数据到 DFS
2、Json 内容转换为 Hive 支持的文本格式
3、实时同步 Mysql 数据到 Hive
4、Kafka 的使用
1.离线同步 Mysql 数据到 hdfs
大数据数据仓库系统中,经常需要进行数据同步操作,可以使用 nifi 来进行灵活的全流程操作。
准备工作:
- 启动 Mysql 服务(5.7 版本),在 Mysql 中运行
\资料\mysql\nifi_test.sql中的 SQL 语句。 - 启动 Hadoop 集群(与 NiFi 集群在同一个可访问的局域网网段)
1.1 处理器流程
QueryDatabaseTable ——> ConvertAvroToJSON ——> SplitJson ——> PutHDFS
QueryDatabaseTable 读取 Mysql 数据,ConvertAvroToJSON 将数据转换为可阅读的 Json 格式,再通过 SplitJson 进行切割获得单独的对象,PutHDFS 将所有对象写入 HDFS 中。
1.2 处理器说明
QueryDatabaseTable
描述
生成 SQL 选择查询,或使用提供的语句,并执行该语句以获取其指定的“最大值”列中的值大于先前看到的最大值的所有行。查询结果将转换为 Avro 格式。几种属性都支持表达式语言,但不允许传入连接。变量注册表可用于为包含表达式语言的任何属性提供值。如果需要利用流文件属性来执行这些查询,则可以将 GenerateTableFetch 和/或 ExecuteSQL 处理器用于此目的。使用流技术,因此支持任意大的结果集。使用标准调度方法,可以将该处理器调度为在计时器或 cron 表达式上运行。该处理器只能在主节点上运行。
属性配置
在下面的列表中,必需属性的名称以粗体显示。其他任何属性(非粗体)均视为可选。该表还指示所有默认值,以及属性是否支持 NiFi 表达式语言。
| 名称 | 默认值 | 描述 |
|---|---|---|
| Database Connection Pooling Service | 用于获得与数据库的连接的 Controller Service。DBCPConnectionPoolLookupDBCPConnectionPooHiveConnectionPool | |
| Database Type | 泛型 | 数据库的类型/风格,用于生成特定于数据库的代码。在许多情况下,通用类型就足够了,但是某些数据库(例如 Oracle)需要自定义 SQL 子句。GenericOracleOracle 12+MS SQL 2012+MS SQL 2008MySQL |
| Table Name | 要查询的数据库表的名称。使用自定义查询时,此属性用于别名查询,并在 FlowFile 上显示为属性。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Columns to Return | 查询中要使用的列名的逗号分隔列表。如果您的数据库需要对名称进行特殊处理(例如,引号),则每个名称都应包括这种处理。如果未提供任何列名,则将返回指定表中的所有列。注意:对于给定的表使用一致的列名很重要,这样增量提取才能正常工作。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Additional WHERE clause | 构建 SQL 查询时要在 WHERE 条件中添加的自定义子句。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Custom Query | 用于检索数据的自定义 SQL 查询。代替从其他属性构建 SQL 查询,此查询将包装为子查询。查询必须没有 ORDER BY 语句。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Maximum-value Columns | 列名的逗号分隔列表。自处理器开始运行以来,处理器将跟踪返回的每一列的最大值。使用多列意味着列列表的顺序,并且期望每列的值比前一列的值增长得更慢。因此,使用多个列意味着列的层次结构,通常用于分区表。该处理器只能用于检索自上次检索以来已添加/更新的那些行。请注意,某些 JDBC 类型(例如位/布尔值)不利于保持最大值,因此这些类型的列不应在此属性中列出,并且会在处理期间导致错误。如果未提供任何列,则将考虑表中的所有行,这可能会对性能产生影响。注意:对于给定的表使用一致的最大值列名称很重要,这样增量提取才能正常工作。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Max Wait Time | 0 秒 | 正在运行的 SQL 选择查询所允许的最长时间,零表示没有限制。少于 1 秒的最长时间将等于零。 支持表达式语言:true(仅使用变量注册表进行评估) |
| Fetch Size | 0 | 一次要从结果集中获取的结果行数。这是对数据库驱动程序的提示,可能不被尊重和/或精确。如果指定的值为零,则忽略提示。 支持表达式语言:true(仅使用变量注册表进行评估) |
| Max Rows Per Flow File | 0 | 一个 FlowFile 中将包含的最大结果行数。这将使您可以将非常大的结果集分解为多个 FlowFiles。如果指定的值为零,那么所有行都将在单个 FlowFile 中返回。 支持表达式语言:true(仅使用变量注册表进行评估) |
| Output Batch Size | 0 | 提交流程会话之前要排队的输出 FlowFiles 的数量。设置为零时,将在处理所有结果集行并且输出 FlowFiles 准备好转移到下游关系时提交会话。对于较大的结果集,这可能导致在处理器执行结束时传输大量的 FlowFiles。如果设置了此属性,则当指定数量的 FlowFiles 准备好进行传输时,将提交会话,从而将 FlowFiles 释放到下游关系。注意:设置此属性后,将不会在 FlowFiles 上设置 maxvalue。*和 fragment.count 属性。 支持表达式语言:true(仅使用变量注册表进行评估) |
| Maximum Number of Fragments | 0 | 最大片段数。如果指定的值为零,那么将返回所有片段。当此处理器提取大表时,这可以防止 OutOfMemoryError。注意:设置此属性可能会导致数据丢失,因为未按顺序排列传入结果,并且片段可能会在任意边界处终止,其中结果集中不包含行。 支持表达式语言:true(仅使用变量注册表进行评估) |
| Normalize Table/Column Names | 假 | 是否将列名中的非 Avro 兼容字符更改为 Avro 兼容字符。例如,冒号和句号将更改为下划线,以建立有效的 Avro 记录。真正假 |
| Transaction Isolation Level | 此设置将为支持此设置的驱动程序设置数据库连接的事务隔离级别。TRANSACTION_NONETRANSACTION_READ_COMMITTEDTRANSACTION_READ_UNCOMMITTEDTRANSACTION_REPEATABLE_READTRANSACTION_SERIALIZABLE | |
| Use Avro Logical Types | 假 | 是否对 DECIMAL / NUMBER,DATE,TIME 和 TIMESTAMP 列使用 Avro 逻辑类型。如果禁用,则写为字符串。如果启用,则使用逻辑类型并将其写为其基础类型,特别是 DECIMAL / NUMBER 为逻辑“十进制”:以具有附加精度和小数位元数据的字节形式写入,DATE 为逻辑“ date-millis”:以 int 表示天自 Unix 时代(1970-01-01)起,TIME 为逻辑’time-millis’:写为 int,表示自 Unix 纪元以来的毫秒数; TIMESTAMP 为逻辑’timestamp-millis’:写为长时,表示自 Unix 纪元以来的毫秒数。如果书面 Avro 记录的阅读者也知道这些逻辑类型,则可以根据阅读器的实现在更多上下文中反序列化这些值。真正假 |
| Default Decimal Precision | 10 | 当将 DECIMAL / NUMBER 值写入为“十进制” Avro 逻辑类型时,需要表示可用位数的特定“精度”。通常,精度是由列数据类型定义或数据库引擎默认定义的。但是,某些数据库引擎可以返回未定义的精度(0)。写入那些未定义的精度数字时,将使用“默认十进制精度”。 支持表达式语言:true(仅使用变量注册表进行评估) |
| Default Decimal Scale | 0 | 当将 DECIMAL / NUMBER 值写入为“十进制” Avro 逻辑类型时,需要一个特定的“标度”来表示可用的十进制数字。通常,规模是由列数据类型定义或数据库引擎默认定义的。但是,当返回未定义的精度(0)时,某些数据库引擎的比例也可能不确定。写入那些未定义的数字时,将使用“默认小数位数”。如果一个值的小数位数超过指定的小数位数,那么该值将被四舍五入,例如 1.53 在小数位数为 0 时变为 2,在小数位数 1 时变为 1.5。 支持表达式语言:true(仅使用变量注册表进行评估) |
ConvertAvroToJSON
描述
将 Binary Avro 记录转换为 JSON 对象。该处理器提供了 Avro 字段到 JSON 字段的直接映射,因此,生成的 JSON 将具有与 Avro 文档相同的层次结构。请注意,Avro 模式信息将丢失,因为这不是从二进制 Avro 到 JSON 格式的 Avro 的转换。输出 JSON 编码为 UTF-8 编码。如果传入的 FlowFile 包含多个 Avro 记录的流,则生成的 FlowFile 将包含一个 JSON Array,其中包含所有 Avro 记录或 JSON 对象序列。如果传入的 FlowFile 不包含任何记录,则输出为空 JSON 对象。空/单个 Avro 记录 FlowFile 输入可以根据“包装单个记录”的要求选择包装在容器中。
属性配置
在下面的列表中,必需属性的名称以粗体显示。其他任何属性(非粗体)均视为可选。该表还指示任何默认值。
| 名称 | 默认值 | 允许值 | 描述 |
|---|---|---|---|
| JSON 容器选项 | 数组 | 没有数组 | 确定如何显示记录流:作为单个 Object 序列(无)(即,将每个 Object 写入新行),或者作为 Objects 数组(array)。 |
| 包装单条记录 | 假 | 真正假 | 确定是否将空记录或单个记录的结果输出包装在“ JSON 容器选项”指定的容器数组中 |
| Avro 模式 | 如果 Avro 记录不包含架构(仅基准),则必须在此处指定。 |
SplitJson
描述
该处理器使用 JsonPath 表达式指定需要的数组元素,将 JSON 数组分割为多个单独的流文件。每个生成的流文件都由指定数组的一个元素组成,并传输到关系“split”,原始文件传输到关系“original”。如果没有找到指定的 JsonPath,或者没有对数组元素求值,则将原始文件路由到“failure”,不会生成任何文件。
该处理器需要使用人员掌握 JsonPath 表达式语言。
属性配置
在下面的列表中,必需属性的名称以粗体显示。任何其他属性(不是粗体)都被认为是可选的,并且指出属性默认值(如果有默认值),以及属性是否支持表达式语言。
| 属性名称 | 默认值 | 可选值 | 描述 |
|---|---|---|---|
| JsonPath Expression | 一个 JsonPath 表达式,它指定用以分割的数组元素。 | ||
| Null Value Representation | 1 | empty stringthe string ’null' | 指定结果为空值时的表示形式。 |
PutHDFS
描述
将 FlowFile 数据写入 Hadoop 分布式文件系统(HDFS)
属性配置
在下面的列表中,必需属性的名称以粗体显示。其他任何属性(非粗体)均视为可选。该表还指示所有默认值,以及属性是否支持 NiFi 表达式语言。
| 名称 | 默认值 | 允许值 | 描述 |
|---|---|---|---|
| Hadoop Configuration Resources | 由文件或逗号分隔的文件列表,其中包含 Hadoop 文件系统配置。否则,Hadoop 将在类路径中搜索“ core-site.xml”和“ hdfs-site.xml”文件,或者将恢复为默认配置。要使用 swebhdfs,请参阅 PutHDFS 文档的“其他详细信息”部分。 支持表达式语言:true(仅使用变量注册表进行评估) | ||
| Kerberos Credentials Service | 指定应用于 Kerberos 身份验证的 Kerberos 凭据控制器服务 | ||
| Kerberos Principal | Kerberos 主体作为身份验证。需要在您的 nifi.properties 中设置 nifi.kerberos.krb5.file。 支持的表达语言:true(仅使用变量注册表进行评估) | ||
| Kerberos Keytab | 与主体关联的 Kerberos 密钥表。需要在您的 nifi.properties 中设置 nifi.kerberos.krb5.file。 支持的表达语言:true(仅使用变量注册表进行评估) | ||
| Kerberos Relogin Period | 4 小时 | 尝试重新登录 kerberos 之前应该经过的时间。此属性已被弃用,并且对处理没有影响。现在,重新登录会自动发生。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Additional Classpath Resources | 以逗号分隔的文件和/或目录的路径列表,该列表将添加到类路径中,并用于加载本机库。指定目录时,该目录中所有具有的文件都将添加到类路径中,但不包括其他子目录。 | ||
| Directory | 文件应写入的父 HDFS 目录。如果目录不存在,将创建该目录。 支持表达式语言:true(将使用流文件属性和变量注册表进行评估) | ||
| Conflict Resolution Strategy | 失败 | 更换 忽视 失败 附加 | 指示当输出目录中已经存在同名文件时应该怎么办 |
| Block Size | 写入 HDFS 的每个块的大小。这将覆盖 Hadoop 配置 | ||
| IO Buffer Size | IO 期间用于缓冲文件内容的内存量。这将覆盖 Hadoop 配置 | ||
| Replication | HDFS 复制每个文件的次数。这将覆盖 Hadoop 配置 | ||
| Permissions umask | 用八进制数表示的 umask,用于确定写入 HDFS 的文件的权限。这将覆盖 Hadoop 属性“ fs.permissions.umask-mode”。如果未定义此属性和“ fs.permissions.umask-mode”,则将使用 Hadoop 默认值“ 022”。 | ||
| Remote Owner | 写入后,将 HDFS 文件的所有者更改为此值。仅当 NiFi 以具有 HDFS 超级用户特权来更改所有者的用户身份运行时才有效 支持表达式语言:true(将使用流文件属性和变量注册表进行评估) | ||
| Remote Group | 写入后,将 HDFS 文件的组更改为此值。仅当 NiFi 以具有 HDFS 超级用户特权来更改组的用户身份运行时才有效 支持表达式语言:true(将使用流文件属性和变量注册表进行评估) | ||
| Compression codec | 没有 | 没有 默认 邮编 邮编 LZ4 LZO 贪睡 自动 | 没有描述。 |
| Ignore Locality | 假 | 真正假 | 指示 HDFS 系统忽略位置规则,以便在群集中随机分配数据 |
1.3 操作
1.3.1 创建组

1.3.2 创建 QueryDatabaseTable

1.3.3 创建并配置 Mysql 连接池
1.3.3.1 创建连接池

1.3.3.2 配置连接池

Database Connection URL = jdbc:mysql://192.168.52.6:3306/nifi_test?characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
Database Driver Class Name = com.mysql.jdbc.Driver
#此处的jar包需要提前上传到nifi服务器中
Database Driver Location(s) = /export/download/jars/mysql-connector-java-5.1.40.jar
Database User = root
Password = 123456
1.3.3.3 启动连接池

1.3.4 配置 QueryDatabaseTable


Custom Query = select id,name,mobile,email,son_json from user_info_nifi limit 15
1.3.5 创建配置 ConvertAvroToJSON
1.3.5.1 创建配置 ConvertAvroToJSON

1.3.5.2 连接

1.3.5.3 负载均衡消费数据

1.3.6 创建配置 SplitJson
1.3.6.1 SplitJson 配置

JsonPath Expression = $.*
1.3.6.2 连接

1.3.7 创建配置 PutHDFS


Hadoop Configuration Resources = /export/download/config/hdfs-site.xml,/export/download/config/core-site.xml
Directory = /user/hive/warehouse/nifi_test.db/user_info_nifi
Conflict Resolution Strategy = append
1.3.8 运行查看效果
1.3.8.1 启动 QueryDatabaseTable,并查看队列中数据

1.3.8.2 启动 ConvertAvroToJSON,并查看队列中数据

1.3.8.3 启动 SplitJson,并查看队列中数据

1.3.8.4 启动 PutHDFS,并查看处理器接收和输出的数据

1.3.8.5 查看 HDFS 数据

2. Json 内容转换为 Hive 支持的文本格式
在向 HDFS 同步数据的示例中,我们保存的文本内容是 Json 格式的,如图:

如果数据需要被 Hive 的外部表所使用,那么目前的 Json 数据格式是不满足要求的,我们如何将 Json 格式数据转换为 Hive 所需要的文本格式呢?
2.1 处理器流程
QueryDatabaseTable ——> ConvertAvroToJSON ——> SplitJson ——> EvaluateJsonPath ——> ReplaceText ——> PutHDFS
这里的重点是,增加了 EvaluateJsonPath 和 ReplaceText 处理器,EvaluateJsonPath 用来提取 json 中的属性,ReplaceText 用来替换掉 FlowFile 中的内容,以使内容符合 Hive 外部表所支持的文本格式。
- 将 Json 数据中的属性值提取出来;
- 转换为\t 分割字段;\n 分割行数据的格式。
2.2 处理器说明
EvaluateJsonPath
描述
该处理器根据流文件的内容计算一个或多个 JsonPath 表达式。这些表达式的结果被写入到 FlowFile 属性,或者写入到 FlowFile 本身的内容中,这取决于处理器的配置。通过添加用户自定义的属性来输入 jsonpath,添加的属性的名称映射到输出流中的属性名称(如果目标是 flowfile-attribute;否则,属性名将被忽略)。属性的值必须是有效的 JsonPath 表达式。“auto-detect”的返回类型将根据配置的目标进行确定。当“Destination”被设置为“flowfile-attribute”时,将使用“scalar”的返回类型。当“Destination”被设置为“flowfile-content”时,将使用“JSON”返回类型。如果 JsonPath 计算为 JSON 数组或 JSON 对象,并且返回类型设置为“scalar”,则流文件将不进行修改,并将路由到失败。如果所提供的 JsonPath 计算为指定的值,JSON 的返回类型可以返回“scalar”。如果目标是“flowfile-content”,并且 JsonPath 没有计算到一个已定义的路径,那么流文件将被路由到“unmatched”,无需修改其内容。如果目标是“flowfile-attribute”,而表达式不匹配任何内容,那么将使用空字符串创建属性作为值,并且 FlowFile 将始终被路由到“matched”。
属性配置
在下面的列表中,必需属性的名称以粗体显示。任何其他属性(不是粗体)都被认为是可选的,并且指出属性默认值(如果有默认值),以及属性是否支持表达式语言。
| 属性名称 | 默认值 | 可选值 | 描述 |
|---|---|---|---|
| Destination | flowfile-content | flowfile-contentflowfile-content | 指示是否将 JsonPath 计算结果写入流文件内容或流文件属性;如果使用 flowfile-attribute,则必须指定属性名称属性。如果设置为 flowfile-content,则只能指定一个 JsonPath,并且忽略属性名。 |
| Return Type | auto-detect | auto-detectjsonscalar | 指示 JSON 路径表达式的期望返回类型。选择“auto-detect”,“flowfile-content”的返回类型自动设置为“json”,“flowfile-attribute”的返回类型自动设置为“scalar”。 |
| Path Not Found Behavior | ignore | warnignore | 指示在将 Destination 设置为“flowfile-attribute”时如何处理丢失的 JSON 路径表达式。当没有找到 JSON 路径表达式时,选择“warn”将生成一个警告。 |
| Null Value Representation | empty string | empty stringempty string | 指示产生空值的 JSON 路径表达式的所需表示形式。 |
动态属性:
该处理器允许用户指定属性的名称和值。
| 属性名称 | 属性值 | 描述 |
|---|---|---|
| 用户自由定义的属性名称 | 用户自由定义的属性值 | 在该处理器生成的文件流上添加用户自定义的属性。如果使用表达式语言,则每批生成的流文件只执行一次计算 . 支持表达式语言:true(只使用变量注册表进行计算) |
应用场景
通常当需要从流文件 json 中提取某些数据作为流属性时,使用此处理器;或者从流文件 json 内容中提取一部分内容作为下一个流文件内容,使用此处理器。
ReplaceText
描述
使用其他值替换匹配正则表达式的流文件部分内容,从而更新流文件的内容。
属性配置
在下面的列表中,必需属性的名称以粗体显示。任何其他属性(不是粗体)都被认为是可选的,并且指出属性默认值(如果有默认值),以及属性是否支持表达式语言。
| 属性名称 | 默认值 | 可选值 | 描述 |
|---|---|---|---|
| Search Value | (?s)(^.*$) | 正则表达式,仅用于“Literal Replace”和“Regex Replace”匹配策略 支持表达式语言:true | |
| Replacement Value | $1 | 使用“Replacement Strategy”策略时插入的值。 支持表达式语言:true | |
| Character Set | UTF-8 | 字符集 | |
| Maximum Buffer Size | 1 MB | 指定要缓冲的最大数据量(每个文件或每行,取决于计算模式),以便应用替换。如果选择了“Entire Text”,并且流文件大于这个值,那么流文件将被路由到“failure”。在“Line-by-Line”模式下,如果一行文本比这个值大,那么 FlowFile 将被路由到“failure”。默认值为 1 MB,主要用于“Entire Text”模式。在“Line-by-Line”模式中,建议使用 8 KB 或 16 KB 这样的值。如果将<Replacement Strategy>属性设置为一下其中之一:Append、Prepend、Always Replace,则忽略该值 | |
| Replacement Strategy | Regex Replace | PrependAppendRegex ReplaceLiteral ReplaceAlways Replace | 在流文件的文本内容中如何替换以及替换什么内容的策略。 |
| Evaluation Mode | Entire text | Line-by-LineEntire text | 对每一行单独进行“替换策略”(Line-by-Line);或将整个文件缓冲到内存中(Entire text),然后对其进行“替换策略”。 |
应用场景
使用正则表达式,来逐行或者全文本替换文件流内容,往往用于业务逻辑处理。
2.3 操作
2.3.1 EvaluateJsonPath 提取 Json 字段值
2.3.1.1 创建并连接 EvaluateJsonPath

2.3.1.2 将 Json 字段配置到 attribute

flowfile-attribute 即为将变量放置在属性中;
扩展属性就是我们读取到的 Json 属性。
同时处理把Invalid警告处理掉

2.3.1.3 启动查看结果

我们可以看到,经过 EvaluateJsonPath 处理后,FlowFile 的属性中,已经包含了 Json 的字段值。

2.3.2 ReplaceText 变更文本内容和格式
虽然我们已经获取到了 Json 中的具体字段值,但是可以看到,FlowFile 的内容还是 Json。如何替换掉 FlowFile 中的内容数据呢?
2.3.2.1 创建 ReplaceText 并连接

2.3.2.2 解决 Invalid

2.3.2.3 配置替换 FlowFile 内容

2.3.3 运行查看结果
2.3.3.1 运行所有处理器,查看最后的 FlowFile 输出

2.3.3.2 查看输出的 HDFS 文件

我们发现,最终的 HDFS 文件,已经符合 Hive 的格式要求,抽取完成。
3.实时同步 Mysql 数据到 Hive
3.1 处理器流程
NiFi 监控 MySQL binlog 处理调用流程如下: CaptureChangeMySQL ——> RouteOnAttribute ——> EvaluateJsonPath ——> ReplaceText ——> PutHiveQL
准备工作
Mysql 建库建表:
create table nifi_test.nifi_hive
(
id int auto_increment
primary key,
name varchar(64) null,
day_time date null
);
Hive 建表:
CREATE TABLE myhive.nifi_hive(id int,name string,day_time string)
STORED AS ORC
TBLPROPERTIES('transactional'='true');
替换 Hive 支持 nar 包:
上传文件NiFi\资料\nifi安装包\nifi-hive-nar-1.9.2.nar,将其替换到 NiFi 服务的 lib 目录下,并重启 NiFi 集群。
3.2 处理器说明
CaptureChangeMySQL
描述
从 MySQL 数据库检索更改数据捕获(CDC)事件。CDC 事件包括 INSERT,UPDATE,DELETE 操作。事件将作为单独的流文件输出,并按操作发生的时间排序。
属性配置
在下面的列表中,必需属性的名称以粗体显示。其他任何属性(非粗体)均视为可选。该表还指示任何默认值,属性是否支持NiFi 表达式语言以及属性是否被视为“敏感”,这意味着将加密其值。在敏感属性中输入值之前,请确保nifi.properties文件具有属性nifi.sensitive.props.key的条目。
| 名称 | 默认值 | 允许值 | 描述 |
|---|---|---|---|
| MySQL Hosts | 与 MySQL 群集中的节点相对应的主机名/端口条目的列表。条目应使用冒号(例如 host1:port,host2:port 等)以逗号分隔。例如 mysql.myhost.com:3306。该处理器将尝试按顺序连接到列表中的主机。如果一个节点发生故障并为集群启用了故障转移,则处理器将连接到活动节点(假定在此属性中指定了其主机条目。MySQL 连接的默认端口为 3306。 支持表达式语言:true(将为仅使用变量注册表进行评估) | ||
| MySQL Driver Class Name | MySQL 数据库驱动程序类的类名称 支持表达式语言:true(仅使用变量注册表进行评估) | ||
| MySQL Driver Location(s) | 包含 MySQL 驱动程序 JAR 及其依赖项(如果有)的文件/文件夹和/或 URL 的逗号分隔列表。例如,“ / var / tmp / mysql-connector-java-5.1.38-bin.jar” 支持表达式语言:true(仅使用变量注册表进行评估) | ||
| Username | 访问 MySQL 集群的用户名 支持表达式语言:true(仅使用变量注册表进行评估) | ||
| Password | 访问 MySQL 集群的密码 敏感属性:true 支持表达式语言:true(仅使用变量注册表进行评估) | ||
| Server ID | 连接到 MySQL 复制组的客户端实际上是一个简化的从属服务器(服务器),并且服务器 ID 值在整个复制组中必须是唯一的(即不同于任何主服务器或从属服务器使用的任何其他服务器 ID)。因此,每个 CaptureChangeMySQL 实例在复制组中必须具有唯一的服务器 ID。如果未指定服务器 ID,则默认值为 65535。 支持表达式语言:true(仅使用变量注册表进行评估) | ||
| Database/Schema Name Pattern | 用于将数据库(或模式,取决于 RDBMS 术语)与 CDC 事件列表进行匹配的正则表达式(regex)。正则表达式必须与存储在 RDBMS 中的数据库名称匹配。如果未设置该属性,则数据库名称将不会用于过滤 CDC 事件。注意:DDL 事件(即使它们影响不同的数据库)也与会话用来执行 DDL 的数据库相关联。这意味着,如果与一个数据库建立了连接,但针对另一个数据库发出了 DDL,则连接的数据库将是与指定模式匹配的数据库。 | ||
| Table Name Pattern | 用于影响影响匹配表的 CDC 事件的正则表达式(regex)。正则表达式必须与存储在数据库中的表名匹配。如果未设置该属性,则不会基于表名过滤任何事件。 | ||
| Max Wait Time | 30 秒 | 建立连接所允许的最长时间,零表示实际上没有限制。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Distributed Map Cache Client | 标识用于保留有关处理器所需的各种表,列等的信息的分布式映射缓存客户端控制器服务。如果未指定客户端,则生成的事件将不包括列类型或名称信息。 | ||
| Retrieve All Records | true | truefalse | 指定是否获取所有可用的 CDC 事件,而与当前 binlog 文件名和/或位置无关。如果 binlog 文件名和位置值存在于处理器的状态中,则将忽略此属性的值。这允许进行 4 种不同的配置:1)如果 Binlog 数据在处理器状态下可用,则该数据用于确定开始位置,并且“检索所有记录”的值将被忽略。2)如果没有二进制日志数据处于处理器状态,则将“检索所有记录”设置为 true 表示从二进制日志历史记录的开头开始。3)如果没有 Binlog 数据处于处理器状态,并且未设置 Initial Binlog 文件名/位置,则 Retrieve All Records 设置为 false 意味着从 Binlog 历史记录的末尾开始。4)如果没有二进制日志数据处于处理器状态,并且设置了初始二进制日志文件名/位置,然后将“检索所有记录”设置为 false 意味着从指定的初始 Binlog 文件/位置开始。要重置行为,请清除处理器状态(请参阅处理器文档的“状态管理”部分)。 |
| Include Begin/Commit Events | false | truefalse | 指定是否在二进制日志中发出与 BEGIN 或 COMMIT 事件相对应的事件。如果在下游流中需要 BEGIN / COMMIT 事件,则将其设置为 true,否则将其设置为 false,这将抑制这些事件的产生并提高流性能。 |
| Include DDL Events | false | truefalse | 指定是否在二进制日志中发出与数据定义语言(DDL)事件相对应的事件,例如 ALTER TABLE,TRUNCATE TABLE。如果在下游流中需要 DDL 事件是必需的,则将其设置为 true;否则,将其设置为 false,这将抑制这些事件的生成并提高流性能。 |
| State Update Interval | 0 秒 | 指示使用二进制日志文件/位置值更新处理器状态的频率。零值表示仅在处理器停止或关闭时才更新状态。如果在某个时候处理器状态不包含所需的二进制日志值,则发出的最后一个流文件将包含最后观察到的值,并且可以使用“初始二进制日志文件”,“初始二进制日志位置”和“初始序列”将处理器返回到该状态。 ID 属性。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Initial Sequence ID | 指定一个初始序列标识符,如果该处理器的状态没有当前序列标识符,则使用该序列标识符。如果处理器的状态中存在序列标识符,则将忽略此属性。序列标识符是单调递增的整数,它记录处理器生成的流文件的顺序。它们可以与 EnforceOrder 处理器一起使用,以保证 CDC 事件的有序交付。 支持表达式语言:true(仅使用变量注册表进行评估) | ||
| Initial Binlog Filename | 指定一个初始 binlog 文件名,如果该处理器的 State 没有当前 binlog 文件名,则使用该文件名。如果处理器的状态中存在文件名,则忽略此属性。如果不需要先前的事件,可以将其与初始 Binlog 位置一起使用以“向前跳过”。请注意,支持 NiFi 表达式语言,但是在配置处理器时会评估此属性,因此可能不会使用 FlowFile 属性。支持使用表达式语言来启用变量注册表和/或环境属性。 支持表达式语言:true(仅使用变量注册表进行评估) | ||
| Initial Binlog Position | 如果该处理器的 State 没有当前的 binlog 文件名,则指定要使用的 binlog 的初始偏移量(由 Initial Binlog Filename 指定)。如果处理器的状态中存在文件名,则忽略此属性。如果不需要先前的事件,可以将其与初始 Binlog 文件名一起使用以“向前跳过”。请注意,支持 NiFi 表达式语言,但是在配置处理器时会评估此属性,因此可能不会使用 FlowFile 属性。支持使用表达式语言来启用变量注册表和/或环境属性。 支持表达式语言:true(仅使用变量注册表进行评估) |
写入属性
| 名称 | 描述 |
|---|---|
| cdc.sequence.id | 序列标识符(即严格增加的整数值),用于指定 CDC 事件流文件相对于其他事件流文件的顺序。 |
| cdc.event.type | 一个字符串,指示发生的 CDC 事件的类型,包括(但不限于)‘begin’, ‘insert’, ‘update’, ‘delete’, ‘ddl’ 和 ‘commit’。 |
| mime.type | 处理器以 JSON 格式输出流文件内容,并将 mime.type 属性设置为 application / json |
DistributedMapCacheServer
描述
提供可通过套接字访问的映射(键/值)缓存。与该服务的交互通常是通过 DistributedMapCacheClient 服务完成的。
属性配置
在下面的列表中,必需属性的名称以粗体显示。其他任何属性(非粗体)均视为可选。该表还指示任何默认值。
| 名称 | 默认值 | 允许值 | 描述 |
|---|---|---|---|
| 港口 | 4557 | 侦听传入连接的端口 | |
| 最大缓存条目 | 10000 | 缓存可以容纳的最大缓存条目数 | |
| 驱逐策略 | 最少使用 | 最少使用最近最少使用先进先出 | 确定应使用哪种策略从缓存中逐出值以为新条目腾出空间 |
| 持久性目录 | 如果指定,则缓存将保留在给定目录中;如果未指定,则高速缓存将仅在内存中 | ||
| SSL 上下文服务 | StandardRestrictedSSLContextService | 如果指定,此服务将用于创建 SSL 上下文,以用于保护通信;如果未指定,则通信将不安全 |
DistributedMapCacheClientService
描述
提供与 DistributedMapCacheServer 通信的功能。可以使用它来在 NiFi 群集中的节点之间共享地图
属性配置
在下面的列表中,必需属性的名称以粗体显示。其他任何属性(非粗体)均视为可选。该表还指示任何默认值。
| 名称 | 默认值 | 描述 |
|---|---|---|
| 服务器主机名 | 运行 DistributedMapCacheServer 服务的服务器的名称 | |
| 服务器端口 | 4557 | 与 DistributedMapCacheServer 服务通信时将使用的远程服务器上的端口 |
| SSL 上下文服务 | 如果指定,则表示用于与远程服务器通信的 SSL 上下文服务。如果未指定,通讯将不会被加密StandardSSLContextServiceStandardRestrictedSSLContextService | |
| 通讯超时 | 30 秒 | 指定在无法发送或接收数据时确定存在通信故障之前与远程服务器通信之前要等待多长时间 |
RouteOnAttribute
描述
该处理器使用属性表达式语言,根据流文件的属性去计算然后进行路由。该处理器往往用于判断逻辑。
属性配置
在下面的列表中,必需属性的名称以粗体显示。任何其他属性(不是粗体)都被认为是可选的,并且指出属性默认值(如果有默认值),以及属性是否支持表达式语言。
| 属性名称 | 默认值 | 可选值 | 描述 |
|---|---|---|---|
| Routing Strategy | Route to Property name | Route to Property nameRoute to ‘matched’ if all matchRoute to ‘matched’ if any matches | 指定如何确定在计算表达式语言时使用哪个关系 |
动态属性
该处理器允许用户指定属性的名称和值。
| 属性名称 | 属性值 | 描述 |
|---|---|---|
| 用户自由定义的属性名称 (Relationship Name) | 用户自由定义的属性值 (Attribute Expression Language) | 将其属性与动态属性值中指定的属性表达式语言相匹配的流文件路由到动态属性键中指定的关系. 支持表达式语言:true |
连接关系
| 名称 | 描述 |
|---|---|
| unmatched | 不匹配任何用户定义表达式的流文件将被路由到这里 |
自定义连接关系
可以根据用户配置处理器的方式创建动态连接关系。
| Name | Description |
|---|---|
| 动态属性的属性名 | 匹配动态属性的属性表达式语言的流文件 |
PutHiveQL
描述
执行 HiveQL DDL / DML 命令(例如,UPDATE,INSERT)。预期传入 File 的内容是要执行的 HiveQL 命令。HiveQL 命令可以使用?转义参数。在这种情况下,要使用的参数必须作为 FlowFile 属性存在,命名约定为 hiveql.args.N.type 和 hiveql.args.N.value,其中 N 是一个正整数。hiveql.args.N.type 应该是指示 JDBC 类型的数字。FlowFile 的内容应采用 UTF-8 格式。
属性配置
在下面的列表中,必需属性的名称以粗体显示。其他任何属性(非粗体)均视为可选。该表还指示任何默认值。
| 名称 | 默认值 | 描述 |
|---|---|---|
| Hive Database Connection Pooling Service | Hive Controller Service,用于获取与 Hive 数据库的连接 | |
| Batch Size | 100 | 在单个事务中放入数据库的首选 FlowFiles 数 |
| Character Set | UTF-8 | 指定记录数据的字符集。 |
| Statement Delimiter | ; | 语句分隔符,用于在多语句脚本中分隔 SQL 语句 |
| Rollback On Failure | false | 指定如何处理错误。默认情况下(false),如果在处理 FlowFile 时发生错误,则 FlowFile 将根据错误类型路由到“失败”或“重试”关系,处理器可以继续下一个 FlowFile。相反,您可能想回滚当前已处理的 FlowFile,并立即停止进一步的处理。在这种情况下,您可以通过启用此“回滚失败”属性来实现。如果启用,失败的 FlowFiles 将保留在输入关系中,而不会受到惩罚,并会反复处理,直到成功处理或通过其他方式将其删除为止。重要的是要设置足够的“有效期限”,以免重试次数过多。 |
HiveConnectionPool
描述
为 Apache Hive 提供数据库连接池服务。可以从池中请求连接,使用后返回连接。
属性配置
在下面的列表中,必需属性的名称以粗体显示。其他任何属性(非粗体)均视为可选。该表还指示任何默认值,属性是否支持 NiFi 表达式语言。
| 名称 | 默认值 | 描述 |
|---|---|---|
| Database Connection URL | 用于连接数据库的数据库连接 URL。可能包含数据库系统名称,主机,端口,数据库名称和一些参数。数据库连接 URL 的确切语法由 Hive 文档指定。例如,当连接到安全的 Hive 服务器时,通常将服务器主体作为连接参数包括在内。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Hive Configuration Resources | 包含 Hive 配置(例如,hive-site.xml)的文件或文件的逗号分隔列表。否则,Hadoop 将在类路径中搜索“ hive-site.xml”文件,或恢复为默认配置。请注意,例如要启用 Kerberos 身份验证,必须在配置文件中设置适当的属性。请参阅 Hive 文档以获取更多详细信息。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Database User | 数据库用户名 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Password | 数据库用户的密码 敏感属性:true 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Max Wait Time | 500 毫秒 | 池在失败之前将等待(如果没有可用连接时)返回连接的最大时间,或者无限期等待-1。 支持表达式语言:true(仅使用变量注册表进行评估) |
| Max Total Connections | 8 | 可以同时从该池分配的活动连接的最大数量,或者为无限制的最大数量。 支持表达式语言:true(仅使用变量注册表进行评估) |
| Validation query | 验证查询,用于在返回连接之前对其进行验证。当借用的连接无效时,它将被丢弃并返回新的有效连接。注意:使用验证可能会降低性能。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Kerberos Credentials Service | 指定应用于 Kerberos 身份验证的 Kerberos 凭据控制器服务 | |
| Kerberos Principal | Kerberos 主体作为身份验证。需要在您的 nifi.properties 中设置 nifi.kerberos.krb5.file。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Kerberos Keytab | 与主体关联的 Kerberos 密钥表。需要在您的 nifi.properties 中设置 nifi.kerberos.krb5.file。 支持表达式语言:true(仅使用变量注册表进行评估) |
3.3 操作
3.3.1 开启 Mysql 的 binlog 日志
Mysql 的版本号要求 5.7。
3.3.1.1 登陆 MySQL 查看日志状态
# mysql -u root -p123456
mysql> show variables like '%log_bin%';
3.3.1.2 退出 MySQL 登陆
mysql> exit
3.3.1.3 Linux 开启 binlog
编辑配置文件
vi /etc/my.cnf
行尾加上
server_id = 1
log_bin = mysql-bin
binlog_format = row
server-id :表示单个结点的 id,单个节点可以随意写,多个节点不能重复, log-bin 指定 binlog 日志文件的名字为 mysql-bin,以及其存储路径。
重启服务
systemctl restart mysqld.service
或者
service mysqld restart
重新登陆查询开启状态

3.3.1.4 Windows 开启 binlog
修改配置文件
找到 mysql 配置文件my.ini所在目录,一般在C:\ProgramData\MySQL\MySQL Server 5.7。
注意目录不是 C:\ Program Files \MySQL\MySQL Server 5.7。
server_id = 1
log_bin = mysql-bin
binlog_format = row


重启 Mysql 服务

查询开启状态

3.3.1.5 开启 Mysql 远程访问权限
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;
FLUSH PRIVILEGES;
3.3.2 实时获取 Mysql 变更
3.3.2.1 创建处理器组
组名:MySqlToHive_Timely
3.3.2.2 创建 CaptureChangeMySQL
CaptureChangeMySQL 的配置中需要 DistributedMapCacheClientService、DistributedMapCacheServer 处理器,一并创建。
3.3.2.3 配置 DistributedMapCacheServer

3.3.2.4 配置 DistributedMapCacheClientService

3.3.2.5 启动 Cache 服务和客户端
在模拟的集群模式下,因为三台服务都在同一台主机,所以会存在端口冲突的问题,但是并不影响使用。因为只要有一个节点的缓存服务启动正常就可以使用。

3.3.2.6 配置 CaptureChangeMySQL

MySQL Hosts = 192.168.52.6:3306
MySQL Driver Class Name = com.mysql.jdbc.Driver
MySQL Driver Location(s) = /export/download/jars/mysql-connector-java-5.1.40.jar
Username = root
Password = 123456
Include Begin/Commit Events = true
Include DDL Events = true
3.3.2.7 启动 CaptureChangeMysql
启动后报错:

FlowFile 数据的属性信息:

3.3.3 根据条件路由
3.3.3.1 RouteOnAttribute 多线程消费
根据自己的服务器硬件配置,以及数据的更新速率,进行评估后填写。

3.3.3.2 NiFi 表达式
NiFi 表达式官网:https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
之前我们已经了解过 NiFi 表达式语言,这里我们仅针对 equals 函数进行说明。
NiFi 表达式的 equals 函数
equals
说明:equals函数使用非常广泛,它确定其主题是否等于另一个 String 值。请注意,该equals函数直接比较两个 String 值。注意不要将此函数与matchs函数混淆,后者会根据正则表达式评估其主题。
学科类型:任意
参数:
- value:用于比较 Subject 的值。必须与主题类型相同。
返回类型:布尔值
示例:我们可以使用表达式${filename:equals('hello.txt')}检查 FlowFile 的文件名是否为“ hello.txt” ,或者可以检查属性hello的值是否等于属性的值filename: ${hello:equals( ${filename} )}。
3.3.3.3 设置自定义属性

3.3.3.4 运行并查看输出
输出的数据内容:
{
"type": "insert",
"timestamp": 1582484253000,
"binlog_filename": "mysql-bin.000005",
"binlog_position": 375,
"database": "nifi_test",
"table_name": "nifi_hive_streaming",
"table_id": 108,
"columns": [
{
"id": 1,
"name": "id",
"column_type": 4,
"value": 7
},
{
"id": 2,
"name": "name",
"column_type": 12,
"value": "testName5"
},
{
"id": 3,
"name": "day_time",
"column_type": 91,
"value": "2020-02-24"
}
]
}
3.3.4 提取关键属性
EvaluateJsonPath 等处理器在提取数据时,可以使用 JsonPath 表达式,来灵活的获取信息。
3.3.4.1 JsonPath 表达式
简介
类似于 XPath 在 xml 文档中的定位,JsonPath 表达式通常是用来路径检索或设置 Json 的。
JsonPath 中的“根成员对象”始终称为$,无论是对象还是数组。
其表达式可以接受“dot–notation”和“bracket–notation”格式,例如
$.store.book[0].title、$[‘store’][‘book’][0][‘title’]
操作符
| 符号 | 描述 |
|---|---|
| $ | 查询的根节点对象,用于表示一个 json 数据,可以是数组或对象 |
| @ | 过滤器断言(filter predicate)处理的当前节点对象,类似于 java 中的 this 字段 |
| * | 通配符,可以表示一个名字或数字 |
| .. | 可以理解为递归搜索,Deep scan. Available anywhere a name is required. |
| . | 表示一个子节点 |
| [‘’ (, ‘’)] | 表示一个或多个子节点, |
| [ (, )] | 表示一个或多个数组下标 |
| [start:end] | 数组片段,区间为[start,end),不包含 end |
| [?()] | 过滤器表达式,表达式结果必须是 boolean |
函数
可以在 JsonPath 表达式执行后进行调用,其输入值为表达式的结果。
| 名称 | 描述 | 输出 |
|---|---|---|
| min() | 获取数值类型数组的最小值 | Double |
| max() | 获取数值类型数组的最大值 | Double |
| avg() | 获取数值类型数组的平均值 | Double |
| stddev() | 获取数值类型数组的标准差 | Double |
| length() | 获取数值类型数组的长度 | Integer |
过滤器
过滤器是用于过滤数组的逻辑表达式,一个通常的表达式形如:[?(@.age > 18)],可以通过逻辑表达式&&或||组合多个过滤器表达式,例如[?(@.price < 10 && @.category == ‘fiction’)],字符串必须用单引号或双引号包围,例如[?(@.color == ‘blue’)] or [?(@.color == “blue”)]。
| 操作符 | 描述 |
|---|---|
| == | 等于符号,但数字 1 不等于字符 1(note that 1 is not equal to ‘1’) |
| != | 不等于符号 |
| < | 小于符号 |
| <= | 小于等于符号 |
| > | 大于符号 |
| >= | 大于等于符号 |
| =~ | 判断是否符合正则表达式,例如[?(@.name =~ /foo.*?/i)] |
| in | 所属符号,例如[?(@.size in [‘S’, ‘M’])] |
| nin | 排除符号 |
| size | size of left (array or string) should match right |
| empty | 判空符号 |
示例
{
"store": {
"book": [
{
"category": "reference",
"author": "Nigel Rees",
"title": "Sayings of the Century",
"price": 8.95
},
{
"category": "fiction",
"author": "Evelyn Waugh",
"title": "Sword of Honour",
"price": 12.99
},
{
"category": "fiction",
"author": "Herman Melville",
"title": "Moby Dick",
"isbn": "0-553-21311-3",
"price": 8.99
},
{
"category": "fiction",
"author": "J. R. R. Tolkien",
"title": "The Lord of the Rings",
"isbn": "0-395-19395-8",
"price": 22.99
}
],
"bicycle": {
"color": "red",
"price": 19.95
}
},
"expensive": 10
}
| JsonPath (点击链接测试) | 结果 |
|---|---|
| $.store.book[*].author 或 $..author | 获取 json 中 store 下 book 下的所有 author 值 |
| $.store.* 显示所有叶子节点值 | 所有的东西,书籍和自行车 |
| $.store..price | 获取 json 中 store 下所有 price 的值 |
| $..book[2] | 获取 json 中 book 数组的第 3 个值 |
| $..book[-2] | 倒数的第二本书 |
| $..book[0,1]或$..book[:2] | 前两本书 |
| $..book[1:2] | 从索引 1(包括)到索引 2(排除)的所有图书 |
| $..book[-2:] | 获取 json 中 book 数组的最后两个值 |
| $..book[2:] | 获取 json 中 book 数组的第 3 个到最后一个的区间值 |
| $..book[?(@.isbn)] | 获取 json 中 book 数组中包含 isbn 的所有值 |
| $.store.book[?(@.price < 10)] | 获取 json 中 book 数组中 price<10 的所有值 |
| $..book[?(@.price <= $[‘expensive’])] | 获取 json 中 book 数组中 price<=expensive 的所有值 |
| $..book[?(@.author =~ /.*REES/i)] | 获取 json 中 book 数组中的作者以 REES 结尾的所有值(REES 不区分大小写) |
| $..* | 逐层列出 json 中的所有值,层级由外到内 |
| $..book.length() | 获取 json 中 book 数组的长度 |
3.3.4.2 提取 Json 属性到 Attribute

3.3.4.3 运行并查看输出

3.3.5 ReplaceText 转换 Sql
配置 ReplaceText

Replacement Value = insert into myhive.nifi_hive (id,name,day_time) values (${id},'${name}','${day_time}')
启动查看结果

3.3.6 写入 Hive
3.3.6.1 创建 PutHiveQL
略
3.3.6.2 创建配置 HiveConnectionPool

Database Connection URL = jdbc:hive2://192.168.52.120:10000
Hive Configuration Resources = /export/download/config/core-site.xml,/export/download/config/hdfs-site.xml,/export/download/config/hive-site.xml
配置完成后,记得启用 HiveConnectionPool。
3.3.6.3 PutHiveQL 关联 HiveConnectionPool

3.3.7 验证 Hive 表中是否成功写入数据
略。
4. Kafka 的使用
Kafka 是一个由 Scala 和 java 编写的高吞吐量的分布式发布订阅消息。它拥有很高的吞吐量、稳定性和扩容能力,在 OLTP 和 OLAP 中都会经常使用。使用 NiFi 可以简单快速的建立起 kafka 的生产者和消费者,而不需要编写繁杂的代码。
4.1 处理器说明
4.1.1 PublishKafka_0_10
描述
使用 Kafka 0.10.x Producer API 将 FlowFile 的内容作为消息发送到 Apache Kafka。要发送的消息可以是单独的 FlowFiles,也可以使用用户指定的定界符(例如换行符)进行定界。用于获取消息的辅助 NiFi 处理器是 ConsumeKafka_0_10。
属性配置
| Name | Default Value | Description |
|---|---|---|
| Kafka Brokers | localhost:9092 | 逗号分隔的已知 Kafka Broker 列表,格式为<主机>:<端口> 支持表达式语言:true(仅使用变量注册表进行评估) |
| Security Protocol | 纯文本 | 与经纪人通信的协议。对应于 Kafka 的“ security.protocol”属性。 |
| Kerberos Service Name | 与代理 JAAS 文件中配置的 Kafka 服务器的主要名称匹配的服务名称。可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。对应于 Kafka 的’security.protocol’属性,除非选择的 SASL 选项之一,否则它将被忽略。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Kerberos Credentials Service | 指定应用于 Kerberos 身份验证的 Kerberos 凭据控制器服务 | |
| Kerberos Principal | 将用于连接到代理的 Kerberos 主体。如果未设置,则应在 bootstrap.conf 文件中定义的 JVM 属性中设置 JAAS 配置文件。该主体将被设置为“ sasl.jaas.config” Kafka 的属性。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Kerberos Keytab | 用于连接代理的 Kerberos 密钥表。如果未设置,则应在 bootstrap.conf 文件中定义的 JVM 属性中设置 JAAS 配置文件。该主体将被设置为“ sasl.jaas.config” Kafka 的属性。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| SSL Context Service | 指定用于与 Kafka 通信的 SSL 上下文服务。 | |
| Topic Name | 要发布到的 Kafka 主题的名称。 支持表达式语言:true(将使用流文件属性和变量注册表进行评估) | |
| Delivery Guarantee | 0 | 指定保证消息发送到 Kafka 的要求。对应于 Kafka 的“ acks”属性:Best EffortGuarantee Single Node DeliveryGuarantee Replicated Delivery |
| Kafka Key | 用于消息的密钥。如果未指定,则将流文件属性’kafka.key’用作消息密钥(如果存在)。请注意,同时设置 Kafka 密钥和标界可能会导致许多具有相同密钥的 Kafka 消息。这不是问题,因为 Kafka 不会强制执行或假定消息和密钥的唯一性。尽管如此,同时设置分界符和 Kafka 密钥仍存在 Kafka 数据丢失的风险。在 Kafka 上进行主题压缩时,将基于此密钥对消息进行重复数据删除。 支持表达式语言:true(将使用流文件属性和变量注册表进行评估) | |
| Key Attribute Encoding | utf-8 | 发出的 FlowFiles 具有名为“ kafka.key”的属性。此属性指示应如何编码属性的值。 |
| Message Demarcator | 指定用于在单个 FlowFile 中划分多个消息的字符串(解释为 UTF-8)。如果未指定,则 FlowFile 的全部内容将用作一条消息。如果指定,则 FlowFile 的内容将在此定界符上分割,并且每个部分作为单独的 Kafka 消息发送。要输入特殊字符(例如“换行”),请根据您的操作系统使用 CTRL + Enter 或 Shift + Enter。 支持表达式语言:true(将使用流文件属性和变量注册表进行评估) | |
| Max Request Size | 1 兆字节 | 请求的最大大小(以字节为单位)。对应于 Kafka 的’max.request.size’属性,默认值为 1 MB(1048576)。 |
| Acknowledgment Wait Time | 5 秒 | 在向 Kafka 发送消息后,这表明我们愿意等待 Kafka 做出回应的时间。如果 Kafka 在此时间段内未确认该消息,则 FlowFile 将被路由为“失败”。 |
| Max Metadata Wait Time | 5 秒 | 在整个“发送”调用失败之前,发布者将在“发送”调用期间等待获取元数据或等待缓冲区刷新的时间。对应于 Kafka 的’max.block.ms’属性 支持表达式语言:true(仅使用变量注册表进行评估) |
| Partitioner class | org..DefaultPartitioner | 指定用于计算消息的分区标识的类。对应于 Kafka 的’partitioner.class’属性。RoundRobinPartitioner、DefaultPartitioner |
| Compression Type | 没有 | 此参数允许您为此生产者生成的所有数据指定压缩编解码器。 |
4.1.2 ConsumeKafka_0_10
描述
消耗来自专门针对 Kafka 0.10.x Consumer API 构建的 Apache Kafka 的消息。用于发送消息的辅助 NiFi 处理器是 PublishKafka_0_10。
属性配置
在下面的列表中,必需属性的名称以粗体显示。其他任何属性(非粗体)均视为可选。该表还指示所有默认值,以及属性是否支持 NiFi 表达式语言。
| 名称 | 默认值 | 描述 |
|---|---|---|
| Kafka Brokers | localhost:9092 | 逗号分隔的已知 Kafka Broker 列表,格式为<主机>:<端口> 支持表达式语言:true(仅使用变量注册表进行评估) |
| Security Protocol | 纯文本 | 与经纪人通信的协议。对应于 Kafka 的“ security.protocol”属性。 |
| Kerberos Service Name | 与代理 JAAS 文件中配置的 Kafka 服务器的主要名称匹配的服务名称。可以在 Kafka 的 JAAS 配置或 Kafka 的配置中定义。对应于 Kafka 的’security.protocol’属性,除非选择的 SASL 选项之一,否则它将被忽略。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Kerberos Credentials Service | 指定应用于 Kerberos 身份验证的 Kerberos 凭据控制器服务 | |
| Kerberos Principal | 将用于连接到代理的 Kerberos 主体。如果未设置,则应在 bootstrap.conf 文件中定义的 JVM 属性中设置 JAAS 配置文件。该主体将被设置为“ sasl.jaas.config” Kafka 的属性。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Kerberos Keytab | 用于连接代理的 Kerberos 密钥表。如果未设置,则应在 bootstrap.conf 文件中定义的 JVM 属性中设置 JAAS 配置文件。该主体将被设置为“ sasl.jaas.config” Kafka 的属性。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| SSL Context Service | 指定用于与 Kafka 通信的 SSL 上下文服务。 | |
| Topic Name(s) | 要从中提取的 Kafka 主题的名称。如果逗号分隔,则可以提供多个。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Topic Name Format | names | 指定提供的主题是逗号分隔的名称列表还是单个正则表达式。names、pattern |
| Group ID | 组 ID 用于标识同一使用者组内的使用者。对应于 Kafka 的’group.id’属性。 支持表达式语言:true(仅使用变量注册表进行评估) | |
| Offset Reset | latest | 当 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量时(例如,因为该数据已被删除),使您可以管理条件。对应于 Kafka 的’auto.offset.reset’属性。earliest、latest、none |
| Key Attribute Encoding | utf-8 | 发出的 FlowFiles 具有名为“ kafka.key”的属性。此属性指示应如何编码属性的值。 |
| Message Demarcator | 由于 KafkaConsumer 批量接收消息,因此您可以选择输出 FlowFiles,其中包含给定主题和分区的单个批次中的所有 Kafka 消息,并且该属性允许您提供一个字符串(解释为 UTF-8)以用于分界多封 Kafka 讯息。这是一个可选属性,如果未提供,则收到的每条 Kafka 消息都会在触发该消息时产生一个 FlowFile。要输入特殊字符(例如“换行”),请使用 CTRL + Enter 或 Shift + Enter,具体取决于操作系统 支持的表达语言:true(仅使用变量注册表进行评估) | |
| Max Poll Records | 10000 | 指定 Kafka 在一次轮询中应返回的最大记录数。 |
| Max Uncommitted Time | 1 secs | 指定在必须提交偏移量之前允许通过的最长时间。该值影响补偿的提交频率。较少地提交偏移量会增加吞吐量,但是如果在提交之间重新平衡或 JVM 重新启动,则可能会增加潜在数据重复的窗口。此值还与最大轮询记录和消息定界符的使用有关。使用消息分界器时,未提交的消息会比未分配的消息多得多,因为跟踪内存的情况要少得多。 |
4.2 Producer 生产
4.2.1 创建处理器
创建处理器组 kafka,进入组后分别创建 GenerateFlowFile 和 PublishKafka_0_10 处理器。
4.2.2 负载均衡生产消息
4.2.2.1 连接 GenerateFlowFile 和 PublishKafka_0_10

4.2.2.2 负载均衡并发

4.2.3 配置 GenerateFlowFile
4.2.3.1 调度配置:
每 1 秒生产一次数据

4.2.3.2 属性配置
文件大小 100b;每次生成 10 个相同文件;每次生成的流文件内容唯一。

4.2.4 配置 PublishKafka_0_10
4.2.4.1 属性配置
Brokers 设置为 192.168.52.100:9092,192.168.52.110:9092,192.168.52.120:9092
topic 设置为 nifi-topic,如果 topic 不存在,会自动创建;
Delivery Guarantee,对应 kafka 的 acks 机制,选择最为保险的 Guarantee Replicated Delivery,相当于 acks=all。

4.2.4.2 关系配置

4.2.5 启动流程并监听数据
4.2.5.1 启动流程
略
4.2.5.2 监听 kafka 消费数据
在 kafka 所在服务器执行监听命令:
/export/servers/kafka_2.11-0.10.2.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.52.110:9092 --topic nifi-topic
4.3 Consumer 消费
4.3.1 创建处理器并连接
创建 ConsumeKafka_0_10 和 LogAttribute 处理器,并连接。

4.3.2 配置 ConsumeKafka_0_10
Brokers 地址要和 Producer 的设置一样:192.168.52.100:9092,192.168.52.110:9092,192.168.52.120:9092
Topic 设置和 Producer 一致:nifi-topic
GroupId 随意设置:nifi
Offset Reset 设置为:latest,从最新的消息开始消费

4.3.3 设置 LogAttribute
设置为自连接

4.3.4 启动流程并查看日志
略、
4.3.5 增加生产频率
注意:如果服务器资源有限,不要进行此操作。
GenerateFlowFile 的调度频率加快:20ms

