`
can_do
  • 浏览: 245785 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

用于处理日志的分布式消息系统Kafka(吞吐量高于ActiveMQ和RabbitMQ)

阅读更多
1、几个重要概念
1.1 topic
1.2 broker
1.3 consumer pull message,不是broker push消息给消费者,这样做的另一个特点是,消费者可以绕过当前偏移位的消息,去消费其他偏移位上的消息。和传统的方式不一样。
1.4 consumer可以订阅1个或者多个topic
broker不记录消息的ID(或者说没有消息ID),通过offset和消息的大小来定位消息。
简单的存储机制,1GB的log file。
1.5 partition =>分区,一个topic划分为多个partition,一个分区对应一个逻辑日志,物理日志是由一组分段文件构成,每个分段文件大小为1GB。
2、flush消息的时机:
为提高性能,flush的次数尽量少,选择一个生产者发布完配置的消息数时,而不是每个消息发布完后都flush一次。
或者过多少时间后,flush一次,只有flush后,消息才会被消费者消费。
3、消息的状态是记录在消费端而不是在broker上,因为broker是无状态的(比如:每个消费者消费了多少消息不是记录在broker上,而是记录在consumer上)。
4、生产者和消费者顺序的访问分段文件。
5、本身在进程上不缓存消息,而是依靠os的page cache功能,避免有两份消息缓存起来。
6、受os的连续写(write-through)和预先读(read-ahead)的启发。
7、一般来说,消息的生产速度要快于消息的消费速度,或者说,消费者落后于生产者。
8、优化对网络的访问(比如发送文件),采用Zero-Copy技术,在linux或者其他unix系统上有sendfile API(windows是否支持需要确认?)
///////////////begin//////////
A typical approach to sending bytes from a local file to a remote
socket involves the following steps: (1) read data from the storage
media to the page cache in an OS, (2) copy data in the page cache
to an application buffer, (3) copy application buffer to another
kernel buffer, (4) send the kernel buffer to the socket. This
includes 4 data copying and 2 system calls.

====>走网络的前提下,不走网络的话,相当于直接的文件copy操作,还是挺快的,可达100MB/s以上。
【storage media】--os-->【page cache】 --application-->【buffer】--application-->【kernel buffe】---os--->【socket】
////////////////end//////////////////
9、broker删除消息,是通过基于时间的策略来出来的,比如7天,因为broker不知道消息是否被消费。实际上,消费者都是按日、小时或者实时的消费消息的。
    另外,长期保留数据不会降低Kafka的性能。
10、consumer groups =>消费者组
消费者组中的消费者可以共同消费一组主题上的消息,并且同一个消息仅会被消费者组中的某一个消费者消费。消费者组和消费者组之间是相互独立的。
消费者组中的消费者可以在不同的进程或者主机上。
11、没有中心的概念,采用去中心化,
12、Zookeeper负责消费者和broker的协作
zookeeper上的以下注册时临时的:
A、broker注册
B、消费者注册
C、归属注册
但是,偏移注册是持久化的。
13、topic上有partition的概念
14、delivery guarantees =>分发保证,保证每个消息对消费者组只分发一次。如果因为消费者crash导致的分发重复问题,需要消费端处理时进行重复判断。
消费端的重复判断对性能有消耗,可以采用two-phase提交,但该方式对大多数应用又不是必需的。
分发时,能够保证同一个分区中的消息,是按顺序分发给消费者,但不同的分区间的消息的顺序是不保证的。
15、缺点:如果broker宕掉,任何存储在broker上没有消费的消息,变得不可用。
16、LinkedIn =>审计系统验证消息是否丢失。
   A、Avro用作序列化协议
吞吐量高的原因
生产者性能=>
17、不需要消息的应答:==>生产者发送消息到broker上时,不需要对broker进行应答,
         这样就不能保证消息完全被发送到broker上或者说不能完全保证每一个被发布的消息被broker接收到。
         注意:对很多类型的日志数据系统,其追求用吞吐量来替代持久化,只要丢失的消息数量相对小就可以。
18、可以批量生产消息:=>生产消息时可以进行批量产生,但其他的MQ好像没有此功能。比如:每批50个消息的发送频率,几乎可以占满1Gbps的带宽。
19、更高效的消息存储格式:==>平均需要9 bytes,但AMQ平均需要144Bytes(消息头比较大)。AMQ花费时间在B-tree上对索引的查找,以及维护消息的状态。
注意:消息大的原因有两个:(1)、消息头比较大;(2)、维护各种索引结构;(3)、批量发送改进吞吐量(通过分摊RPC的开销)
消费者性能=>
20、更高效的消息存储格式:==>从broker到消费者传输的字节数更少。
21、不需要在broker上维护消息的状态(消息的发送状态):===>AMQ或者RMQ在broker或者server维护每个消息的分发状态,
         需要频繁的IO(磁盘写)操作而Kafka broker上没有磁盘写活动。
22、Kafka采用了ZeroCopy:==>使用Linux下的sendfile API,减少了传输开销(4步变2步)。|| 此技术点可以考虑用在优化文件池中。
//////////////begin///////////////
We close the section by noting that the purpose of the experiment
is not to show that other messaging systems are inferior to Kafka.
After all, both ActiveMQ and RabbitMQ have more features than
Kafka. The main point is to illustrate the potential performance
gain that can be achieved by a specialized system.
//////////////end/////////////////
21、Kafka下一步要做的事情是:在多个broker间添加内置的消息复制功能,来应对当不可恢复的机器失败时,保证消息的持久性和可用性。
22、最后:一个应用需要根据自己的需求,选择合适的冗余级别,即在持久化、可靠性和吞吐量之间进行适当裁剪或者寻求一种平衡。

【总结】
其实更多场景或者很多时候,需要考虑的是消费者的性能,因为消费者收到消息后,需要做业务逻辑处理,待一次业务处理完毕才进行消息的应答。
这个业务逻辑处理过程的复杂度也会影响到消费消息的性能。

【Kafka不具备什么】==>
1、不能进行broker组网
2、没有Queue的概念
3、不会缓存消息==>消息永远都是进磁盘的

【性能测试】
环境:I had six machines each has the following specs
    Intel Xeon 2.5 GHz processor with six cores
    Six 7200 RPM SATA drives
    32GB of RAM
    1Gb Ethernet

吞吐量指标:records/s 或者 MB/s,每秒消息数或者每秒字节数。
>>>Producer Throughput
  消息大小:100byte ==> on small 100 byte messages
  >>>>Single producer thread, no replication ==>单个producer线程,不复制消息 
  ///////////begin////////
  821,557 records/sec
  (78.3 MB/sec)
  ///////////end//////////
  >>>>Single producer thread, 3x asynchronous replication==>单个producer线程,3个Server节点异步复制
  ///////////begin/////////
  786,980 records/sec
  (75.1 MB/sec)
  ///////////end////////////
  >>>>Single producer thread, 3x synchronous replication==>单个producer线程,3个Server节点同步复制
  //////////begin////////
  421,823 records/sec
  (40.2 MB/sec)
  //////////end//////////
  >>>>Three producers, 3x async replication==>三个producer线程,3个Server节点异步复制
  ///////////begin/////
  2,024,032 records/sec
  (193.0 MB/sec)
  ///////////end///////

>>>Consumer Throughput
  消息大小:100byte ==> on small 100 byte messages
  >>>>Single Consumer
  //////////begin/////////
  940,521 records/sec
  (89.7 MB/sec)
  //////////end///////////
  >>>>Three Consumers
  //////////begin/////////
  2,615,968 records/sec
(249.5 MB/sec)
  //////////end///////////
  >>>>Producer and Consumer
  //////////begin/////////
  795,064 records/sec
  (75.8 MB/sec)
  //////////end///////////
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics