技术:rocketmq + java8
运行环境:IDEA 15.2 + jdk8 + windows 10
demo功能:提供一个java 操作rocketmq最基本demo
org.apache.rocketmq
rocketmq-client
4.3.0
public class RocketmqProducerInfo {
public final DefaultMQProducer producer = new DefaultMQProducer("produce_group1");
public RocketmqProducerInfo() throws MQClientException {
producer.setNamesrvAddr("10.1.5.52:9876");
producer.setInstanceName("producer_1");
producer.start();
}
public SendResult send(String content, String topic) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message msg = new Message();
msg.setTopic(topic);
msg.setBody(content.getBytes());
return producer.send(msg);
}
public void sendAsync(String content, String topic) throws RemotingException, MQClientException, InterruptedException {
Message msg = new Message();
msg.setBody(content.getBytes());
msg.setTopic(topic);
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("msg success, id=" + sendResult.getMsgId());
}
@Override
public void onException(Throwable throwable) {
System.out.println("msg fail, ");
throwable.printStackTrace();
}
});
}
}
public class RocketmqConsumerInfo {
private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
public RocketmqConsumerInfo() throws MQClientException {
consumer.setNamesrvAddr(Application.producerInfo.producer.getNamesrvAddr());
consumer.setInstanceName("consumer_group_1");
consumer.subscribe(Application.topic, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
if (msgs == null || msgs.isEmpty()) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt item : msgs) {
System.out.println("收到消息: " + new String(item.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
public void startUp() throws MQClientException {
consumer.start();
}
}
public class Application {
public static RocketmqProducerInfo producerInfo;
public static RocketmqConsumerInfo consumerInfo;
public static String topic = "test_topic1";
static {
try {
producerInfo = new RocketmqProducerInfo();
consumerInfo = new RocketmqConsumerInfo();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
consumerInfo.startUp();
System.out.println(producerInfo.send("hi", topic));
}
}
欢迎添加微信,互相学习↑↑↑ -_-
白老虎
programming is not only to solve problems, ways to think
grafana 级连 菜单 templating (variables) 配置
AI 机器人 抓取 微信 聊天中的 百度网盘 分享地址和密码
rocketmq 集群搭建 (2master + 2slave + 2namesrv)