本博客日IP超过2000,PV 3000 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:xttblog2,之前的微信号好友位已满,备注:返现
受密码保护的文章请关注“业余草”公众号,回复关键字“0”获得密码
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序

腾讯云】1核2G5M轻量应用服务器50元首年,高性价比,助您轻松上云
基于前面两篇关于KSQL的介绍和实现原理,本文将继续进入到KSQL的开发实战阶段,给大家讲解一个简单的demo程序。
本实例将演示一个使用KSQL的简单工作流程来针对Kafka中的数据编写流式查询。
由于KSQL查询Kafka群集中的数据,因此您需要启动Kafka群集,其中包括ZooKeeper和Kafka代理。
第一步:启动Kafka集群并启动KSQL。
第二步:成功启动Kafka集群并启动KSQL后,您将看到KSQL提示符:
====================================== = _ __ _____ ____ _ = = | |/ // ____|/ __ \| | = = | ' /| (___ | | | | | = = | < \___ \| | | | | = = | . \ ____) | |__| | |____ = = |_|\_\_____/ \___\_\______| = = = = Streaming SQL Engine for Kafka = Copyright 2017 Confluent Inc. CLI v0.1, Server v0.1 located at http://localhost:9098 Having trouble? Type 'help' (case-insensitive) for a rundown of how things work! ksql>
第三步:KSQL提供了结构化查询语言来查询Kafka数据,因此您需要一些数据进行查询。
- 如果您使用Docker Compose文件,Docker容器已经在运行,该数据生成器正在向Kafka集群不断生成Kafka消息。不需要采取进一步行动。
- 如果您不使用Docker环境,请按照非Docker环境中说明的方法向Kafka群集生成数据。
创建Stream 和 Table
本示例显示了从Kafka主题中查询数据的示例,pageviews并users使用以下模式:
在继续之前,请确认:
- 在您启动KSQL的终端窗口中,您会看到ksql>提示。
- 如果你不想使用Docker,你必须手动运行数据发生器来产生主题叫pageviews 和users。参考非Docker环境中向Kafka群集生成数据的方法。
从Kafka的pageviews主题(topic)创建一个pageviews_original流(Stream)。指定value_format的DELIMITED。然后DESCRIBE是新的STREAM。请注意,KSQL创建了额外的列ROWTIME,它们对应于Kafka消息时间戳,并且ROWKEY对应于Kafka消息密钥。
ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED'); ksql> DESCRIBE pageviews_original; Field | Type ---------------------------- ROWTIME | BIGINT ROWKEY | VARCHAR(STRING) VIEWTIME | BIGINT USERID | VARCHAR(STRING) PAGEID | VARCHAR(STRING)
创建一个表,users_original从卡夫卡的话题users,指定value_format的JSON。然后DESCRIBE新的TABLE。
ksql> CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON'); ksql> DESCRIBE users_original; Field | Type -------------------------------- ROWTIME | BIGINT ROWKEY | VARCHAR(STRING) REGISTERTIME | BIGINT GENDER | VARCHAR(STRING) REGIONID | VARCHAR(STRING) USERID | VARCHAR(STRING)
显示所有STREAMS和TABLES。
ksql> SHOW STREAMS; Stream Name | Kafka Topic | Format ----------------------------------------------------------------- PAGEVIEWS_ORIGINAL | pageviews | DELIMITED ksql> SHOW TABLES; Table Name | Kafka Topic | Format | Windowed -------------------------------------------------------------- USERS_ORIGINAL | users | JSON | false
使用查询
默认情况下,KSQL从最新的偏移量读取流和表的主题。
使用SELECT创建从流返回数据的查询。要停止查看数据,请按<ctrl-c>。您可以选择包含LIMIT关键字来限制查询结果中返回的行数。请注意,由于数据生成的随机性,精确的数据输出可能会有所不同。
ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED'); ksql> DESCRIBE pageviews_original; Field | Type ---------------------------- ROWTIME | BIGINT ROWKEY | VARCHAR(STRING) VIEWTIME | BIGINT USERID | VARCHAR(STRING) PAGEID | VARCHAR(STRING)
通过CREATE STREAM在SELECT语句之前使用关键字来创建持久性查询。与上述非持久查询不同,此查询的结果将写入Kafka主题PAGEVIEWS_FEMALE。下面的查询将pageviews通过LEFT JOIN使用users_original用户ID 的TABLE来满足STREAM,满足条件。
ksql> CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE'; ksql> DESCRIBE pageviews_female; Field | Type ---------------------------- ROWTIME | BIGINT ROWKEY | VARCHAR(STRING) USERID | VARCHAR(STRING) PAGEID | VARCHAR(STRING) REGIONID | VARCHAR(STRING) GENDER | VARCHAR(STRING)
使用SELECT查看查询结果,因为他们进来。停止观看查询结果,按<ctrl-c>。这将停止打印到控制台,但不会终止实际的查询。该查询继续运行在Kernel应用程序中。
ksql> SELECT * FROM pageviews_female; 1502477856762 | User_2 | User_2 | Page_55 | Region_9 | FEMALE 1502477857946 | User_5 | User_5 | Page_14 | Region_2 | FEMALE 1502477858436 | User_3 | User_3 | Page_60 | Region_3 | FEMALE ^CQuery terminated ksql>
创建一个新的持久性查询,其中使用另一个条件LIKE。此查询的结果将写入名为“Kafka”的主题pageviews_enriched_r8_r9。
ksql> CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
创建一个新的持久性查询,当计数大于1时,会在30秒的滚动窗口中计算每个区域和性别组合的网页浏览量。此查询的结果将写入名为的Kafka主题PAGEVIEWS_REGIONS。
ksql> CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1; ksql> DESCRIBE pageviews_regions; Field | Type ---------------------------- ROWTIME | BIGINT ROWKEY | VARCHAR(STRING) GENDER | VARCHAR(STRING) REGIONID | VARCHAR(STRING) NUMUSERS | BIGINT
使用SELECT查看从上面的查询结果。
ksql> SELECT regionid, numusers FROM pageviews_regions LIMIT 5; Region_3 | 4 Region_3 | 5 Region_6 | 5 Region_6 | 6 Region_3 | 8 LIMIT reached for the partition. Query terminated ksql>
显示所有持久查询。
ksql> SHOW QUERIES; Query ID | Kafka Topic | Query String ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 1 | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE'; 2 | pageviews_enriched_r8_r9 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'; 3 | PAGEVIEWS_REGIONS | CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;
退出终端
查询将作为KSQL应用程序持续运行,直到它们被手动终止。退出KSQL不会终止持久查询。
从输出中SHOW QUERIES;标识您要终止的查询ID。例如,如果你想终止查询ID 2:
ksql> TERMINATE 2;
要从KSQL退出,键入'exit'。
ksql> exit
关闭Docker Compose
如果你运行Docker Compose,你必须明确地关闭Docker Compose。
下面的命令将删除所有KSQL查询和主题数据。
$ docker-compose down
更多详细命令,请参阅docker-compose down文档。
停止Confluent平台
使用下面的命令,即可停止confluent平台。
$ confluent stop
参考资料
最后,欢迎关注我的个人微信公众号:业余草(yyucao)!可加作者微信号:xttblog2。备注:“1”,添加博主微信拉你进微信群。备注错误不会同意好友申请。再次感谢您的关注!后续有精彩内容会第一时间发给您!原创文章投稿请发送至532009913@qq.com邮箱。商务合作也可添加作者微信进行联系!
本文原文出处:业余草: » Kafka KSQL入门