技术:Kafka Streams + Java 8
运行环境:IDEA 15.2 + JDK 8 + Windows 10
demo功能:提供一个 Kafka Streams 入门 demo
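Maven 依赖:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.2.1</version>
</dependency>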
/**
 * Minimal Kafka Streams word-count demo.
 *
 * <p>The topology reads lines from the {@code wc-input} topic, splits each line
 * on single spaces into lower-cased words, re-keys every record by its word and
 * counts the records per word. The resulting table is printed to standard output.
 */
public class Startup {
    /** Kafka broker address in {@code host:port} form; replace the placeholder before running. */
    private static final String KAFKA_BROKER_IP_AND_PORT = "xxxxxx:9092";

    /**
     * Builds the word-count topology and starts it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final KafkaStreams streams = buildKafkaStreams();
        // Close the streams instance when the JVM shuts down (e.g. Ctrl+C) so that
        // its threads and state are released instead of being abandoned.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    /**
     * Assembles the word-count topology and wraps it in a {@link KafkaStreams}
     * instance configured by {@link #buildProperties()}.
     *
     * @return a configured, not yet started, streams instance
     */
    public static KafkaStreams buildKafkaStreams() {
        Properties props = buildProperties();
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("wc-input");

        KTable<String, Long> counts = source
                // Split every incoming line into words. The mapper is typed as
                // Iterable<String> (not raw Iterable) so the downstream stream keeps
                // String values and the following map() step type-checks.
                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        System.out.println("收到值: " + value);
                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
                    }
                })
                // Use the word itself as the record key so that grouping by key
                // groups identical words together.
                .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
                    @Override
                    public KeyValue<String, String> apply(String key, String value) {
                        System.out.println("收到值key: " + key + ", value: " + value);
                        return new KeyValue<>(value, value);
                    }
                })
                .groupByKey()
                .count("Counts");

        // Dump every update of the count table to standard output.
        counts.print();

        return new KafkaStreams(builder, props);
    }

    /**
     * Creates the configuration used by the demo: application id, broker
     * address, String serdes for keys and values, and the {@code earliest}
     * offset-reset policy.
     *
     * @return a fresh, mutable {@link Properties} instance
     */
    public static Properties buildProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_IP_AND_PORT);
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}
创建topic: ./kafka-topics.sh --zookeeper 127.0.0.1:2181/kafka-biz --replication-factor 1 --partitions 5 --topic wc-input --create
发送消息: ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic wc-input
然后可以手动输入消息
欢迎添加微信,互相学习↑↑↑ -_-
白老虎
programming is not only to solve problems, ways to think
grafana 级连 菜单 templating (variables) 配置
rocketmq 集群搭建 (2master + 2slave + 2namesrv)
AI 机器人 抓取 微信 聊天中的 百度网盘 分享地址和密码