博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka producer partitions分区器(七)
阅读量:4557 次
发布时间:2019-06-08

本文共 4196 字,大约阅读时间需要 13 分钟。

  消息在经过拦截器、序列化后,就需要确定它发往哪个分区,如果在ProducerRecord中指定了partition字段,那么就不再需要partitioner分区器进行分区了,如果没有指定,那么会根据key来将数据进行分区,如果partitioner和key都没有指定,那么就会采用默认的方式进行数据分区。

  有没有指定partition可以从源码中看出:

public ProducerRecord(String topic, Integer partition, K key, V value) {}  如果指定的partition,那就指定了数据发往哪个分区上,如果没有就会根据key来进行数据分区,如果2个都没有,那么会采用默认的分区策略来进行数据分区

1.根据key进行分区

public class CustomPartitioner {        private static final Logger LOG = LoggerFactory.getLogger(CustomPartitioner.class);        public static void main(String[] args) {        //1.加载配置信息        Properties prop = loadProperties();                //2.创建生产者        KafkaProducer
producer = new KafkaProducer<>(prop); String sendContent = "hello_kafka"; IntStream.range(0, 10).forEach(i ->{ try { ProducerRecord
record = new ProducerRecord<>("test1",i,sendContent+"_"+i);  //topic key value Future
future = producer.send(record); RecordMetadata recordMetadata = future.get(); LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition()); } catch (Exception e) { e.printStackTrace(); } }); } //配置文件的设置 public static Properties loadProperties() { Properties prop = new Properties(); prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092"); prop.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("acks", "all"); //发送到所有的ISR队列中 return prop; }}

 2.自定义分区

  同样在使用自定义分区的时候,需要写实现类和在producer中配置引用

  我们在这个示例中,根据key来分区,key在序列化的时候用的是IntegerSerializer,在ProducerRecord中我们没有指定partition

  自定义分区器

public class CustomPartition implements Partitioner{    @Override    public void configure(Map
configs) { // TODO Auto-generated method stub } @SuppressWarnings({ "null", "unused" }) @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { int partitionNum = cluster.partitionsForTopic(topic).size(); int partition = (Integer)key%partitionNum; return key == null? 0:partition; } @Override public void close() { // TODO Auto-generated method stub }}

  生产者

public class ProducerDemo {        private static final Logger LOG = LoggerFactory.getLogger(ProducerDemo.class);            public static void main(String[] args) throws InterruptedException, ExecutionException {        //1.加载配置信息        Properties prop = loadProperties();                //2.创建生产者        KafkaProducer
producer = new KafkaProducer<>(prop); //3.发送内容 String sendContent = "hello_kafka"; IntStream.range(0, 10).forEach(i ->{ try { ProducerRecord
record = new ProducerRecord<>("test1",i,sendContent+"_"+i); Future
future = producer.send(record); RecordMetadata recordMetadata = future.get(); LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition()); } catch (Exception e) { e.printStackTrace(); } }); producer.close(); //回调拦截器中的close方法 } //配置文件的设置 public static Properties loadProperties() { Properties prop = new Properties(); prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092"); prop.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("partitioner.class", "com.zpb.partitioner.CustomPartition"); prop.put("acks", "all"); return prop; }}

 

 

 

 

 

转载于:https://www.cnblogs.com/MrRightZhao/p/11345846.html

你可能感兴趣的文章
旋转图像
查看>>
字符串中的数字(字符串、循环)
查看>>
15.select into
查看>>
缓存-->Java中缓存的原理
查看>>
Activity 和Service绑定
查看>>
URAL 1348 求垂足
查看>>
flume-agent实例
查看>>
【VS开发】CListCtrl控件使用方法总结
查看>>
【神经网络与深度学习】公开的海量数据集
查看>>
03 docker容器镜像基础
查看>>
bzoj 3620 暴力KMP
查看>>
Excel word “由于本机的限制_该操作已被取消_请与管理员联系”的已生效解决办法 (转 )...
查看>>
解压cpio.gz、zip类型文件
查看>>
静态属性和静态方法
查看>>
高效的MySQL分页
查看>>
MooTools 1.2 Beginner's Guide
查看>>
计算储存、交互和语言
查看>>
bzoj2067: [Poi2004]SZN
查看>>
所谓独立环境
查看>>
当代GSM手机的硬件系统分析[zz]
查看>>