java - How do I set the size of messages in Kafka?
I'm using Kafka 0.9.0.1. According to the sources I've found, the way to set the size of messages is to modify the following key values in server.properties:
- message.max.bytes
- replica.fetch.max.bytes
- fetch.message.max.bytes
My server.properties file has these settings:
    message.max.bytes=10485760
    replica.fetch.max.bytes=20971520
    fetch.message.max.bytes=10485760
Other settings that may be relevant are below:
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
However, when I attempt to send messages with payloads of 4 to 6 MB in size, the consumer never gets the messages. The producer seems to send the messages without any exceptions being thrown. If I send smaller payloads (like < 1 MB), the consumer does receive the messages.

Any idea what I'm doing wrong in terms of configuration settings?
Here is example code to send a message.
    Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
    File dir = new File("/path/to/dir");
    for (String s : dir.list()) {
        File f = new File(dir, s);
        byte[] data = Files.readAllBytes(f.toPath());
        Payload payload = new Payload(data); // a simple POJO to store the payload
        String key = String.valueOf(System.currentTimeMillis());
        byte[] val = KryoUtil.toBytes(payload); // custom util that uses Kryo to produce a byte[]
        producer.send(new ProducerRecord<>("test", key, val));
    }
    producer.close();
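(A side note on "no exceptions being thrown": producer.send() is asynchronous, so a broker-side rejection of an oversized record is reported through the returned Future or a Callback rather than thrown from send() itself. A minimal sketch of surfacing such errors, reusing the key and val variables from the loop above:)

    // send() returns immediately; blocking on the Future makes any
    // broker-side failure (e.g. a record exceeding the broker's
    // message.max.bytes) visible as an ExecutionException.
    // Needs: java.util.concurrent.ExecutionException and
    //        org.apache.kafka.clients.producer.RecordMetadata
    try {
        RecordMetadata meta = producer.send(new ProducerRecord<>("test", key, val)).get();
        System.out.println("sent to partition=" + meta.partition() + ", offset=" + meta.offset());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace(); // the real cause arrives wrapped in here
    }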
Here is example code to receive a message.
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProps());
    consumer.subscribe(Arrays.asList("test"));
    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(100);
        for (ConsumerRecord<String, byte[]> record : records) {
            long offset = record.offset();
            String key = record.key();
            byte[] val = record.value();
            Payload payload = (Payload) KryoUtil.toObject(val, Payload.class); // custom util that uses Kryo to deserialize the object
            System.out.println(String.format("offset=%d, key=%s", offset, key));
        }
    }
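(If I read the 0.9 client behavior correctly, an undersized consumer fetch limit should not fail silently here: poll() throws a RecordTooLargeException when it hits a record bigger than max.partition.fetch.bytes. Wrapping the poll call would confirm or rule out that cause; a sketch, with the processing loop elided:)

    // Needs: org.apache.kafka.common.errors.RecordTooLargeException
    try {
        ConsumerRecords<String, byte[]> records = consumer.poll(100);
        // ... process records as above ...
    } catch (RecordTooLargeException e) {
        System.err.println("fetch limit too small: " + e.getMessage());
    }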
Here are the methods that populate the Properties for the producer and the consumer.
    public static Properties getProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("compression.type", "snappy");
        props.put("max.request.size", 10485760); // needed
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static Properties getConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("max.partition.fetch.bytes", 10485760); // needed
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }
Jane, don't use fetch.message.max.bytes.

First of all, because that's a consumer property and doesn't go in the server.properties file, and second because it's for the old version of the consumer. Instead, use max.partition.fetch.bytes when you create the consumer, as part of the properties you use to instantiate it.
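(So the broker-side file would keep only the two broker properties; a sketch of the corrected server.properties, keeping the sizes from the question:)

    # server.properties: broker-side limits only.
    # fetch.message.max.bytes removed; it is a consumer setting, not a broker one.
    message.max.bytes=10485760
    # should be >= message.max.bytes so replicas can fetch the largest message
    replica.fetch.max.bytes=20971520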