Commit 0dafb6dd authored by hezhen's avatar hezhen

123

parent 38296d30
......@@ -20,12 +20,16 @@ public class RabbitAdminConfig extends RabbitCommonConfig {
public static final String ORDER_WATER_QUEUE = "order.water.queue";
public static final String ORDER_FINLISH_USER_RE_QUEUE = "order.cancel.userRe.queue";
//钱包50入账
public static final String WALLET_ADD_QUEUE = "wallet.add.queue";
static {
myQueue = new ArrayList<BindDTO>(){{
add(new BindDTO(ORDER_WATER_QUEUE, ADMIN_TOPIC, KEY_ORDER_PAY));
add(new BindDTO(ORDER_WATER_QUEUE, ADMIN_TOPIC, KEY_ORDER_FINLISH));
add(new BindDTO(ORDER_WATER_QUEUE, ADMIN_TOPIC, KEY_ORDER_CANCEL));
add(new BindDTO(ORDER_FINLISH_USER_RE_QUEUE, ADMIN_TOPIC, KEY_ORDER_FINLISH));
//钱包
add(new BindDTO(WALLET_ADD_QUEUE, ADMIN_TOPIC,KEY_WALLET_ADD ));
}};
}
}
......
package com.github.wxiaoqi.security.admin.handler;
import cn.hutool.json.JSONUtil;
import com.github.wxiaoqi.security.admin.biz.AppUserSellingWaterBiz;
import com.rabbitmq.client.Channel;
import com.xxfc.platform.order.pojo.mq.OrderMQDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static com.github.wxiaoqi.security.admin.config.RabbitAdminConfig.*;
@Component
@Slf4j
public class WalletMQHandler {
@Autowired
AppUserSellingWaterBiz waterBiz;
/**
* 佣金
* @param
*/
@RabbitListener(queues = WALLET_ADD_QUEUE)
public void integralHandler(Message message, @Headers Map<String, Object> headers, Channel channel) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new Runnable() {
@Override
public void run() {
try {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
OrderMQDTO orderMQDTO = JSONUtil.toBean(msg, OrderMQDTO.class);
executorService.shutdown();
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手动签收
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.info("接收到的消息失败");
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (IOException i) {
i.printStackTrace();
}
e.printStackTrace();
}
}
});
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment