马老师Elasticsearch核心知识篇
KafkaProducer<String, String> producer = new KafkaProducer<>(buildConfigProps());
branches[1].map((k, v) -> new KeyValue<>(k, new ProducerRecord<>(SOURCE_TOPIC, k, v.toJSONString())))
.foreach((k, v) -> producer.send(v));
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
kafkaStreams.setUncaughtExceptionHandler(
new RestartUncaughtExceptionHandler(streamsBuilder, props));
kafkaStreams.start();