Frank Wisniewski - @ultraknackig
Lars Pfannenschmidt - @leastangle
n-1 server failures tolerated for a topic with replication factor n
// Immutable news item exchanged via Kafka
public class News {
    public final UUID id;
    public final String author, title, body;
    ...
}
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);                               // Kafka broker list
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  // keys are Strings
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, NewsSerializer.class.getName());  // custom News serializer
KafkaProducer<String, News> producer = new KafkaProducer<>(config);
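NewsSerializer is project code, not part of Kafka. A minimal sketch of what it could look like, assuming Jackson maps the public fields of News to JSON:

import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class NewsSerializer implements Serializer<News> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, News news) {
        try {
            // Jackson picks up the public fields of News by default
            return news == null ? null : mapper.writeValueAsBytes(news);
        } catch (Exception e) {
            throw new SerializationException("Cannot serialize News", e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}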
public RecordMetadata send(News news) throws ExecutionException, InterruptedException {
    ProducerRecord<String, News> record = new ProducerRecord<>(topic, news.id.toString(), news);
    Future<RecordMetadata> recordMetadataFuture = this.producer.send(record);
    return recordMetadataFuture.get(); // block until the broker has acknowledged the record
}
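Blocking on the Future makes every send synchronous. Kafka's send() also accepts a Callback for non-blocking delivery; a sketch (method name sendAsync is ours, assuming the same topic, producer and logger fields):

public void sendAsync(News news) {
    ProducerRecord<String, News> record = new ProducerRecord<>(topic, news.id.toString(), news);
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            logger.error("Delivery of news '{}' failed", news.id, exception);
        } else {
            logger.trace("News '{}' stored at offset '{}' on partition '{}'",
                    news.id, metadata.offset(), metadata.partition());
        }
    });
}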
ProducerRecord

ProducerRecord(String topic, V value);                            // no key: partitions picked round-robin
ProducerRecord(String topic, K key, V value);                     // partition = hash (murmur2) of key
ProducerRecord(String topic, Integer partition, K key, V value);  // explicit partition
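The key-hashing of the second constructor can also be made explicit: newer clients accept a custom Partitioner via ProducerConfig.PARTITIONER_CLASS_CONFIG. A sketch that reproduces the default scheme for keyed records (class name KeyHashPartitioner is ours, illustration only):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class KeyHashPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // murmur2 hash of the serialized key, mapped onto the partition count
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}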
public void run() {
    Thread.currentThread().setName(name);
    ConsumerIterator<String, News> it = messageStream.iterator();
    while (it.hasNext()) { // blocks until a message arrives
        relayMessage(it.next());
    }
}
void relayMessage(MessageAndMetadata<String, News> kafkaMessage) {
    logger.trace("Received message with key '{}' and offset '{}' "
            + "on partition '{}' for topic '{}'",
            kafkaMessage.key(), kafkaMessage.offset(),
            kafkaMessage.partition(), kafkaMessage.topic());
    messageConsumer.consume(kafkaMessage.message());
}
public interface NewsConsumer<T> {
    void consume(T message);
}
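Any handler just implements this interface; a trivial implementation for illustration (class name ConsoleNewsConsumer is ours, not from the repo):

public class ConsoleNewsConsumer implements NewsConsumer<News> {
    @Override
    public void consume(News news) {
        // print each relayed news item to stdout
        System.out.printf("%s: '%s' by %s%n", news.id, news.title, news.author);
    }
}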
import static kafka.consumer.Consumer.createJavaConsumerConnector;
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper); // list of ZooKeeper nodes
props.put("group.id", groupId); // identifies consumer group
props.put("offsets.storage", "kafka"); // storage for offsets
props.put("dual.commit.enabled", "false"); // migration switch
...
ConsumerConnector consumerConnector = createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, List<KafkaStream<String, News>>> consumerMap = consumerConnector.createMessageStreams(
        ImmutableMap.of(topic, numberOfThreads),      // number of streams per topic
        new StringDecoder(null), new NewsDecoder());  // message key and value decoders
List<KafkaStream<String, News>> streams = consumerMap.get(topic);
// create fixed size thread pool to launch all the threads
ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);

// create consumer threads to handle the messages
int threadNumber = 0;
for (final KafkaStream<String, News> stream : streams) {
    String name = String.format("%s[%s]", topic, threadNumber++);
    pool.submit(new ConsumerThread(stream, name, consumer));
}
Full Code Example: https://github.com/kafka101/java-news-feed
"[Designed] to make consumption as cheap as possible"
* Zero-copy transfer via sendfile()
* Consumers without lag get messages straight from the pagecache!
More Details: http://kafka.apache.org/documentation.html#persistence
datanerds.io
github.com/kafka101
Frank Wisniewski - @ultraknackig
Lars Pfannenschmidt - @leastangle