Commit 87b2a9a1 authored by 周健威's avatar 周健威

修复合并

parent 45d0ad66
......@@ -5,6 +5,7 @@ import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFacto
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -23,37 +24,71 @@ public class RabbitDelayConfig {
public static final String ORDER_CANCEL_EXC = "order_cancel_delay_exchange";
public static final String ORDER_CANCEL_QUE = "order_cancel_delay_queue";
public static final String ORDER_CANCEL_KEY = "order_cancel_delay_key";
public static final String APPLY_CANCEL_EXC = "apply_cancel_delay_exchange";
public static final String APPLY_CANCEL_QUE = "apply_cancel_delay_queue";
public static final String APPLY_CANCEL_KEY = "apply_cancel_delay_key";
/**
* 延时队列交换机
* 注意这里的交换机类型:CustomExchange
* @return
*/
@Bean
public CustomExchange delayExchange(){
@Bean(ORDER_CANCEL_EXC)
public CustomExchange orderDelayExchange(){
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(ORDER_CANCEL_EXC,"x-delayed-message",true, false, args);
}
/**
* 延时队列交换机
* 注意这里的交换机类型:CustomExchange
* @return
*/
@Bean(APPLY_CANCEL_EXC)
public CustomExchange applyDelayExchange(){
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(APPLY_CANCEL_EXC,"x-delayed-message",true, false, args);
}
/**
* 延时队列
* @return
*/
@Bean
public Queue delayQueue(){
@Bean(ORDER_CANCEL_QUE)
public Queue orderDelayQueue(){
return new Queue(ORDER_CANCEL_QUE,true);
}
/**
* 延时队列
* @return
*/
@Bean(APPLY_CANCEL_QUE)
public Queue applyDelayQueue(){
return new Queue(APPLY_CANCEL_QUE,true);
}
/**
* 给延时队列绑定交换机
* @return
*/
@Bean
public Binding cfgDelayBinding(Queue cfgDelayQueue, CustomExchange cfgUserDelayExchange){
@Bean("cfgDelayBinding")
public Binding cfgDelayBinding(@Qualifier(ORDER_CANCEL_QUE) Queue cfgDelayQueue, @Qualifier(ORDER_CANCEL_EXC) CustomExchange cfgUserDelayExchange ){
return BindingBuilder.bind(cfgDelayQueue).to(cfgUserDelayExchange).with(ORDER_CANCEL_KEY).noargs();
}
/**
* 给延时队列绑定交换机
* @return
*/
@Bean("acqDelayBinding")
public Binding acqDelayBinding(@Qualifier(APPLY_CANCEL_QUE) Queue acqDelayQueue, @Qualifier(APPLY_CANCEL_EXC) CustomExchange acqUserDelayExchange){
return BindingBuilder.bind(acqDelayQueue).to(acqUserDelayExchange).with(APPLY_CANCEL_KEY).noargs();
}
// @Bean
// public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
// SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
......
......@@ -3,8 +3,10 @@ package com.xxfc.platform.order.mqhandler;
import com.github.wxiaoqi.security.common.exception.BaseException;
import com.github.wxiaoqi.security.common.util.process.ResultCode;
import com.rabbitmq.client.Channel;
import com.xxfc.platform.order.biz.ShuntApplyBiz;
import com.xxfc.platform.order.biz.inner.OrderCancelBiz;
import com.xxfc.platform.order.entity.BaseOrder;
import com.xxfc.platform.order.entity.ShuntApply;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
......@@ -14,6 +16,7 @@ import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
import static com.xxfc.platform.order.config.RabbitDelayConfig.APPLY_CANCEL_QUE;
import static com.xxfc.platform.order.config.RabbitDelayConfig.ORDER_CANCEL_QUE;
/**
......@@ -28,6 +31,9 @@ public class RabbitConsumer {
@Autowired
OrderCancelBiz orderCancelBiz;
@Autowired
ShuntApplyBiz shuntApplyBiz;
/**
* 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK
......@@ -60,4 +66,45 @@ public class RabbitConsumer {
channel.basicRecover(true);
}
}
/**
* 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK
* 存在问题:如果报错了,消息不会丢失,但是会无限循环消费,一直报错,如果开启了错误日志很容易就吧磁盘空间耗完
* 解决方案:手动ACK,或者try-catch 然后在 catch 里面将错误的消息转移到其它的系列中去
* spring.rabbitmq.listener.simple.acknowledge-mode = manual
*/
@RabbitListener(queues = APPLY_CANCEL_QUE)
public void applyCancelReceiveDealy(ShuntApply shuntApply, Message message, Channel channel) throws IOException {
log.info("===============接收队列接收消息====================");
log.info("接收时间:{},接受内容:{}", LocalDateTime.now(), shuntApply.toString());
//通知 MQ 消息已被接收,可以ACK(从队列中删除)了
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
try {
//设置自动取消标识
// baseOrder.setCancelReason("超时未付款,系统自动取消订单");
//orderCancelBiz.cancel(baseOrder, null);
//判断updateTime 是否一致(即乐观锁)
ShuntApply dbApply = shuntApplyBiz.selectById(shuntApply.getId());
if(dbApply.getUpdTime().equals(shuntApply.getUpdTime())) {
shuntApplyBiz.updateSelectiveById(new ShuntApply(){{
setId(shuntApply.getId());
setStatus(ShuntApply.STATUS_AUTOCNL);
}});
}
}catch (BaseException e) {
if(ResultCode.DB_OPERATION_FAIL_CODE == e.getStatus()) {
log.info("取消操作被取消;订单id:"+ shuntApply.getId());
}
}catch (Exception e) {
log.error("============消费失败,尝试消息补发再次消费!==============");
log.error(e.getMessage());
/**
* basicRecover方法是进行补发操作,
* 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
* 设置为false是只补发给当前的consumer
*/
channel.basicRecover(true);
}
}
}
package com.xxfc.platform.order.mqhandler;
import com.xxfc.platform.order.entity.BaseOrder;
import com.xxfc.platform.order.entity.ShuntApply;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -8,8 +9,7 @@ import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import static com.xxfc.platform.order.config.RabbitDelayConfig.ORDER_CANCEL_EXC;
import static com.xxfc.platform.order.config.RabbitDelayConfig.ORDER_CANCEL_KEY;
import static com.xxfc.platform.order.config.RabbitDelayConfig.*;
/**
* rabbitMq生产者类
......@@ -39,5 +39,22 @@ public class RabbitProduct {
);
log.info("{}ms后执行", delayTime);
}
public void sendApplyDelayMessage(ShuntApply shuntApply, Long delayTime) {
//这里的消息可以是任意对象,无需额外配置,直接传即可
log.info("===============延时队列生产消息====================");
log.info("发送时间:{},发送内容:{}", LocalDateTime.now(), shuntApply.toString());
this.rabbitTemplate.convertAndSend(
APPLY_CANCEL_EXC,
APPLY_CANCEL_KEY,
shuntApply,
message -> {
//注意这里时间可以使long,而且是设置header
message.getMessageProperties().setHeader("x-delay", delayTime);
return message;
}
);
log.info("{}ms后执行", delayTime);
}
}
package com.xxfc.platform.order.rest.background;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.github.wxiaoqi.security.admin.feign.UserFeign;
import com.github.wxiaoqi.security.common.context.BaseContextHandler;
......@@ -16,6 +17,7 @@ import com.xxfc.platform.order.biz.ShuntApplyBiz;
import com.xxfc.platform.order.biz.inner.OrderCalculateBiz;
import com.xxfc.platform.order.entity.BaseOrder;
import com.xxfc.platform.order.entity.ShuntApply;
import com.xxfc.platform.order.mqhandler.RabbitProduct;
import com.xxfc.platform.order.pojo.order.RentVehicleBO;
import com.xxfc.platform.order.pojo.order.add.BgAddRentDTO;
import com.xxfc.platform.order.service.OrderRentVehicleService;
......@@ -50,6 +52,9 @@ public class BgShuntApplyController extends BaseController<ShuntApplyBiz, ShuntA
@Autowired
OrderRentVehicleService orderRentVehicleService;
@Autowired
RabbitProduct rabbitProduct;
@RequestMapping(value = "/confirmApply", method = RequestMethod.POST)
@ResponseBody
@ApiOperation(value = "后台确认申请")
......@@ -85,7 +90,10 @@ public class BgShuntApplyController extends BaseController<ShuntApplyBiz, ShuntA
shuntApply.setVehicleId(dto.getVehicleId());
shuntApply.setStatus(STATUS_CONFIRM);
shuntApply.setOrderNo(detail.getOrder().getNo());
baseBiz.updateSelectiveById(shuntApply);
shuntApply.setOverTime(DateUtil.offsetHour(DateUtil.date(), 1).getTime());
baseBiz.updateSelectiveByIdRe(shuntApply);
rabbitProduct.sendApplyDelayMessage(baseBiz.selectById(shuntApply.getId()), 1000L * 3601L);
return ObjectRestResponse.succ();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment