kafka streaming demo

数据流通道

2018-09-24

226

0

技术:kafka streaming + java8

运行环境:IDEA 15.2 + jdk8 + windows 10

demo功能:提供一个kafka streaming 入门demo

1. 添加maven依赖

 
  org.apache.kafka  
  kafka-streams  
  0.10.2.1 

2. 添加java代码

public class Startup {
    private static final String KAFKA_BROKER_IP_AND_PORT = "xxxxxx:9092";//kafka broker 地址

    public static void main(String[] args) {
        KafkaStreams streams = buildKafkaStreams();
        streams.start();
    }

    public static KafkaStreams buildKafkaStreams() {
        Properties props = buildProperties();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream("wc-input");
        KTable<String, Long> counts = source
                .flatMapValues(new ValueMapper<String, Iterable>() {
                    @Override
                    public Iterable apply(String value) {
                        System.out.println("收到值: " + value);
                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
                    }
                }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
                    @Override
                    public KeyValue<String, String> apply(String key, String value) {
                        System.out.println("收到值key: " + key + ", value: " + value);
                        return new KeyValue<>(value, value);
                    }
                })
                .groupByKey()
                .count("Counts");
        counts.print();
        KafkaStreams streams = new KafkaStreams(builder, props);
        return streams;
    }

    public static Properties buildProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER_IP_AND_PORT);
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}

3. broker端脚本发送消息

创建topic: ./kafka-topics.sh --zookeeper 127.0.0.1:2181/kafka-biz --replication-factor 1 --partitions 5 --topic wc-input --create

发送消息: ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic wc-input

然后手动可以输入消息

4. 运行代码

欢迎添加微信,互相学习↑↑↑ -_-

发表评论

全部评论:0条

白老虎

programming is not only to solve problems, ways to think