Commit 2b185c8d authored by jiaorz's avatar jiaorz

添加积分消息队列接口

parent 242b6704
......@@ -114,6 +114,12 @@ public class IntegralRule{
*/
private String name;
/**
* 规则名称
*/
private String code;
/**
* 获取积分规则
*/
......
package com.xxfc.platform.activity.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import javax.persistence.Column;
......@@ -8,6 +10,7 @@ import javax.persistence.Table;
@Table(name = "integral_user_record")
@Data
@Builder
public class IntegralUserRecord {
/**
* 主键id
......@@ -34,8 +37,8 @@ public class IntegralUserRecord {
/**
* 积分规则id
*/
@Column(name = "integral_rule_id")
private Integer integralRuleId;
@Column(name = "integral_rule_code")
private String integralRuleCode;
/**
* 积分时间
......
......@@ -10,105 +10,7 @@ public class IntegralRuleDto extends PageParam {
/**
* 主键id
*/
private Integer id;
Integer id;
/**
* 周期单位:0-天,1-周;3-月;4-季;5-年
*/
private Integer period;
/**
* 次数
*/
private Integer number;
/**
* 积分
*/
private Integer point;
/**
* 积分兑换:金额/分
*/
private BigDecimal price;
/**
* 是否连续:0-不连续;1-连续
*/
private Boolean isContinuity;
/**
* 多少天后连续算
*/
private Integer finishDay;
/**
* n天获取的积分
*/
private Integer finishPoint;
/**
* 是否启用:1-启用;0-停用
*/
private Boolean status;
/**
* 创建时间
*/
private Long crtTime;
/**
* 是否删除:0-正常;1-删除
*/
private Boolean isdel;
/**
* 规则开始时间
*/
private Long startTime;
/**
* 规则结束时间
*/
private Long endTime;
/**
* 更新时间
*/
private Long updTime;
/**
* 备注
*/
private String remarks;
/**
* 描述
*/
private String desc;
/**
* 规则图片
*/
private String img;
/**
* 排序
*/
private Integer orderId;
/**
* 规则名称
*/
private String name;
/**
* 获取积分规则
*/
private String regulation;
/**
* json字符串 如:[{"day":4,"integeral":8},{"day":4,"integeral":8}]
*/
private String otherRule;
String code;
}
package com.xxfc.platform.activity.vo;
import com.github.wxiaoqi.security.common.vo.PageParam;
import com.xxfc.platform.activity.entity.IntegralUserRecord;
import lombok.Data;
@Data
......@@ -8,6 +9,23 @@ public class IntegralUserRecordDto extends PageParam {
Integer userId;
Long startTime;
Long endTime;
Integer integralRuleId;
String integralRuleCode;
String amount;
/**
* 0-获取积分;1-抵扣积分
*/
private Integer type;
/**
* 积分数
*/
private Integer point;
/**
* 获取积分的途径id:如订单id,评论id,签到记录id
*/
private Integer channelId;
public IntegralUserRecord getIntegralUserRecord() {
return IntegralUserRecord.builder().integralRuleCode(integralRuleCode).userId(userId).channelId(channelId).point(point).type(type).isdel(false).isValid(true).build();
}
}
......@@ -60,12 +60,15 @@ public class IntegralRuleBiz extends BaseBiz<IntegralRuleMapper, IntegralRule> {
return ObjectRestResponse.succ();
}
public ObjectRestResponse<IntegralRule> getOne(IntegralRule integralRule) {
public ObjectRestResponse<IntegralRule> getOne(IntegralRuleDto integralRule) {
if(integralRule == null) {
return ObjectRestResponse.paramIsEmpty();
}
IntegralRule oldValue = mapper.selectByPrimaryKey(integralRule.getId());
return ObjectRestResponse.succ(oldValue);
List<IntegralRule> oldValue = mapper.selectAllByParam(integralRule);
if(oldValue != null && oldValue.size() > 0) {
return ObjectRestResponse.succ(oldValue.get(0));
}
return ObjectRestResponse.succ();
}
......
......@@ -14,6 +14,7 @@ import com.xxfc.platform.activity.entity.IntegralUserTotal;
import com.xxfc.platform.activity.mapper.IntegralSignRecordMapper;
import com.xxfc.platform.activity.user.UserInfoBiz;
import com.xxfc.platform.activity.util.IntegralToolsUtils;
import com.xxfc.platform.activity.vo.IntegralRuleDto;
import com.xxfc.platform.activity.vo.IntegralUserRecordDto;
import com.xxfc.platform.activity.vo.IntegralUserTotalDto;
import org.apache.commons.lang3.StringUtils;
......@@ -44,17 +45,20 @@ public class IntegralSignRecordBiz extends BaseBiz<IntegralSignRecordMapper, Int
* 签到
* @return
*/
public ObjectRestResponse add(Integer integralRuleId) {
public ObjectRestResponse add(String code) {
AppUserDTO appUserDTO = userInfoBiz.getUserInfo();
if (appUserDTO == null) {
return ObjectRestResponse.createFailedResult(508, "token is null or invalid");
}
if(StringUtils.isBlank(code)) {
return ObjectRestResponse.createDefaultFail();
}
List<IntegralSignRecord> integralSignRecordList = mapper.selectByUserId(appUserDTO.getUserid());
//第一次签到 默认连续1
if(integralSignRecordList == null || integralSignRecordList.size() <= 0) {
IntegralSignRecord integralSignRecord = new IntegralSignRecord();
IntegralRule integralRule = new IntegralRule();
integralRule.setId(integralRuleId);
IntegralRuleDto integralRule = new IntegralRuleDto();
integralRule.setCode(code);
ObjectRestResponse<IntegralRule> objectRestResponse = integralRuleBiz.getOne(integralRule);
if(objectRestResponse.getData() != null) {
integralSignRecord.setUserId(appUserDTO.getUserid());
......@@ -73,8 +77,8 @@ public class IntegralSignRecordBiz extends BaseBiz<IntegralSignRecordMapper, Int
IntegralSignRecord integralSignRecord = integralSignRecordList.get(0);
//判断最后签到的时间和当前时间是不是相差一天
int day = IntegralToolsUtils.differentDaysByMillisecond(integralSignRecord.getLastTime(), new Date().getTime());
IntegralRule integralRule = new IntegralRule();
integralRule.setId(integralRuleId);
IntegralRuleDto integralRule = new IntegralRuleDto();
integralRule.setCode(code);
ObjectRestResponse<IntegralRule> objectRestResponse = integralRuleBiz.getOne(integralRule);
if(objectRestResponse.getData() != null) {
if(day == 1) {//第二天续签
......@@ -139,13 +143,11 @@ public class IntegralSignRecordBiz extends BaseBiz<IntegralSignRecordMapper, Int
}
//添加积分记录
List<IntegralSignRecord> integralSignRecords = mapper.selectByUserId(appUserDTO.getUserid());
IntegralUserRecord integralUserRecord = new IntegralUserRecord();
IntegralUserRecordDto integralUserRecord = new IntegralUserRecordDto();
integralUserRecord.setUserId(appUserDTO.getUserid());
integralUserRecord.setType(0);
integralUserRecord.setIsdel(false);
integralUserRecord.setChannelId(integralSignRecords.get(0).getId());
integralUserRecord.setIntegralRuleId(integralRule.getId());
integralUserRecord.setIsValid(true);
integralUserRecord.setIntegralRuleCode(integralRule.getCode());
integralUserRecord.setPoint(point);
integralUserRecordBiz.add(integralUserRecord);
}
......@@ -170,7 +172,7 @@ public class IntegralSignRecordBiz extends BaseBiz<IntegralSignRecordMapper, Int
public boolean getSignNumberBoolean(AppUserDTO appUserDTO, IntegralRule integralRule) {
IntegralUserRecordDto integralUserRecordDto = new IntegralUserRecordDto();
integralUserRecordDto.setUserId(appUserDTO.getUserid());
integralUserRecordDto.setIntegralRuleId(integralRule.getId());
integralUserRecordDto.setIntegralRuleCode(integralRule.getCode());
switch (integralRule.getPeriod()) {
case 0 : //日
return getBoolean(IntegralToolsUtils.getDayStart(), IntegralToolsUtils.getDayStart() + 24 * 60 * 60 * 1000 - 1, integralUserRecordDto, integralRule);
......
package com.xxfc.platform.activity.biz;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.PageInfo;
import com.github.wxiaoqi.security.admin.feign.dto.AppUserDTO;
import com.github.wxiaoqi.security.common.biz.BaseBiz;
......@@ -8,18 +9,22 @@ import com.github.wxiaoqi.security.common.msg.ObjectRestResponse;
import com.github.wxiaoqi.security.common.util.Query;
import com.github.wxiaoqi.security.common.vo.PageDataVO;
import com.github.wxiaoqi.security.common.vo.PageParam;
import com.xxfc.platform.activity.entity.IntegralRule;
import com.xxfc.platform.activity.entity.IntegralUserRecord;
import com.xxfc.platform.activity.entity.IntegralUserTotal;
import com.xxfc.platform.activity.mapper.IntegralUserRecordMapper;
import com.xxfc.platform.activity.user.UserInfoBiz;
import com.xxfc.platform.activity.vo.IntegralRuleDto;
import com.xxfc.platform.activity.vo.IntegralUserRecordDto;
import com.xxfc.platform.activity.vo.IntegralUserTotalDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
import java.util.List;
@Service
......@@ -32,57 +37,82 @@ public class IntegralUserRecordBiz extends BaseBiz<IntegralUserRecordMapper, Int
@Autowired
IntegralUserTotalBiz integralUserTotalBiz;
@Autowired
IntegralRuleBiz integralRuleBiz;
/**
* 添加用户积分记录
*
* @param integralUserRecord
* @return
*/
public ObjectRestResponse add(IntegralUserRecord integralUserRecord) {
if(integralUserRecord == null) {
public ObjectRestResponse add(IntegralUserRecordDto integralUserRecord) {
log.info("添加积分记录的参数:integralUserRecord = {}", integralUserRecord);
if (integralUserRecord == null || StringUtils.isBlank(integralUserRecord.getIntegralRuleCode())) {
return ObjectRestResponse.paramIsEmpty();
}
if(integralUserRecord.getType() == 0) {//获取积分 增加总积分表
ObjectRestResponse<IntegralUserTotal> objectRestResponse = integralUserTotalBiz.getByUser();
if(objectRestResponse.getStatus() == RestCode.SUCCESS.getStatus() && objectRestResponse.getData() != null) {
IntegralUserTotal integralUserTotal = objectRestResponse.getData();
IntegralUserTotalDto integralUserTotalDto = new IntegralUserTotalDto();
integralUserTotalDto.setUserId(integralUserTotal.getUserId());
integralUserTotalDto.setPoint(integralUserRecord.getPoint());
integralUserTotalBiz.add(integralUserTotalDto);
//如果参数没有积分,说明是消息队列过来的参数,需要查询规则表获取积分数
if (integralUserRecord.getPoint() == null) {
IntegralRuleDto integralRule = new IntegralRuleDto();
integralRule.setCode(integralUserRecord.getIntegralRuleCode());
ObjectRestResponse<IntegralRule> ruleObjectRestResponse = integralRuleBiz.getOne(integralRule);
if (ruleObjectRestResponse.getData() == null) {
return ObjectRestResponse.createFailedResult(1202, "积分规则不存在");
}
Integer point = 0;
Integer amount = Integer.parseInt(new BigDecimal(integralUserRecord.getAmount()).divide(new BigDecimal("100"), 0, BigDecimal.ROUND_DOWN).toString());
if(ruleObjectRestResponse.getData().getPoint() == 0) {//没有基础分需要计算分数
JSONObject jsonObject = JSONObject.parseObject(ruleObjectRestResponse.getData().getOtherRule());
log.info("查询的其他规则json信息:jsonObject = {}", jsonObject);
if(jsonObject == null) {
point = ruleObjectRestResponse.getData().getPoint();
} else {
point = jsonObject.getInteger("rule") == null? 0 * amount: jsonObject.getInteger("rule") * amount;
}
} else {
point = ruleObjectRestResponse.getData().getPoint();
}
log.info("查询的其他规则积分数:point = {}", point);
//把规则表中的积分数设置到参数对象中,然后进行后续操作
integralUserRecord.setPoint(point);
}
if (integralUserRecord.getType() == 0) {//获取积分 增加总积分表
IntegralUserTotalDto integralUserTotalDto = new IntegralUserTotalDto();
integralUserTotalDto.setUserId(integralUserRecord.getUserId());
integralUserTotalDto.setPoint(integralUserRecord.getPoint());
integralUserTotalBiz.add(integralUserTotalDto);
}
} else if(integralUserRecord.getType() == 1) {//扣减积分
integralUserTotalBiz.update(integralUserTotalDto);
} else if (integralUserRecord.getType() == 1) {//扣减积分
ObjectRestResponse<IntegralUserTotal> objectRestResponse = integralUserTotalBiz.getByUser();
if(objectRestResponse.getStatus() == RestCode.SUCCESS.getStatus() && objectRestResponse.getData() != null) {
if (objectRestResponse.getStatus() == RestCode.SUCCESS.getStatus() && objectRestResponse.getData() != null) {
IntegralUserTotal integralUserTotal = objectRestResponse.getData();
IntegralUserTotalDto integralUserTotalDto = new IntegralUserTotalDto();
integralUserTotalDto.setUserId(integralUserTotal.getUserId());
integralUserTotalDto.setPoint(- integralUserRecord.getPoint());
integralUserTotalBiz.add(integralUserTotalDto);
integralUserTotalDto.setPoint(-integralUserRecord.getPoint());
integralUserTotalBiz.update(integralUserTotalDto);
} else {
return ObjectRestResponse.createFailedResult(1008,"用户积分不足");
return ObjectRestResponse.createFailedResult(1008, "用户积分不足");
}
}
insertSelective(integralUserRecord);
insertSelective(integralUserRecord.getIntegralUserRecord());
return ObjectRestResponse.succ();
}
/**
* 删除一个用户记录
*
* @param id
* @return
*/
public ObjectRestResponse deleteOne(Integer id) {
if(id == null || id <=0 ) {
log.info("删除用户积分记录的参数:id = {}", id);
if (id == null || id <= 0) {
return ObjectRestResponse.paramIsEmpty();
}
IntegralUserRecord integralUserRecord = mapper.selectByPrimaryKey(id);
if(integralUserRecord == null) {
if (integralUserRecord == null) {
log.info("删除的用户记录不存在,要删除的id ={}", id);
return ObjectRestResponse.createDefaultFail();
}
......@@ -93,20 +123,24 @@ public class IntegralUserRecordBiz extends BaseBiz<IntegralUserRecordMapper, Int
/**
* 根据获取某个用户的列表
*
* @return
*/
public ObjectRestResponse<PageDataVO> getUserList(IntegralUserRecordDto integralUserRecordDto) {
log.info("获取用户积分记录的参数:integralUserRecordDto = {}", integralUserRecordDto.toString());
AppUserDTO appUserDTO = userInfoBiz.getUserInfo();
if (appUserDTO == null) {
return ObjectRestResponse.createFailedResult(508, "token is null or invalid");
}
integralUserRecordDto.setUserId(appUserDTO.getUserid());
Query query = new Query(integralUserRecordDto);
PageDataVO pageDataVO = PageDataVO.pageInfo(query, () -> mapper.selectByUserId(appUserDTO.getUserid()));
return ObjectRestResponse.succ(pageDataVO);
}
public ObjectRestResponse<List<IntegralUserRecord>> getByUserAndTime(IntegralUserRecordDto integralUserRecordDto) {
if(integralUserRecordDto == null) {
log.info("获取用户积分记录的参数:integralUserRecordDto = {}", integralUserRecordDto.toString());
if (integralUserRecordDto == null) {
return ObjectRestResponse.paramIsEmpty();
}
List<IntegralUserRecord> integralUserRecordList = mapper.selectByUserAndTime(integralUserRecordDto);
......
......@@ -55,6 +55,34 @@ public class IntegralUserTotalBiz extends BaseBiz<IntegralUserTotalMapper, Integ
return ObjectRestResponse.succ();
}
public ObjectRestResponse update(IntegralUserTotalDto integralUserTotalDto) {
if(integralUserTotalDto == null || integralUserTotalDto.getUserId() == null) {
return ObjectRestResponse.paramIsEmpty();
}
List<IntegralUserTotal> integralUserTotalList = mapper.selectAllByParam(integralUserTotalDto);
if(integralUserTotalList != null && integralUserTotalList.size() > 0) {
if(integralUserTotalDto.getPoint() > 0) {//加
integralUserTotalList.get(0).setTotalPoint(integralUserTotalList.get(0).getTotalPoint() + integralUserTotalDto.getPoint());
integralUserTotalList.get(0).setRestPoint(integralUserTotalList.get(0).getRestPoint() + integralUserTotalDto.getPoint());
updateByIdRe( integralUserTotalList.get(0));
return ObjectRestResponse.succ();
} else { //减积分
integralUserTotalList.get(0).setRestPoint(integralUserTotalList.get(0).getTotalPoint() + integralUserTotalDto.getPoint());
updateByIdRe( integralUserTotalList.get(0));
return ObjectRestResponse.succ();
}
}
IntegralUserTotal integralUserTotal = new IntegralUserTotal();
integralUserTotal.setUserId(integralUserTotalDto.getUserId());
integralUserTotal.setTotalPoint(integralUserTotalDto.getPoint());
integralUserTotal.setRestPoint(integralUserTotalDto.getPoint());
integralUserTotal.setIsdel(false);
insertSelective(integralUserTotal);
return ObjectRestResponse.succ();
}
/**
* 删除信息
* @return
......
package com.xxfc.platform.activity.handler;
import com.alibaba.fastjson.JSONObject;
import com.xxfc.platform.activity.biz.IntegralUserRecordBiz;
import com.xxfc.platform.activity.vo.IntegralUserRecordDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class IntegralMQHandler {
@Autowired
IntegralUserRecordBiz integralUserRecordBiz;
@RabbitListener(queues = "integral_queue")
public void integralHandler(String json) {
log.info("接收到的消息:json = {}", json);
try{
if(StringUtils.isNotBlank(json)) {
IntegralUserRecordDto integralUserRecordDto = JSONObject.parseObject(json, IntegralUserRecordDto.class);
integralUserRecordBiz.add(integralUserRecordDto);
}
}catch (Exception e){
log.info("接收到的消息失败");
e.printStackTrace();
}
}
}
......@@ -37,8 +37,8 @@ public class IntegralRuleController {
@GetMapping(value = "/one")
@ApiOperation(value = "根据id获取制定规则")
public ObjectRestResponse<IntegralRule> getOne(IntegralRule integralRule) {
return integralRuleBiz.getOne(integralRule);
public ObjectRestResponse<IntegralRule> getOne(IntegralRuleDto integralRuleDto) {
return integralRuleBiz.getOne(integralRuleDto);
}
@GetMapping(value = "/list")
......
......@@ -17,8 +17,8 @@ public class IntegralSignRecordController {
IntegralSignRecordBiz integralSignRecordBiz;
@PostMapping(value = "/add")
public ObjectRestResponse add(Integer integralRuleId) {
return integralSignRecordBiz.add(integralRuleId);
public ObjectRestResponse add(String integralRuleCode) {
return integralSignRecordBiz.add(integralRuleCode);
}
}
......@@ -21,8 +21,8 @@ public class IntegralUserRecordController {
@PostMapping(value = "/add")
public ObjectRestResponse add(IntegralUserRecord integralUserRecord) {
return integralUserRecordBiz.add(integralUserRecord);
public ObjectRestResponse add(IntegralUserRecordDto integralUserRecordDto) {
return integralUserRecordBiz.add(integralUserRecordDto);
}
@PostMapping(value = "/delete")
......
......@@ -24,6 +24,7 @@
<result column="img" property="img" jdbcType="VARCHAR" />
<result column="order_id" property="orderId" jdbcType="INTEGER" />
<result column="name" property="name" jdbcType="VARCHAR" />
<result column="code" property="code" jdbcType="VARCHAR" />
<result column="regulation" property="regulation" jdbcType="LONGVARCHAR" />
<result column="other_rule" property="otherRule" jdbcType="LONGVARCHAR" />
</resultMap>
......@@ -35,6 +36,9 @@
<if test="id != null">
and id = #{id}
</if>
<if test="code != null">
and code = #{code}
</if>
and isdel = 0 and status = 1
</where>
order by order_id DESC ,crt_time DESC
......
......@@ -9,7 +9,7 @@
<result column="user_id" property="userId" jdbcType="INTEGER" />
<result column="type" property="type" jdbcType="INTEGER" />
<result column="point" property="point" jdbcType="INTEGER" />
<result column="integral_rule_id" property="integralRuleId" jdbcType="INTEGER" />
<result column="integral_rule_code" property="integralRuleCode" jdbcType="INTEGER" />
<result column="crt_time" property="crtTime" jdbcType="BIGINT" />
<result column="is_valid" property="isValid" jdbcType="BIT" />
<result column="isdel" property="isdel" jdbcType="BIT" />
......@@ -27,6 +27,6 @@
</select>
<select id="selectByUserAndTime" resultType="com.xxfc.platform.activity.entity.IntegralUserRecord" parameterType="com.xxfc.platform.activity.vo.IntegralUserRecordDto">
select * from integral_user_record
where user_id = #{userId} and crt_time between #{startTime} and #{endTime} and integral_rule_id = #{integralRuleId}
where user_id = #{userId} and crt_time between #{startTime} and #{endTime} and integral_rule_code = #{integralRuleCode}
</select>
</mapper>
\ No newline at end of file
......@@ -30,6 +30,10 @@
<artifactId>jsoup</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.jdom</groupId>
<artifactId>jdom</artifactId>
......
......@@ -3,11 +3,12 @@ package com.xxfc.platform.universal.entity;
import lombok.Data;
import javax.persistence.*;
import java.io.Serializable;
import java.util.Set;
@Data
@Table(name = "data_dictionary")
public class Dictionary {
public class Dictionary implements Serializable {
@Id
private Integer id;
......
package com.xxfc.platform.universal.feign;
import com.github.wxiaoqi.security.common.msg.ObjectRestResponse;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(value = "xx-universal", contextId = "sendMessage")
public interface MQSerderFeign {
@GetMapping(value = "/message/sendMessage")
public ObjectRestResponse sendMessage(@RequestParam(value = "exchange") String exchange, @RequestParam(value = "routingKey")String routKey, @RequestParam(value = "jsonParam") String jsonParam);
}
package com.xxfc.platform.universal.mq;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
* 项目中Mq的自动配置
*
* @author Vangelis
* @date 2019-06-01 20:36
*/
@Configuration
@Slf4j
@AllArgsConstructor
public class MQAutoConfiguration {
private final ConnectionFactory connectionFactory;
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//开启回调 必须设置为true
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
//实现消息到达交换器的回调方法
log.info("RabbitMQ sendMessage-->confirm-->start correlationData:{},ack:{},cause:{}",correlationData,ack,cause);
log.info("RabbitMQ sendMessage-->confirm-->end");
});
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey)->{
//实现消息到达队列的回调方法
log.info("RabbitMQ sendMessage-->returnedMessage-->start message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}"
,message.toString(),replyCode,replyText,exchange,routingKey);
log.info("RabbitMQ sendMessage-->returnedMessage-->end");
});
return rabbitTemplate;
}
/**
* 监听器中默认封装实现
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
/**
* 配置启用rabbitmq事务
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
package com.xxfc.platform.universal.mq;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
@Configuration
public class RabbitConfig implements RabbitListenerConfigurer {
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
@Bean
MessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
return messageHandlerMethodFactory;
}
@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
}
\ No newline at end of file
package com.xxfc.platform.universal.mq;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
/**
* RabbitMQ封装发送消息
*
* @author Vangelis
* @date 2019-06-02 11:56
*/
@AllArgsConstructor
@Slf4j
@Component
@Scope("prototype")
public class RabbitMQSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init() {
// 使用当前类作为发送后回调通知
rabbitTemplate.setConfirmCallback(this);
//是否当前类作为返回失败错误处理
rabbitTemplate.setReturnCallback(this);
}
/**
* Confirmation callback.
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息已经确认收到");
}else {
log.info("消息没有确认收到,你该在这里写没有收到消息的处理代码");
}
}
/**
* 开启事务发送消息的封装
* @param exchange exchange
* @param routingKey routingKey
* @param msg 消息体
*/
@Transactional(rollbackFor = Exception.class)
public void sendMsg(String exchange,String routingKey,Object msg) {
log.info("使用事务发送消息,msg{}", msg);
rabbitTemplate.convertAndSend(exchange,routingKey,msg);
}
/**
* Returned message callback.
*
* @param message the returned message.
* @param replyCode the reply code.
* @param replyText the reply text.
* @param exchange the exchange.
* @param routingKey the routing key.
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息返回没有收到");
}
}
......@@ -34,6 +34,7 @@
<artifactId>aliyun-java-sdk-dysmsapi</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
......
package com.xxfc.platform.universal.biz;
import com.github.wxiaoqi.security.common.msg.ObjectRestResponse;
import com.xxfc.platform.universal.entity.Dictionary;
import lombok.AllArgsConstructor;
import org.apache.poi.ss.formula.functions.T;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
@Service
@AllArgsConstructor
public class MQServiceBiZ {
private RabbitTemplate rabbitTemplate;
public ObjectRestResponse sendMessage(String exchange, String routKey, String json) {
rabbitTemplate.convertAndSend(exchange, routKey, json);
return ObjectRestResponse.succ();
}
}
package com.xxfc.platform.universal.controller;
import com.github.wxiaoqi.security.common.msg.ObjectRestResponse;
import com.xxfc.platform.universal.biz.MQServiceBiZ;
import com.xxfc.platform.universal.entity.Dictionary;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "message")
public class MQSenderController {
@Autowired
private MQServiceBiZ mqServiceBiZ;
@GetMapping(value = "/sendMessage")
public ObjectRestResponse sendMessage(String exchange, String routKey, String json) {
return mqServiceBiZ.sendMessage(exchange, routKey, json);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment