1. Kafka概述
1.1. 消息队列
1)点对点模式(⼀对⼀,消费者主动拉取数据,消息收到后消息清除)
点对点模型通常是⼀个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,⽽不是将消息推送到客户端。这个模型的特点是发送到队列的消息被⼀个且只有⼀个接收者接收处理,即使有多个消息监听者也是如此。(2)发布/订阅模式(⼀对多,数据⽣产后,推送给所有订阅者)
发布订阅模型则是⼀个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,⽽持久订阅者则监听主题的所有消息,即使当前订阅者不可⽤,处于离线状态。
1.1. 为什么需要消息队列
1)解耦:
允许你独⽴的扩展或修改两边的处理过程,只要确保它们遵守同样的接⼝约束。2)冗余:
消息队列把数据进⾏持久化直到它们已经被完全处理,通过这⼀⽅式规避了数据丢失风险。许多消息队列所采⽤的\"插⼊-获取-删除\"范式中,在把⼀个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从⽽确保你的数据被安全的保存直到你使⽤完毕。3)扩展性:
因为消息队列解耦了你的处理过程,所以增⼤消息⼊队和处理的频率是很容易的,只要另外增加处理过程即可。4)灵活性 & 峰值处理能⼒:
在访问量剧增的情况下,应⽤仍然需要继续发挥作⽤,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投⼊资源随时待命⽆疑是巨⼤的浪费。使⽤消息队列能够使关键组件顶住突发的访问压⼒,⽽不会因为突发的超负荷的请求⽽完全崩溃。5)可恢复性:
系统的⼀部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使⼀个处理消息的进程挂掉,加⼊队列中的消息仍然可以在系统恢复后被处理。6)顺序保证:
在⼤多使⽤场景下,数据处理的顺序都很重要。⼤部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka保证⼀个Partition内的消息的有序性)7)缓冲:
有助于控制和优化数据流经过系统的速度,解决⽣产消息和消费消息的处理速度不⼀致的情况。8)异步通信:
很多时候,⽤户不想也不需要⽴即处理消息。消息队列提供了异步处理机制,允许⽤户把⼀个消息放⼊队列,但并不⽴即处理它。想向队列中放⼊多少消息就放多少,然后在需要的时候再去处理它们。
1.2. 什么是Kafka
在流式计算中,Kafka⼀般⽤来缓存数据,Storm通过消费Kafka的数据进⾏计算。
1)Apache Kafka是⼀个开源消息系统,由Scala写成。是由Apache软件基⾦会开发的⼀个开源消息系统项⽬。
2)Kafka最初是由LinkedIn公司开发,并于2011年初开源。2012年10⽉从Apache Incubator毕业。该项⽬的⽬标是为处理实时数据提供⼀个统⼀、⾼通量、低等待的平台。
3)Kafka是⼀个分布式消息队列。Kafka对消息保存时根据Topic进⾏归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。
4)⽆论是kafka集群,还是consumer都依赖于zookeeper集群保存⼀些meta信息,来保证系统可⽤性。
1)Producer :消息⽣产者,就是向kafka broker发消息的客户端;2)Consumer :消息消费者,向kafka broker取消息的客户端;3)Topic :可以理解为⼀个队列;
4) Consumer Group (CG):这是kafka⽤来实现⼀个topic消息的⼴播(发给所有的consumer)和单播(发给任意⼀个consumer)的⼿段。⼀个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的⼀个consumer。如果需要实现⼴播,只要每个consumer有⼀个独⽴的CG就可以了。要实现单播只要所有的consumer在同⼀个CG。⽤CG还可以将consumer进⾏⾃由的分组⽽不需要多次发送消息到不同的topic;
5)Broker :⼀台kafka服务器就是⼀个broker。⼀个集群由多个broker组成。⼀个broker可以容纳多个topic;
6)Partition:为了实现扩展性,⼀个⾮常⼤的topic可以分布到多个broker(即服务器)上,⼀个topic可以分为多个partition,每个partition是⼀个有序的队列。partition中的每条消息都会被分配⼀个有序的id(offset)。kafka只保证按⼀个partition中的顺序将消息发给consumer,不保证⼀个topic的整体(多个partition间)的顺序;
7)Offset:kafka的存储⽂件都是按照offset.kafka来命名,⽤offset做名字的好处是⽅便查找。例如你想找位于2049的位置,只要找到2048.kafka的⽂件即可。当然the first offset就是00000000000.kafka。
1. Kafka单节点运⾏⽅式
Setp 1:下载代码
下载 kafka_2.12-2.1.0 版本并且解压。> tar -xzf kafka_2.11-1.0.0.tgz> cd kafka_2.11-1.0.0
Setp 2:启动服务
Kafka 使⽤ ZooKeeper 如果你还没有ZooKeeper服务器,你需要先启动⼀个ZooKeeper服务器。 您可以通过与kafka打包在⼀起的便捷脚本来快速简单地创建⼀个单节点ZooKeeper实例。如果你有使⽤docker的经验,你可以使⽤docker-compose快速搭建⼀个zk集群。> bin/zookeeper-server-start.sh config/zookeeper.properties现在启动Kafka服务器:
> bin/kafka-server-start.sh config/server.properties 后台启动:
> bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
其中1>/dev/null 2>&1 是将命令产⽣的输⼊和错误都输⼊到空设备,也就是不输出的意思。/dev/null代表空设备。
Setp 3:创建⼀个topic
创建⼀个名为“test”的topic,它有⼀个分区和⼀个副本:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test运⾏list(列表)命令来查看这个topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181 test
除了⼿⼯创建topic外,你也可以配置你的broker,当发布⼀个不存在的topic时⾃动创建topic。
Setp 4:发送消息
Kafka⾃带⼀个命令⾏客户端,它从⽂件或标准输⼊中获取输⼊,并将其作为message(消息)发送到Kafka集群。默认情况下,每⾏将作为单独的message发送。
运⾏ producer,然后在控制台输⼊⼀些消息以发送到服务器。> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testhello world
Hello study.163.com
Setp 5:启动消费者
Kafka还有⼀个命令⾏使⽤者,它会将消息转储到标准输出。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginninghello world
hello study.163.com
如果在不同的终端中运⾏上述命令,能够在⽣产者终端中键⼊消息并看到它们出现在消费者终端中。所有命令⾏⼯具都有选项; 运⾏不带参数的命令将显⽰使⽤信息。
=========================================================================================
Kafka集群部署⽅式
Setp 6:设置多 broker 集群
到⽬前,我们只是单⼀的运⾏⼀个broker,对于Kafka,⼀个broker仅仅只是⼀个集群的⼤⼩,接下来我们来设多个broker。⾸先为每个broker创建⼀个配置⽂件:
> cp config/server.properties config/server-1.properties> cp config/server.properties config/server-2.properties现在编辑这些新建的⽂件,设置以下属性:config/server-1.properties: broker.id=1 listeners=
log.dir=/tmp/kafka-logs-1
config/server-2.properties: broker.id=2 listeners=
log.dir=/tmp/kafka-logs-2
broker.id属性是集群中每个节点的名称,这⼀名称是唯⼀且永久的。
我们已经建⽴Zookeeper和⼀个单节点了,现在我们只需要启动两个新的节点:> bin/kafka-server-start.sh config/server-1.properties &...
> bin/kafka-server-start.sh config/server-2.properties &...
现在创建⼀个副本为3的新topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic运⾏命令“describe topics” 查看集群中的topic信息
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topicTopic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
以下是对输出信息的解释:第⼀⾏给出了所有分区的摘要,下⾯的每⾏都给出了⼀个分区的信息。因为我们只有⼀个分区,所以只有⼀⾏。l “leader”是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。l “replicas”是复制分区⽇志的节点列表,不管这些节点是leader还是仅仅活着。l “isr”是⼀组“同步”replicas,是replicas列表的⼦集,它活着并被指到leader。请注意,在⽰例中,节点1是该主题中唯⼀分区的领导者。
我们运⾏这个命令,看看⼀开始我们创建的那个test节点:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
这并不奇怪,刚才创建的主题没有Replicas,并且在服务器“0”上,我们创建它的时候,集群中只有⼀个服务器,所以是“0”。发布⼀些信息在新的topic上:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic...
my test message 1my test message 2消费这些消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic...
my test message 1my test message 2
测试集群的容错,kill掉leader,Broker1作为当前的leader,也就是kill掉Broker1。> ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...> kill -9 7564
备份节点之⼀成为新的leader,⽽broker1已经不在同步备份集合⾥了。
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topicTopic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0即使最初接受写⼊的领导者已经失败,这些消息仍可供消费:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic...
my test message 1my test message 2
Setp 7:使⽤ Kafka Connect 导⼊/导出数据
Kafka Connect是Kafka的⼀个⼯具,它可以将数据导⼊和导出到Kafka。它是⼀种可扩展⼯具,通过运⾏connectors(连接器), 使⽤⾃定义逻辑来实现与外部系统的交互。接下来我们将学习如何使⽤简单的connectors来运⾏Kafka Connect,这些connectors 将⽂件中的数据导⼊到Kafka topic中,并从中导出数据到⼀个⽂件。⾸先,我们将创建⼀些种⼦数据来进⾏测试:> echo -e \"allen\" > test.txt> echo -e \"tony\" >> test.txt
接下来,我们将启动两个standalone(独⽴)运⾏的连接器,第⼀个是源连接器,它从输⼊⽂件读取⾏并⽣成Kafka主题,第⼆个是宿连接器从Kafka主题读取消息并将每个消息⽣成为输出⽂件中的⼀⾏。
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties⼀旦Kafka Connect进程启动,源连接器应该开始从test.txt主题读取⾏并⽣成它们connect-test,并且接收器连接器应该开始从主题读取消息connect-test 并将它们写⼊⽂件test.sink.txt。我们可以通过检查输出⽂件的内容来验证数据是否已通过整个管道传递:
> more test.sink.txtallentony
数据存储在Kafka主题中connect-test,因此我们还可以运⾏控制台使⽤者来查看主题中的数据:> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning{\"schema\":{\"type\":\"string\{\"schema\":{\"type\":\"string\...
连接器⼀直在处理数据,所以我们可以将数据添加到⽂件中,并看到它在pipeline 中移动:> echo mike >> test.txt
因篇幅问题不能全部显示,请点此查看更多更全内容