Commit 17aabf5e authored by hezhen's avatar hezhen

Merge branch 'dev-chw' of http://113.105.137.151:22280/youjj/cloud-platform into dev-chw

parents e8af61af 71abd114
...@@ -24,7 +24,10 @@ public class ShuntApply implements Serializable { ...@@ -24,7 +24,10 @@ public class ShuntApply implements Serializable {
public static final int STATUS_CNL = 2; public static final int STATUS_CNL = 2;
public static final int STATUS_CONFIRM = 3; public static final int STATUS_CONFIRM = 3;
public static final int STATUS_ORDER = 4; public static final int STATUS_ORDER = 4;
public static final int STATUS_AUTOCNL = 5; public static final int STATUS_AUTOCNL_REQUIRE = 501;
public static final int STATUS_AUTOCNL_ORDER = 502;
public static final int STATUS_AUTOCNL_TOPAY = 503;
public static final int STATUS_AUTOCNL_PAYED = 504;
public static final int STATUS_ERR = 6; public static final int STATUS_ERR = 6;
public static final int STATUS_SUCC = 7; public static final int STATUS_SUCC = 7;
......
...@@ -27,9 +27,13 @@ public class RabbitDelayConfig { ...@@ -27,9 +27,13 @@ public class RabbitDelayConfig {
public static final String APPLY_CANCEL_EXC = "apply_cancel_delay_exchange"; public static final String APPLY_CANCEL_EXC = "apply_cancel_delay_exchange";
public static final String APPLY_CANCEL_QUE = "apply_cancel_delay_queue"; public static final String APPLY_CANCEL_QUE = "apply_cancel_delay_queue";
public static final String APPLY_CANCEL_KEY = "apply_cancel_delay_key"; public static final String APPLY_CANCEL_KEY = "apply_cancel_delay_key";
public static final String APPLY_REQUIRE_CANCEL_EXC = "apply_require_cancel_delay_exchange";
public static final String APPLY_REQUIRE_CANCEL_QUE = "apply_require_cancel_delay_queue";
public static final String APPLY_REQUIRE_CANCEL_KEY = "apply_require_cancel_delay_key";
/** /**
* 延时队列交换机 * 自动取消订单延时队列交换机
* 注意这里的交换机类型:CustomExchange * 注意这里的交换机类型:CustomExchange
* @return * @return
*/ */
...@@ -41,7 +45,7 @@ public class RabbitDelayConfig { ...@@ -41,7 +45,7 @@ public class RabbitDelayConfig {
} }
/** /**
* 延时队列交换机 * 申请下单自动取消延时队列交换机
* 注意这里的交换机类型:CustomExchange * 注意这里的交换机类型:CustomExchange
* @return * @return
*/ */
...@@ -52,9 +56,21 @@ public class RabbitDelayConfig { ...@@ -52,9 +56,21 @@ public class RabbitDelayConfig {
return new CustomExchange(APPLY_CANCEL_EXC,"x-delayed-message",true, false, args); return new CustomExchange(APPLY_CANCEL_EXC,"x-delayed-message",true, false, args);
} }
/**
* 申请需求自动取消延时队列交换机
* 注意这里的交换机类型:CustomExchange
* @return
*/
@Bean(APPLY_REQUIRE_CANCEL_EXC)
public CustomExchange applyRequireDelayExchange(){
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(APPLY_REQUIRE_CANCEL_QUE,"x-delayed-message",true, false, args);
}
/** /**
* 延时队列 * 自动取消订单延时队列
* @return * @return
*/ */
@Bean(ORDER_CANCEL_QUE) @Bean(ORDER_CANCEL_QUE)
...@@ -63,7 +79,7 @@ public class RabbitDelayConfig { ...@@ -63,7 +79,7 @@ public class RabbitDelayConfig {
} }
/** /**
* 延时队列 * 申请下单自动取消延时队列
* @return * @return
*/ */
@Bean(APPLY_CANCEL_QUE) @Bean(APPLY_CANCEL_QUE)
...@@ -72,7 +88,18 @@ public class RabbitDelayConfig { ...@@ -72,7 +88,18 @@ public class RabbitDelayConfig {
} }
/** /**
* 给延时队列绑定交换机 * 申请需求自动取消延时队列
* @return
*/
@Bean(APPLY_REQUIRE_CANCEL_QUE)
public Queue applyRequireDelayQueue(){
return new Queue(APPLY_REQUIRE_CANCEL_QUE,true);
}
/**
* 自动取消订单给延时队列绑定交换机
* @return * @return
*/ */
@Bean("cfgDelayBinding") @Bean("cfgDelayBinding")
...@@ -81,12 +108,21 @@ public class RabbitDelayConfig { ...@@ -81,12 +108,21 @@ public class RabbitDelayConfig {
} }
/** /**
* 给延时队列绑定交换机 * 申请下单自动取消给延时队列绑定交换机
* @return * @return
*/ */
@Bean("acqDelayBinding") @Bean("acqDelayBinding")
public Binding acqDelayBinding(@Qualifier(APPLY_CANCEL_QUE) Queue acqDelayQueue, @Qualifier(APPLY_CANCEL_EXC) CustomExchange acqUserDelayExchange){ public Binding acqDelayBinding(@Qualifier(APPLY_CANCEL_QUE) Queue acqDelayQueue, @Qualifier(APPLY_CANCEL_EXC) CustomExchange acqUserDelayExchange){
return BindingBuilder.bind(acqDelayQueue).to(acqUserDelayExchange).with(APPLY_CANCEL_KEY).noargs(); return BindingBuilder.bind(acqDelayQueue).to(acqUserDelayExchange).with(APPLY_CANCEL_KEY).noargs();
} }
/**
* 申请下单自动取消给延时队列绑定交换机
* @return
*/
@Bean("acqRequireDelayBinding")
public Binding acqRequireDelayBinding(@Qualifier(APPLY_REQUIRE_CANCEL_QUE) Queue acqDelayQueue, @Qualifier(APPLY_REQUIRE_CANCEL_EXC) CustomExchange acqUserDelayExchange){
return BindingBuilder.bind(acqDelayQueue).to(acqUserDelayExchange).with(APPLY_REQUIRE_CANCEL_KEY).noargs();
}
} }
...@@ -3,8 +3,12 @@ package com.xxfc.platform.order.mqhandler; ...@@ -3,8 +3,12 @@ package com.xxfc.platform.order.mqhandler;
import com.github.wxiaoqi.security.common.exception.BaseException; import com.github.wxiaoqi.security.common.exception.BaseException;
import com.github.wxiaoqi.security.common.util.process.ResultCode; import com.github.wxiaoqi.security.common.util.process.ResultCode;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.xxfc.platform.order.biz.ShuntApplyBiz;
import com.xxfc.platform.order.biz.inner.OrderCancelBiz; import com.xxfc.platform.order.biz.inner.OrderCancelBiz;
import com.xxfc.platform.order.entity.BaseOrder; import com.xxfc.platform.order.entity.BaseOrder;
import com.xxfc.platform.order.entity.ShuntApply;
import com.xxfc.platform.order.pojo.order.RentVehicleBO;
import com.xxfc.platform.order.service.OrderRentVehicleService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
...@@ -14,7 +18,7 @@ import org.springframework.stereotype.Component; ...@@ -14,7 +18,7 @@ import org.springframework.stereotype.Component;
import java.io.IOException; import java.io.IOException;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import static com.xxfc.platform.order.config.RabbitDelayConfig.ORDER_CANCEL_QUE; import static com.xxfc.platform.order.config.RabbitDelayConfig.*;
/** /**
* activeMq消费者类 * activeMq消费者类
...@@ -28,6 +32,12 @@ public class RabbitConsumer { ...@@ -28,6 +32,12 @@ public class RabbitConsumer {
@Autowired @Autowired
OrderCancelBiz orderCancelBiz; OrderCancelBiz orderCancelBiz;
@Autowired
ShuntApplyBiz shuntApplyBiz;
@Autowired
OrderRentVehicleService orderRentVehicleService;
/** /**
* 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK * 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK
...@@ -60,4 +70,72 @@ public class RabbitConsumer { ...@@ -60,4 +70,72 @@ public class RabbitConsumer {
channel.basicRecover(true); channel.basicRecover(true);
} }
} }
@RabbitListener(queues = APPLY_CANCEL_QUE)
public void applyCancelReceiveDealy(ShuntApply shuntApply, Message message, Channel channel) throws IOException {
log.info("===============接收队列接收消息====================");
log.info("接收时间:{},接受内容:{}", LocalDateTime.now(), shuntApply.toString());
//通知 MQ 消息已被接收,可以ACK(从队列中删除)了
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
try {
//判断updateTime 是否一致(即乐观锁)
ShuntApply dbApply = shuntApplyBiz.selectById(shuntApply.getId());
if(dbApply.getUpdTime().equals(shuntApply.getUpdTime())) {
shuntApplyBiz.updateSelectiveById(new ShuntApply(){{
setId(shuntApply.getId());
setStatus(ShuntApply.STATUS_AUTOCNL_ORDER);
}});
}
//取消预订
orderRentVehicleService.errorRejectVehicle(new RentVehicleBO(){{
setBookRecordId(shuntApply.getBookRecordId());
}});
}catch (BaseException e) {
if(ResultCode.DB_OPERATION_FAIL_CODE == e.getStatus()) {
log.info("取消操作被取消;订单id:"+ shuntApply.getId());
}
}catch (Exception e) {
log.error("============消费失败,尝试消息补发再次消费!==============");
log.error(e.getMessage());
/**
* basicRecover方法是进行补发操作,
* 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
* 设置为false是只补发给当前的consumer
*/
channel.basicRecover(true);
}
}
@RabbitListener(queues = APPLY_REQUIRE_CANCEL_QUE)
public void applyRequireCancelReceiveDealy(ShuntApply shuntApply, Message message, Channel channel) throws IOException {
log.info("===============接收队列接收消息====================");
log.info("接收时间:{},接受内容:{}", LocalDateTime.now(), shuntApply.toString());
//通知 MQ 消息已被接收,可以ACK(从队列中删除)了
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
try {
//判断updateTime 是否一致(即乐观锁)
ShuntApply dbApply = shuntApplyBiz.selectById(shuntApply.getId());
if(dbApply.getUpdTime().equals(shuntApply.getUpdTime())) {
shuntApplyBiz.updateSelectiveById(new ShuntApply(){{
setId(shuntApply.getId());
setStatus(ShuntApply.STATUS_AUTOCNL_REQUIRE);
}});
}
}catch (BaseException e) {
if(ResultCode.DB_OPERATION_FAIL_CODE == e.getStatus()) {
log.info("取消操作被取消;订单id:"+ shuntApply.getId());
}
}catch (Exception e) {
log.error("============消费失败,尝试消息补发再次消费!==============");
log.error(e.getMessage());
/**
* basicRecover方法是进行补发操作,
* 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
* 设置为false是只补发给当前的consumer
*/
channel.basicRecover(true);
}
}
} }
...@@ -304,7 +304,6 @@ public class ShuntApplyController extends BaseController<ShuntApplyBiz, ShuntApp ...@@ -304,7 +304,6 @@ public class ShuntApplyController extends BaseController<ShuntApplyBiz, ShuntApp
Integer companyId = userDTO.getCompanyId(); Integer companyId = userDTO.getCompanyId();
ShuntApply shuntApply = baseBiz.selectById(applyId); ShuntApply shuntApply = baseBiz.selectById(applyId);
StewardShuntApply shuntApplyVO = BeanUtil.toBean(shuntApply, StewardShuntApply.class); StewardShuntApply shuntApplyVO = BeanUtil.toBean(shuntApply, StewardShuntApply.class);
shuntApplyVO.setVehicleCategory(vehicleFeign.getVehicleCategory(shuntApply.getCategoryId()).getData());
RentVehicleBookDTO rbd = new RentVehicleBookDTO(); RentVehicleBookDTO rbd = new RentVehicleBookDTO();
//rbd.setModelId(shuntApply.getModelId()); //rbd.setModelId(shuntApply.getModelId());
rbd.setBrandId(shuntApply.getBrandId()); rbd.setBrandId(shuntApply.getBrandId());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment