《kafka权威指南》读书笔记1

Posted by BY KiloMeter on March 13, 2019

第一章 初识kafka

kafka的消息通过主题进行分类,主题就好比于数据库中的表,主题可以分为若干个分区,每次写入消息时,消息会被均衡地分布到所有的分区上,因此这样会导致在读取消息时无法保证所有消息的顺序,但是在单个分区中的消息是顺序的。

在写入数据的时候,也可以选择把消息写到指定的分区上,这个可以根据消息键和分区器实现

消费者读取数据时,按照分区的数据写入顺序进行读取,通过检查消息的偏移量判断是否读取过该数据,偏移量是一个递增的元数据,会在创建消息的时候由kafka添加到消息上,一个分区中,每个消息的偏移量都是唯一的,消费者会把每次读写后的消息偏移量保存在kafka或者zookeeper上,所以消费者宕机时,读取状态不会丢失。

在kafka集群中,一台独立的kafka服务器称为一个broker,broker接受来自生产者的消息,为写入的消息添加偏移量,并提交到磁盘保存,同时也为消费者提供服务,返回已经提交到磁盘上的消息。每一个集群中有一个broker会通过选举的方式选出来当集群控制器,负责管理工作,包括将分区分配给broker和监控broker。

一个分区可以分配给多个broker,此时会发生分区复制,该机制提供了消息冗余,如果某一个broker宕机了,其他broker可以接管工作。

kafka还提供了消息保留,默认消息要么保留一段时间(比如7天),要么保留到一定大小,如果达到上限,会把旧消息给删除。

第二章 安装kafka

kafka环境搭建

第三章 Kafka生产者—向kafka写入数据

kafka新版生产者的API使用

引入dependency

<dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.11.0.0</version>
 </dependency>

注意:这里的API要和虚拟机上安装的kafka版本相适应,不然的话就算程序能够成功运行,虚拟机上也可能无法消费到数据。

创建消费者

private Producer<String, String> createProducer() {
        // 通过Properties类设置Producer的属性
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "niubike1:9092,niubike2:9092,niubike3:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

这里只设置了三个必选的属性

bootstrap.servers,指定broker的地址

key.serializer和value.serializer指定了把键和值转换成字节数组所使用的序列化类。

发送消息

producer.send(new ProducerRecord<String, String>(this.topic, "times", Integer.toString(i)));

kafka生产者的send方法需要传递ProducerRecord对象,构造方法需要主题名,发送内容的键和值。

send方法在发送失败的情况下会重新发送,直到超出重发次数,就抛出异常。

上面的send方法会等待kafka服务器的响应,会阻塞在这里,会影响kafka的吞吐量,下面提供了异步的方法来发送请求

private class DemoProducerCallback implements Callback{
    @Override
    public void onCompletion(RecordMetadata recordMetadata,Exception e){
        if(e!=null){
            e.printStackTrace();
        }
    }
}
ProducerRecord<String,String> record = new ProducerRecord<String, String>(this.topic, "times", Integer.toString(i));
producer.send(record,new DemoProducerCallback());

send方法传入一个回调对象,如果发送失败,将会抛出异常。

如果需要传递自己的对象,那么需要对该对象实现独有的序列化器,通过实现Serializer<class>接口,实现serialize方法来实现,但是不建议这么做,因为在真正的生产环境中,随着版本的迭代,需要处理好不同版本的序列化器和序列化器,兼容性不好处理。

kafka推荐使用avro实现。

分区

从上面可以看到,kafka传递的数据包括了键和值,一般值就是我们想要传递的消息,键主要决定了消息写到哪个分区,拥有相同key的消息会被分发到相同的分区,如果不想使用键,可以使用null代替,这样的话数据将会通过轮询算法均衡地分发到各个分区。

如果想用键来让消息分发到同一个分区,那么分区的数量最好在创建时就规划好,而且永远不要增加新分区,这是因为,加入用户a产生的消息一直是发往1分区的,增加分区后,可能a产生的消息就发送到别的分区去了。

可以实现自定义分区器,只需要实现partitioner接口就行。

第四章 Kafka消费者—向kafka读取数据

消费者从属于消费者组,一个消费者组消费一个主题的消息,消费者组的消费者接受主题一部分分区的消息。

在分区数量大于消费者组的消费者数量时,一个消费者会消费多个分区的消息,但是如果消费者(同一个消费者组)的数量大于分区数量时,将会有一部分消费者不会接受任何消息,因此,不要让消费者的数量超过分区的数量。不同的消费者组如果订阅了同一个主题,那么这些消费者组最终消费的消息是一样的,同一个消费者组中,由于不同消费者消费不同的分区,因此不会重复消费。

消费者群组和分区再均衡

在消费的过程中,如果管理者对分区的数量进行了调整,那么有些分区的消费者将会发生改变,分区的所有权将会从一个消费者转移到另一个消费者,这样的行为称为再均衡。由于再均衡的存在,消费者群组可以进行横向扩展和收缩,但是再均衡的过程中,所有的消费者将会停止消费,而且重新分配后,会导致当前消费者的读取状态消失,产生重复消费。

在一个消费者群组中,消费者和分区之间的从属关系,由被群组指派为群组协调器的broker来协调。群组中的消费者定期向群组协调器发送心跳,如果心跳持续跳跃,则说明还在消费分区的数据,如果停止发送心跳的时间过长,群组协调器会认为该消费者已经死亡,就会触发再均衡。

kafka消费者代码

Properties properties = new Properties();
        properties.put("bootstrap.servers", "niubike1:9092,niubike2:9092,niubike3:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList("topic1"));
        try {
            while (true){
                ConsumerRecords<String,String> records = consumer.poll(100);
                for(ConsumerRecord record:records){
                    Log.debug("topic ="+record.topic()+"partition = "+record.partition()
                    +"offset = "+ record.offset()+"customer = "+record.key()+"country = "+record.value());
                }
            }
        }finally {
            consumer.close();
        }

poll方法让消费者持续对kafka进行轮询请求数据,poll的参数指的是等待broker的时间,如果设置为0,那么poll会立刻返回,否则会在指定时间(ms)内等待broker返回数据。

如果没有poll方法,该消费者会被认定为死亡,该分区将会分配给其他消费者。

每次调用poll方法后,会返回由生产者写入kafka但还没被消费者消费的消息,返回的位置该如何确定呢?这里是消费者在每次消费之后,会提交偏移量。提交偏移量的过程是这样的,消费者会往一个叫做_conusmer_offset_的特殊主题发送消息,这个主题里面保存着每个分区的偏移量。如果发生了再均衡,其他的消费者接管了另外的分区,就可以通过这个主题里面存放的分区偏移量继续消费。因此,提交偏移量这个操作很重要,如果提交的偏移量大于实际处理的偏移量,将会有部分消息无法处理到,如果提交的偏移量小于实际处理的偏移量,那么这中间的数据将会产生重复消费。

kafkaConsumer提供了多种方式来提交偏移量。

自动提交

默认的提交方式是自动提交,这也是最简单的提交方式。但是这种方式存在很大的问题,假设自动提交的时间为5s,在最近一次提交后3s时发生了再均衡,那么新的消费者接管这个分区时,将会重复消费这3s内的消息。虽然可以通过降低自动提交的时间来降低重复消费的可能性,但是还是无法完全避免。

同步提交

try {
            while (true){
                ConsumerRecords<String,String> records = consumer.poll(100);
                for(ConsumerRecord record:records){
                    Log.debug("topic ="+record.topic()+"partition = "+record.partition()
                    +"offset = "+ record.offset()+"customer = "+record.key()+"country = "+record.value());
                }
                try{
                    consumer.commitSync();
                }catch (CommitFailedException e){
                    e.printStackTrace();
                }
            }
        }finally {
            consumer.close();
        }

将auto.commit.offset设定为false,可以在处理完poll获取的数据后,使用consumer.commitSync()方法进行提交,该方法会提交poll方法返回的最新的偏移量。如果提交失败,该方法会一直尝试提交直到成功,如果遇到不可恢复的错误,将会抛出异常。

注:由于改成手动提交,因此在处理完数据后必须使用该方法进行提交,否则发生再均衡的时候,会发生重复消费。

commitSync方法在调用时,程序会进入阻塞状态,影响系统的吞吐量。虽然可以降低提交频率来提升吞吐量,但是如果发生再均衡会增加重复消息的数量。

异步提交

try {
            while (true){
                ConsumerRecords<String,String> records = consumer.poll(100);
                for(ConsumerRecord record:records){
                    Log.debug("topic ="+record.topic()+"partition = "+record.partition()
                    +"offset = "+ record.offset()+"customer = "+record.key()+"country = "+record.value());
                }
                consumer.commitAsync();
            }
        }finally {
            consumer.close();
        }

异步提交和手动提交有个最大的区别就是,异步提交如果发生错误,不会进行重试。之所以不会进行重试是因为,在得到服务器响应的时候,可能会有一个更大的偏移量被提交了,如果重试将会导致重复消费。

同步和异步提交组合

try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord record : records) {
                    Log.debug("topic =" + record.topic() + "partition = " + record.partition()
                            + "offset = " + record.offset() + "customer = " + record.key() + "country = " + record.value());
                }
                consumer.commitAsync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }

正常情况下,使用异步提交,即使偶尔一次提交失败了,如果是因为临时的问题导致的,后续的提交总会有成功的,如果发生在关闭消费者或者再均衡前的最后一次提交,就需要使用同步提交确保提交成功。

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        int count = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord record : records) {
                Log.debug("topic =" + record.topic() + "partition = " + record.partition()
                        + "offset = " + record.offset() + "customer = " + record.key() + "country = " + record.value());
                currentOffsets.put(new TopicPartition(record.topic(),record.partition()),
                        new OffsetAndMetadata(record.offset()+1,"no metadata"));
                if(count%1000==0)
                    consumer.commitAsync(currentOffsets,null);
                count++;
            }
            consumer.commitAsync();
        }

在上面的例子中,每处理1000条数据就进行一次提交,在实际生产中可以修改成每间隔一段时间就提交。

再均衡监听器

在消费者退出和进行再均衡前,如果想要进行一些清理工作,比如提交最后一次偏移量,关闭数据库连接等,可以在创建consumer的时候传入一个实现了ConsumerRebalanceListener接口的类,实现onPartitionAssigned方法和onPartitionsRevoked方法。前者用于重新分区后消费者开始读取消息前调用,后者用于再均衡开始之前和消费者停止读取消息之后调用。

从特定偏移量处开始处理记录

试想在一个这样的场景,消费者获取kafka上的数据后,将结果保存至数据库中,如果也将偏移量也保存至数据库,那么,在发生再均衡时,直接去数据库读取偏移量,只需要保证偏移量和数据的保存是同步的,就能够保证不会重复消费和丢失消费。