java 操作 rocketmq 最基本 demo

数据流通道

2018-08-02

443

0

技术:rocketmq + java8

运行环境:IDEA 15.2 + jdk8 + windows 10

demo功能:提供一个java 操作rocketmq最基本demo

pom


    org.apache.rocketmq
    rocketmq-client
    4.3.0

生产者

public class RocketmqProducerInfo {
    public final DefaultMQProducer producer = new DefaultMQProducer("produce_group1");

    public RocketmqProducerInfo() throws MQClientException {
        producer.setNamesrvAddr("10.1.5.52:9876");
        producer.setInstanceName("producer_1");
        producer.start();
    }

    public SendResult send(String content, String topic) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        Message msg = new Message();
        msg.setTopic(topic);
        msg.setBody(content.getBytes());
        return producer.send(msg);
    }

    public void sendAsync(String content, String topic) throws RemotingException, MQClientException, InterruptedException {
        Message msg = new Message();
        msg.setBody(content.getBytes());
        msg.setTopic(topic);
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("msg success, id=" + sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("msg fail, ");
                throwable.printStackTrace();
            }
        });
    }
}

消费者

public class RocketmqConsumerInfo {
    private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");

    public RocketmqConsumerInfo() throws MQClientException {
        consumer.setNamesrvAddr(Application.producerInfo.producer.getNamesrvAddr());
        consumer.setInstanceName("consumer_group_1");
        consumer.subscribe(Application.topic, "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                if (msgs == null || msgs.isEmpty()) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                for (MessageExt item : msgs) {
                    System.out.println("收到消息: " + new String(item.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

    }

    public void startUp() throws MQClientException {
        consumer.start();
    }
}

启动测试

public class Application {
    public static RocketmqProducerInfo producerInfo;
    public static RocketmqConsumerInfo consumerInfo;
    public static String topic = "test_topic1";

    static {
        try {
            producerInfo = new RocketmqProducerInfo();
            consumerInfo = new RocketmqConsumerInfo();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        consumerInfo.startUp();
        System.out.println(producerInfo.send("hi", topic));
    }
}

 

欢迎添加微信,互相学习↑↑↑ -_-

发表评论

全部评论:0条

白老虎

programming is not only to solve problems, ways to think