失效链接处理 |
Kafka KSQL实战 PDF 下蝲
本站整理下蝲Q?/strong>
链接Q?a target="_blank">https://pan.baidu.com/s/1jh6wv3C16keHYGPNfFzWvg
提取码:(x)pf0m
相关截图Q?/strong>
![]()
主要内容Q?/strong>
1.背景
kafka早期作ؓ(f)一个日志消息系l,很受q维Ƣ迎的,配合ELK玩v来很happyQ在kafka慢慢的{向流式^台的q程中,开发也慢慢介入?jin),一些业务系l也开始和kafkaҎ(gu)h?jin),也还是很受大家欢q的Q由于业务需要,一部分白也就免不?jin)接触kafka?jin),q些白L?x)安奈不住好奇?j)Q要_的查看kafka中的某一条数据,作ؓ(f)服务提供方,我也很方啊,该怎么|业务方不敢得|啊Q只能写consumerL费,然后查询?/div>
2.需?/div>
有什么方法能直接查询kafka中已有的数据呢?那时候presto映入眼帘了(jin)Q初步探索后发现presto实强大Q和我们在用的impala有的一|支持的数据源也更多,什么redis、mongo、kafka都可以用sql来查询,真是救星啊,q样那群白可以直接用presto来查询里面的数据?jin)。不qpresto在不开发插件的情况下,对kafka的数据有格式要求Q支持json、avro。关于presto的调研见presto实战。但是我只是想用sql查询kafkaQ而presto功能q于强大Q必然整个框架就昑־比较厚重?jin),功能多嘛。有什么轻量的工具呢Q?/div>
3.介绍
某一天,kafka的亲儿子KSQLp生了(jin)QKSQL是一个用于Apache kafka的流式SQL引擎QKSQL降低?jin)进入流处理的门槛,提供了(jin)一个简单的、完全交互式的SQL接口Q用于处理Kafka的数据,可以让我们在数据上持箋执行 SQL 查询QKSQL支持q泛的强大的处理操作,包括聚合、连接、窗口、会(x)话等{?/div>
KSQL在内部用Kafka的Streams APIQƈ且它们共享与Kafka处理相同的核心(j)抽象QKSQL有两个核?j)抽象,它们对应于到Kafka Streams中的两个核心(j)抽象Q让你可以处理kafka的topic数据。关于这两个核心(j)抽象下章节解诅R?/div>
4.架构
4.1部v架构
׃个KSQL服务器进E执行查询。一lKSQLq程可以作ؓ(f)集群q行。可以通过启动更多的KSQL实例来动态添加更多的处理能力。这些KSQL实例是容错的Q如果一个实例失败了(jin)Q其他的׃(x)接管它的工作。查询是使用交互式的KSQL命o(h)行客L(fng)启动的,该客L(fng)通过REST API向集发送命令。命令行允许(g)查可用的stream和tableQ发出新的查询,(g)查状态ƈl止正在q行的查询。KSQL内部是用Kafka的stream API构徏的,它承了(jin)它的Ҏ(gu)可伸羃性、先q的状态管理和定w功能Qƈ支持Kafka最q引入的一ơ性处理语义。KSQL服务器将此嵌入到一个分布式SQL引擎?包括一些用于查询性能的自动字节代码生?和一个用于查询和控制的REST API?/div>
4.2处理架构
5.抽象概念
KSQL化了(jin)应用程序,它集成了(jin)stream和table的概念,允许使用表示现在发生的事件的stream来连接表C当前状态的table?Apache Kafka中的一个topic可以表示为KSQL中的STREAM或TABLEQ具体取决于topic处理的预期语义。下面看看两个核?j)的解读?/div>
streamQ流是无限制的结构化数据序列Qstream中的fact是不可变的,q意味着可以新fact插入到stream中,但是现有fact永远不会(x)被更新或删除?stream可以从Kafka topic创徏Q或者从现有的stream和table中派生?/div>
tableQ一个table是一个stream或另一个table的视图,它代表了(jin)一个不断变化的fact的集合,它相当于传统的数据库表,但通过化{流语义来丰富。表中的事实是可变的Q这意味着可以新的事实插入到表中Q现有的事实可以被更新或删除。可以从Kafka主题中创Q也可以从现有的和表中z表?/div>
6.部v
ksql支持kafka0.11之后的版本,在confluent的V3和V4版本中默认ƈ没有加入ksql serverE序Q当然V3和V4是支持ksql的,在V5版本中已l默认加入ksql?jin),Z(jin)方便演示Q我们用confluent kafka V5版本演示Qzk和kafka也是单实例启动?/div>
6.1下蝲
wget https://packages.confluent.io/archive/5.0/confluent-oss-5.0.0-2.11.tar.gz
tar zxvf confluent-oss-5.0.0-2.11.tar.gz -C /opt/programs/confluent_5.0.0
6.2启动zk
cd /opt/programs/confluent_5.0.0
bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties
6.3启动kafka
cd /opt/programs/confluent_5.0.0
bin/kafka-server-start -daemon etc/kafka/server.properties
6.4创徏topic和data
confluent自带?jin)一个ksql-datagen工具Q可以创建和产生相关的topic和数据,ksql-datagen可以指定的参数如下:(x)
[bootstrap-server=<kafka bootstrap server(s)> (defaults to localhost:9092)]
[quickstart=<quickstart preset> (case-insensitive; one of 'orders', 'users', or 'pageviews')]
schema=<avro schema file>
[schemaRegistryUrl=<url for Confluent Schema Registry> (defaults to http://localhost:8081)]
format=<message format> (case-insensitive; one of 'avro', 'json', or 'delimited')
topic=<kafka topic name>
key=<name of key column>
[iterations=<number of rows> (defaults to 1,000,000)]
[maxInterval=<Max time in ms between rows> (defaults to 500)]
[propertiesFile=<file specifying Kafka client properties>]
创徏pageviewsQ数据格式ؓ(f)delimited
cd /opt/programs/confluent_5.0.0/bin
./ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500
psQ以上命令会(x)源源不断在stdin上输出数据,是工具自己产生的数据,如下样例
8001 --> ([ 1539063767860 | 'User_6' | 'Page_77' ]) ts:1539063767860
8011 --> ([ 1539063767981 | 'User_9' | 'Page_75' ]) ts:1539063767981
8021 --> ([ 1539063768086 | 'User_5' | 'Page_16' ]) ts:1539063768086
不过使用consumer消费出来的数据是如下样式
1539066430530,User_5,Page_29
1539066430915,User_6,Page_74
1539066431192,User_4,Page_28
1539066431621,User_6,Page_38
1539066431772,User_7,Page_29
1539066432122,User_8,Page_34
创徏usersQ数据格式ؓ(f)json
cd /opt/programs/confluent_5.0.0/bin
./ksql-datagen quickstart=users format=json topic=users maxInterval=100
psQ以上命令会(x)源源不断在stdin上输出数据,是工具自己产生的数据,如下样例
User_5 --> ([ 1517896551436 | 'User_5' | 'Region_5' | 'MALE' ]) ts:1539063787413
User_7 --> ([ 1513998830510 | 'User_7' | 'Region_4' | 'MALE' ]) ts:1539063787430
User_6 --> ([ 1514865642822 | 'User_6' | 'Region_2' | 'MALE' ]) ts:1539063787481
不过使用consumer消费出来的数据是如下样式
{"registertime":1507118206666,"userid":"User_6","regionid":"Region_7","gender":"OTHER"}
{"registertime":1506192314325,"userid":"User_1","regionid":"Region_1","gender":"MALE"}
{"registertime":1489277749526,"userid":"User_6","regionid":"Region_4","gender":"FEMALE"}
{"registertime":1497188917765,"userid":"User_9","regionid":"Region_3","gender":"OTHER"}
{"registertime":1493121964253,"userid":"User_4","regionid":"Region_3","gender":"MALE"}
{"registertime":1515609444511,"userid":"User_5","regionid":"Region_9","gender":"FEMALE"}
|