package com.momfo.web.mns.service;

import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.dubbo.config.annotation.Service;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.Message;
import com.momfo.constants.mns.QueueCode;
import com.momfo.constants.status.HouseBuyBuyStatus;
import com.momfo.dto.req.jx.query.BidApplyQueryReq;
import com.momfo.dto.rsp.jx.bid.BidApplyRsp;
import com.momfo.dto.rsp.jx.query.BidApplyQueryRsp;
import com.momfo.face.house.HouseBuyNotifySer;
import com.momfo.face.house.HouseBuySer;
import com.momfo.face.jxpay.QuerySer;
import com.momfo.frameWork.beanutils.BeanUtils;
import com.momfo.model.house.HouseBuy;

@Service
public class QueueConsumerSer implements ApplicationListener<ContextRefreshedEvent> {
static final Logger logger = LogManager
.getLogger(QueueConsumerSer
.class.getName())
@Autowired
private CloudAccount cloudAccount
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor
@Reference(version =
"1.0.0")
private HouseBuyNotifySer houseBuyNotifySer
@Reference(version =
"1.0.0")
private HouseBuySer houseBuySer
@Reference(version =
"1.0.0")
private QuerySer querySer
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event
.getApplicationContext()
.getParent() != null) {
return
}
logger
.info(
"--buyTimeout--启动时,启动三个线程处理订单超时问题-start")
// 处理house_buy中的订单超时问题-start
// 由
3个线程,处理消息队列的的消息
for (int i =
0
threadPoolTaskExecutor
.execute(new RunnableMessage())
}
logger
.info(
"--buyTimeout--启动时,启动三个线程处理订单超时问题-end")
// 处理house_buy中的订单超时问题-end
}
class RunnableMessage implements Runnable {
@Override
public void run() {
logger
.info(
"--buyTimeout--启动1个线程处理订单超时问题--Thread.Id=" + Thread
.currentThread()
.getId())
MNSClient client = cloudAccount
.getMNSClient()
CloudQueue queue = client
.getQueueRef(QueueCode
.buyTimeout)
boolean isExit = false
while (!isExit) {
Message msg = queue
.popMessage()
try {
if (msg != null) {
dealMessage(msg, queue)
}
} catch (Exception e) {
logger
.info(
"--buyTimeout--停止该线程处理订单超时问题--Thread.Id=" + Thread
.currentThread()
.getId())
isExit = true
threadPoolTaskExecutor
.execute(new RunnableMessage())
}
// 随机休眠
0--
300毫秒之间
try {
Thread
.currentThread()
Thread
.sleep(new Random()
.nextInt(
300))
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e
.printStackTrace()
}
}
}
}
private void dealMessage(Message msg, CloudQueue queue) {
if (msg != null) {
// house_buy中的id问题
String msgBody = msg
.getMessageBody()
.toString()
msgBody = msgBody
.replace(
"\"",
"")
String id = msgBody
if (StringUtils
.isNotEmpty(id)) {
HouseBuy hb = houseBuySer
.getById(id)
// 订单没有处理
if (hb != null && HouseBuyBuyStatus
.paynot.getValue()
.equals(hb
.getBuyStatus())) {
logger
.info(
"处理一个超时的订单:house_buy中的id:" + id)
BidApplyQueryReq bidApplyQueryReq = new BidApplyQueryReq()
// accountId 电子账号 A
19 M 投资人电子账号
bidApplyQueryReq
.setAccountId(hb
.getAccountId())
// orgOrderId 原订单号 A
30 M 原购买债权订单号
bidApplyQueryReq
.setOrgOrderId(hb
.getBuyOrderno())
// acqRes 请求方保留 A
200 C
// bidApplyQueryReq
.setAcqRes(
"acqRes")
BidApplyQueryRsp bidApplyQueryRsp = querySer
.bidApplyQuery(bidApplyQueryReq)
BidApplyRsp bidApplyRsp = new BidApplyRsp()
BeanUtils
.copyProperties(bidApplyRsp, bidApplyQueryRsp)
bidApplyRsp
.setOrderId(hb
.getBuyOrderno())
houseBuyNotifySer
.houseBuyNotifyJx(bidApplyRsp)
}
}
// 删除消息
queue
.deleteMessage(msg
.getReceiptHandle())
}
}
}
// Please credit the original source when reposting: https://ju.6miu.com/read-2149.html