Commit f7af8281 authored by jiaorz's avatar jiaorz

添加积分消息队列接口

parent cc335a55
...@@ -28,20 +28,20 @@ public class IntegralMQHandler { ...@@ -28,20 +28,20 @@ public class IntegralMQHandler {
@RabbitListener(queues = "integral_queue") @RabbitListener(queues = "integral_queue")
public void integralHandler(Message message, @Headers Map<String, Object> headers, Channel channel) { public void integralHandler(Message message, @Headers Map<String, Object> headers, Channel channel) {
try{
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
log.info("接收到的消息:msg = {}, 消息ID是:messageId = {} ", msg, messageId);
ExecutorService executorService = Executors.newCachedThreadPool(); ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new Runnable() { executorService.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
try{
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
log.info("接收到的消息:msg = {}, 消息ID是:messageId = {} ", msg, messageId);
if(StringUtils.isNotBlank(msg)) { if(StringUtils.isNotBlank(msg)) {
IntegralUserRecordDto integralUserRecordDto = JSONObject.parseObject(msg, IntegralUserRecordDto.class); IntegralUserRecordDto integralUserRecordDto = JSONObject.parseObject(msg, IntegralUserRecordDto.class);
integralUserRecordBiz.add(integralUserRecordDto); integralUserRecordBiz.add(integralUserRecordDto);
} }
}
});
executorService.shutdown(); executorService.shutdown();
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手动签收 // 手动签收
...@@ -55,7 +55,8 @@ public class IntegralMQHandler { ...@@ -55,7 +55,8 @@ public class IntegralMQHandler {
} }
e.printStackTrace(); e.printStackTrace();
} }
}
});
} }
......
...@@ -39,6 +39,11 @@ ...@@ -39,6 +39,11 @@
<artifactId>jdom</artifactId> <artifactId>jdom</artifactId>
<version>1.1.3</version> <version>1.1.3</version>
</dependency> </dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.2</version>
</dependency>
<!-- httpclient --> <!-- httpclient -->
<dependency> <dependency>
<groupId>org.apache.httpcomponents</groupId> <groupId>org.apache.httpcomponents</groupId>
......
package com.xxfc.platform.universal.filter;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;
public class AcceptFilter extends Filter<ILoggingEvent> {
@Override
public FilterReply decide(ILoggingEvent event) {
return event.getLoggerName().startsWith("com.xxfc.platform") ? FilterReply.ACCEPT : FilterReply.DENY;
}
}
\ No newline at end of file
package com.xxfc.platform.universal.filter;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.filter.Filter;
import ch.qos.logback.core.spi.FilterReply;
public class DenyFilter extends Filter<ILoggingEvent> {
@Override
public FilterReply decide(ILoggingEvent event) {
return event.getLoggerName().startsWith("com.xxfc.platform") ? FilterReply.DENY : FilterReply.ACCEPT;
}
}
\ No newline at end of file
...@@ -3,10 +3,8 @@ package com.xxfc.platform.universal.mq; ...@@ -3,10 +3,8 @@ package com.xxfc.platform.universal.mq;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
...@@ -54,12 +52,4 @@ public class MQAutoConfiguration { ...@@ -54,12 +52,4 @@ public class MQAutoConfiguration {
factory.setMaxConcurrentConsumers(10); factory.setMaxConcurrentConsumers(10);
return factory; return factory;
} }
/**
* 配置启用rabbitmq事务
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
} }
...@@ -48,7 +48,6 @@ public class CertifHttpUtils { ...@@ -48,7 +48,6 @@ public class CertifHttpUtils {
Map<String, String> querys) Map<String, String> querys)
throws Exception { throws Exception {
HttpClient httpClient = wrapClient(host); HttpClient httpClient = wrapClient(host);
HttpGet request = new HttpGet(buildUrl(host, path, querys)); HttpGet request = new HttpGet(buildUrl(host, path, querys));
for (Map.Entry<String, String> e : headers.entrySet()) { for (Map.Entry<String, String> e : headers.entrySet()) {
request.addHeader(e.getKey(), e.getValue()); request.addHeader(e.getKey(), e.getValue());
......
...@@ -7,6 +7,7 @@ import org.springframework.amqp.core.MessageBuilder; ...@@ -7,6 +7,7 @@ import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID; import java.util.UUID;
...@@ -16,6 +17,7 @@ public class MQServiceBiZ { ...@@ -16,6 +17,7 @@ public class MQServiceBiZ {
private RabbitTemplate rabbitTemplate; private RabbitTemplate rabbitTemplate;
@Transactional(rollbackFor = Exception.class)
public ObjectRestResponse sendMessage(String exchange, String routKey, String json) { public ObjectRestResponse sendMessage(String exchange, String routKey, String json) {
Message message = MessageBuilder.withBody(json.getBytes()) Message message = MessageBuilder.withBody(json.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8") .setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
......
logging:
config: classpath:logback.xml
level:
com.xxfc.platform.universal:
debug
com.xxfc.platform.common:
debug
\ No newline at end of file
...@@ -17,6 +17,7 @@ spring: ...@@ -17,6 +17,7 @@ spring:
nacos: nacos:
config: config:
file-extension: yaml file-extension: yaml
--- ---
spring: spring:
profiles: dev profiles: dev
......
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<!--定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径 -->
<property name="LOG_HOME" value="${system.log.path:-logs}"/>
<!-- 彩色日志依赖的渲染类 -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<!-- 彩色日志格式 -->
<property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <!--1. 输出到控制台-->
<encoder>
<Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
<charset>UTF-8</charset> <!-- 设置字符集 -->
</encoder>
</appender>
<appender name="SYSTEM_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"><!-- 按照每天生成日志文件 -->
<filter class="com.xxfc.platform.universal.filter.DenyFilter"></filter>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${LOG_HOME}/sys.%d{yyyy-MM-dd}.log</FileNamePattern><!--日志文件输出的文件名 -->
<MaxHistory>30</MaxHistory><!--日志文件保留天数 -->
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{50} - %msg%n</pattern><!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符 -->
</encoder>
</appender>
<appender name="WEB_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"><!-- 按照每天生成日志文件 -->
<filter class="com.xxfc.platform.universal.filter.AcceptFilter"></filter>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${LOG_HOME}/log.%d{yyyy-MM-dd}.log</FileNamePattern><!--日志文件输出的文件名 -->
<MaxHistory>30</MaxHistory><!--日志文件保留天数 -->
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{50} - %msg%n</pattern><!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符 -->
</encoder>
</appender>
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>192.168.0.181:5044</destination>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder" >
<!--"appname":"springboot21-log-elk" 的作用是指定创建索引的名字时用,并且在生成的文档中会多了这个字段
在logstashindex中引入 index => "%{[appname]}-%{+YYYY.MM.dd}"-->
<customFields>{"appName":"elk-log-service-operate-dev"}</customFields>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment