java - How do I set the size of messages in Kafka?


I'm using Kafka 0.9.0.1. According to the sources I've found, the way to set the size of messages is to modify the following keys in server.properties.

  • message.max.bytes
  • replica.fetch.max.bytes
  • fetch.message.max.bytes

My server.properties file has these settings.

message.max.bytes=10485760
replica.fetch.max.bytes=20971520
fetch.message.max.bytes=10485760

Other settings that may be relevant are below.

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
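For readers who don't think in raw byte counts, here is a minimal sketch that converts the values above into binary units. The class and method names (ByteSizes, human) are made up for this illustration and are not part of Kafka.

```java
// Hypothetical helper, not part of Kafka: prints the configured byte counts
// in KiB/MiB so they can be compared with the 4 to 6 MB payloads.
class ByteSizes {
    static final long KIB = 1024L;
    static final long MIB = 1024L * 1024L;

    // Returns an exact binary-unit label when the value divides evenly,
    // otherwise falls back to plain bytes.
    static String human(long bytes) {
        if (bytes % MIB == 0) {
            return (bytes / MIB) + " MiB";
        }
        if (bytes % KIB == 0) {
            return (bytes / KIB) + " KiB";
        }
        return bytes + " B";
    }

    public static void main(String[] args) {
        System.out.println("message.max.bytes         = " + human(10485760L));  // 10 MiB
        System.out.println("replica.fetch.max.bytes   = " + human(20971520L));  // 20 MiB
        System.out.println("socket.send.buffer.bytes  = " + human(102400L));    // 100 KiB
        System.out.println("socket.request.max.bytes  = " + human(104857600L)); // 100 MiB
    }
}
```

So the 10 MiB message limits leave room for 4 to 6 MB payloads, which suggests the limits themselves are not too small.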

However, when I attempt to send messages with payloads of 4 to 6 MB in size, the consumer never gets any messages. The producer seems to send the messages without any exceptions being thrown. If I send smaller payloads (like < 1 MB), then the consumer does receive the messages.

Any idea what I'm doing wrong in terms of configuration settings?

Here is the example code to send a message.

Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
File dir = new File("/path/to/dir");
for (String s : dir.list()) {
  File f = new File(dir, s);
  byte[] data = Files.readAllBytes(f.toPath());
  Payload payload = new Payload(data); // a simple POJO to store the payload
  String key = String.valueOf(System.currentTimeMillis());
  byte[] val = KryoUtil.toBytes(payload); // custom util that uses Kryo to get a byte[]
  producer.send(new ProducerRecord<>("test", key, val));
}
producer.close();

Here is the example code to receive a message.

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProps());
consumer.subscribe(Arrays.asList("test"));
while (true) {
  // poll() returns a batch (ConsumerRecords), not a single ConsumerRecord
  ConsumerRecords<String, byte[]> records = consumer.poll(100);
  for (ConsumerRecord<String, byte[]> record : records) {
    long offset = record.offset();
    String key = record.key();
    byte[] val = record.value();
    Payload payload = (Payload) KryoUtil.toObject(val, Payload.class); // custom util that uses Kryo to deserialize back to an object
    System.out.println(
      String.format("offset=%d, key=%s", offset, key));
  }
}

Here are the methods that populate the properties for the producer and the consumer.

public static Properties getProducerProps() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("compression.type", "snappy");
  props.put("max.request.size", 10485760); // need this
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
  return props;
}

public static Properties getConsumerProps() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("group.id", "test");
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("max.partition.fetch.bytes", 10485760); // need this
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  return props;
}

Jane, don't use fetch.message.max.bytes. First of all, that's a property of the consumer and doesn't go in the server.properties file. Second, it belongs to the old version of the consumer. Instead, use max.partition.fetch.bytes when you create the consumer, as part of the properties you use to instantiate it.
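To make that advice concrete, here is a rough sketch of a sanity check over the size-related settings. It uses only java.util.Properties; the class and method names (SizeLimitCheck, findProblems) are invented for this example and are not part of Kafka. It assumes the usual rule of thumb that replica.fetch.max.bytes and the consumer's max.partition.fetch.bytes should each be at least message.max.bytes. The fallback defaults are what I recall from the 0.9 documentation, so verify them against your version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: flags size settings that look inconsistent.
class SizeLimitCheck {

    // Reads a numeric setting. Accepts String or Integer values, because the
    // question's code stores Integers in Properties via put().
    static long getLong(Properties props, String key, long fallback) {
        Object value = props.get(key);
        return value == null ? fallback : Long.parseLong(value.toString().trim());
    }

    static List<String> findProblems(Properties broker, Properties consumer) {
        List<String> problems = new ArrayList<>();

        // Fallbacks are assumed 0.9 defaults; check the docs for your version.
        long messageMax = getLong(broker, "message.max.bytes", 1000012L);
        long replicaFetch = getLong(broker, "replica.fetch.max.bytes", 1048576L);
        long partitionFetch = getLong(consumer, "max.partition.fetch.bytes", 1048576L);

        if (broker.containsKey("fetch.message.max.bytes")) {
            problems.add("fetch.message.max.bytes is an old-consumer setting; "
                    + "it does not belong in server.properties");
        }
        if (replicaFetch < messageMax) {
            problems.add("replica.fetch.max.bytes (" + replicaFetch
                    + ") is smaller than message.max.bytes (" + messageMax + ")");
        }
        if (partitionFetch < messageMax) {
            problems.add("consumer max.partition.fetch.bytes (" + partitionFetch
                    + ") is smaller than message.max.bytes (" + messageMax + ")");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Broker settings as posted in the question.
        Properties broker = new Properties();
        broker.put("message.max.bytes", "10485760");
        broker.put("replica.fetch.max.bytes", "20971520");
        broker.put("fetch.message.max.bytes", "10485760");

        // Consumer setting as posted in getConsumerProps().
        Properties consumer = new Properties();
        consumer.put("max.partition.fetch.bytes", 10485760);

        for (String problem : findProblems(broker, consumer)) {
            System.out.println(problem);
        }
    }
}
```

With the values from the question, the only thing this check flags is the misplaced fetch.message.max.bytes entry. If the consumer were created without max.partition.fetch.bytes, the third check would fire as well.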

