Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
C
cloud-platform
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
youjj
cloud-platform
Commits
6d621aee
Commit
6d621aee
authored
Dec 27, 2019
by
jiaorz
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
活动注册加锁
parent
3e831829
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
45 additions
and
35 deletions
+45
-35
ActivityPopularizeBiz.java
...com/xxfc/platform/activity/biz/ActivityPopularizeBiz.java
+15
-17
PopularizeMQHandler.java
...m/xxfc/platform/activity/handler/PopularizeMQHandler.java
+30
-18
No files found.
xx-activity/xx-activity-server/src/main/java/com/xxfc/platform/activity/biz/ActivityPopularizeBiz.java
View file @
6d621aee
...
@@ -19,14 +19,12 @@ import com.xxfc.platform.universal.feign.MQSenderFeign;
...
@@ -19,14 +19,12 @@ import com.xxfc.platform.universal.feign.MQSenderFeign;
import
lombok.RequiredArgsConstructor
;
import
lombok.RequiredArgsConstructor
;
import
lombok.extern.slf4j.Slf4j
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.redisson.api.RLock
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
org.springframework.stereotype.Service
;
import
java.math.BigDecimal
;
import
java.math.BigDecimal
;
import
java.util.Date
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.List
;
import
java.util.concurrent.TimeUnit
;
import
static
com
.
github
.
wxiaoqi
.
security
.
common
.
constant
.
CommonConstants
.
SYS_FALSE
;
import
static
com
.
github
.
wxiaoqi
.
security
.
common
.
constant
.
CommonConstants
.
SYS_FALSE
;
import
static
com
.
github
.
wxiaoqi
.
security
.
common
.
constant
.
CommonConstants
.
SYS_TRUE
;
import
static
com
.
github
.
wxiaoqi
.
security
.
common
.
constant
.
CommonConstants
.
SYS_TRUE
;
...
@@ -86,13 +84,13 @@ public class ActivityPopularizeBiz extends BaseBiz<ActivityPopularizeMapper, Act
...
@@ -86,13 +84,13 @@ public class ActivityPopularizeBiz extends BaseBiz<ActivityPopularizeMapper, Act
}});
}});
log
.
info
(
"活动邀请注册:activityPopularize = {}"
,
activityPopularize
.
toString
());
log
.
info
(
"活动邀请注册:activityPopularize = {}"
,
activityPopularize
.
toString
());
if
(
activityPopularize
!=
null
)
{
if
(
activityPopularize
!=
null
)
{
String
lockKey
=
String
.
format
(
"%s%d%d"
,
registerQueueDTO
.
getInParamDTO
().
getActivityCode
(),
appUserDTO
.
getInviterAccount
(),
activityPopularize
.
getId
());
//
String lockKey = String.format("%s%d%d", registerQueueDTO.getInParamDTO().getActivityCode(), appUserDTO.getInviterAccount(), activityPopularize.getId());
RLock
rLock
=
redissonLock
.
getRLock
(
lockKey
);
//
RLock rLock = redissonLock.getRLock(lockKey);
try
{
//
try {
boolean
isSuccess
=
rLock
.
tryLock
(
5
,
5
,
TimeUnit
.
SECONDS
);
//
boolean isSuccess = rLock.tryLock(5, 5, TimeUnit.SECONDS);
if
(
isSuccess
)
{
//
if (isSuccess) {
log
.
info
(
"1, {}"
,
System
.
currentTimeMillis
());
log
.
info
(
"1, {}"
,
System
.
currentTimeMillis
());
Thread
.
sleep
(
1000
);
//
Thread.sleep(1000);
log
.
info
(
"2, {}"
,
System
.
currentTimeMillis
());
log
.
info
(
"2, {}"
,
System
.
currentTimeMillis
());
List
<
ActivityPopularizeItem
>
activityPopularizeItems
=
activityPopularizeItemBiz
.
selectByPopularizeId
(
activityPopularize
.
getId
());
List
<
ActivityPopularizeItem
>
activityPopularizeItems
=
activityPopularizeItemBiz
.
selectByPopularizeId
(
activityPopularize
.
getId
());
ActivityPopularizeItem
activityPopularizeItem
=
activityPopularizeItems
.
get
(
activityPopularizeItems
.
size
()
-
1
);
ActivityPopularizeItem
activityPopularizeItem
=
activityPopularizeItems
.
get
(
activityPopularizeItems
.
size
()
-
1
);
...
@@ -206,15 +204,15 @@ public class ActivityPopularizeBiz extends BaseBiz<ActivityPopularizeMapper, Act
...
@@ -206,15 +204,15 @@ public class ActivityPopularizeBiz extends BaseBiz<ActivityPopularizeMapper, Act
}}));
}}));
}
}
}
}
}
else
{
//
} else {
// 获取锁失败
//
// 获取锁失败
log
.
info
(
"tryLock fail, key = [{}]"
,
lockKey
);
//
log.info("tryLock fail, key = [{}]", lockKey);
}
//
}
}
catch
(
InterruptedException
e
)
{
//
} catch (InterruptedException e) {
log
.
error
(
"tryLock fail, key = [{}]"
,
lockKey
);
//
log.error("tryLock fail, key = [{}]", lockKey);
}
finally
{
//
} finally {
log
.
info
(
"执行结束, {}"
,
System
.
currentTimeMillis
());
//
log.info("执行结束, {}", System.currentTimeMillis());
}
//
}
}
}
}
}
}
}
...
...
xx-activity/xx-activity-server/src/main/java/com/xxfc/platform/activity/handler/PopularizeMQHandler.java
View file @
6d621aee
...
@@ -18,8 +18,8 @@ import org.springframework.stereotype.Component;
...
@@ -18,8 +18,8 @@ import org.springframework.stereotype.Component;
import
java.io.IOException
;
import
java.io.IOException
;
import
java.util.Map
;
import
java.util.Map
;
import
java.util.concurrent.
ExecutorService
;
import
java.util.concurrent.
*
;
import
java.util.concurrent.
Executors
;
import
java.util.concurrent.
locks.StampedLock
;
@Component
@Component
@Slf4j
@Slf4j
...
@@ -30,38 +30,50 @@ public class PopularizeMQHandler {
...
@@ -30,38 +30,50 @@ public class PopularizeMQHandler {
@Autowired
@Autowired
ActivityUserJoinBiz
activityUserJoinBiz
;
ActivityUserJoinBiz
activityUserJoinBiz
;
// 同时并发执行的线程数
public
static
int
threadTotal
=
10
;
final
Semaphore
semaphore
=
new
Semaphore
(
threadTotal
);
final
CountDownLatch
countDownLatch
=
new
CountDownLatch
(
10
);
private
final
static
StampedLock
lock
=
new
StampedLock
();
//@RabbitListener(queues = {RabbitActivityConfig.POPULARZIE_REGISTER_QUEUE,RabbitActivityConfig.POPULARZIE_AUTH_QUEUE})
//@RabbitListener(queues = {RabbitActivityConfig.POPULARZIE_REGISTER_QUEUE,RabbitActivityConfig.POPULARZIE_AUTH_QUEUE})
@RabbitListener
(
queues
=
{
RabbitActivityConfig
.
POPULARZIE_0101_QUEUE
})
@RabbitListener
(
queues
=
{
RabbitActivityConfig
.
POPULARZIE_0101_QUEUE
})
public
void
popularizeHandler
(
Message
message
,
@Headers
Map
<
String
,
Object
>
headers
,
Channel
channel
)
{
public
void
popularizeHandler
(
Message
message
,
@Headers
Map
<
String
,
Object
>
headers
,
Channel
channel
)
{
ExecutorService
executorService
=
Executors
.
newCachedThreadPool
();
ExecutorService
executorService
=
Executors
.
newCachedThreadPool
();
executorService
.
execute
(
new
Runnable
()
{
executorService
.
execute
(()
->
{
@Override
try
{
public
void
run
()
{
semaphore
.
acquire
();
try
{
String
messageId
=
message
.
getMessageProperties
().
getMessageId
();
String
messageId
=
message
.
getMessageProperties
().
getMessageId
();
String
msg
=
new
String
(
message
.
getBody
(),
"UTF-8"
);
String
msg
=
new
String
(
message
.
getBody
(),
"UTF-8"
);
log
.
info
(
"接收到的消息:msg = {}, 消息ID是:messageId = {} "
,
msg
,
messageId
);
log
.
info
(
"接收到的消息:msg = {}, 消息ID是:messageId = {} "
,
msg
,
messageId
);
if
(
StringUtils
.
isNotBlank
(
msg
))
{
if
(
StringUtils
.
isNotBlank
(
msg
))
{
RegisterQueueDTO
registerQueueDTO
=
JSONUtil
.
toBean
(
msg
,
RegisterQueueDTO
.
class
);
long
stamp
=
lock
.
writeLock
();
activityPopularizeBiz
.
handleRegister
(
registerQueueDTO
);
try
{
RegisterQueueDTO
registerQueueDTO
=
JSONUtil
.
toBean
(
msg
,
RegisterQueueDTO
.
class
);
activityPopularizeBiz
.
handleRegister
(
registerQueueDTO
);
}
finally
{
lock
.
unlock
(
stamp
);
}
}
}
executorService
.
shutdown
();
Long
deliveryTag
=
(
Long
)
headers
.
get
(
AmqpHeaders
.
DELIVERY_TAG
);
Long
deliveryTag
=
(
Long
)
headers
.
get
(
AmqpHeaders
.
DELIVERY_TAG
);
// 手动签收
// 手动签收
channel
.
basicAck
(
deliveryTag
,
false
);
channel
.
basicAck
(
deliveryTag
,
false
);
}
catch
(
Exception
e
)
{
semaphore
.
release
();
log
.
info
(
"接收到的消息失败"
);
}
catch
(
Exception
e
)
{
try
{
log
.
info
(
"接收到的消息失败"
);
channel
.
basicNack
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
,
false
);
try
{
}
catch
(
IOException
i
)
{
channel
.
basicNack
(
message
.
getMessageProperties
().
getDeliveryTag
(),
false
,
false
);
log
.
error
(
e
.
getMessage
(),
i
);
}
catch
(
IOException
i
)
{
}
log
.
error
(
e
.
getMessage
(),
i
);
log
.
error
(
e
.
getMessage
(),
e
);
}
}
log
.
error
(
e
.
getMessage
(),
e
);
}
}
executorService
.
shutdown
();
countDownLatch
.
countDown
();
});
});
}
}
//新人注册有礼
//新人注册有礼
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment