spring多线程消费消息

    xiaoxiao2021-03-25  190

    package com.momfo.web.mns.service; import java.util.Random; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import com.alibaba.dubbo.config.annotation.Reference; import com.alibaba.dubbo.config.annotation.Service; import com.aliyun.mns.client.CloudAccount; import com.aliyun.mns.client.CloudQueue; import com.aliyun.mns.client.MNSClient; import com.aliyun.mns.model.Message; import com.momfo.constants.mns.QueueCode; import com.momfo.constants.status.HouseBuyBuyStatus; import com.momfo.dto.req.jx.query.BidApplyQueryReq; import com.momfo.dto.rsp.jx.bid.BidApplyRsp; import com.momfo.dto.rsp.jx.query.BidApplyQueryRsp; import com.momfo.face.house.HouseBuyNotifySer; import com.momfo.face.house.HouseBuySer; import com.momfo.face.jxpay.QuerySer; import com.momfo.frameWork.beanutils.BeanUtils; import com.momfo.model.house.HouseBuy; /** * 消费队列的方法 */ @Service public class QueueConsumerSer implements ApplicationListener<ContextRefreshedEvent> { static final Logger logger = LogManager.getLogger(QueueConsumerSer.class.getName()); @Autowired private CloudAccount cloudAccount; @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Reference(version = "1.0.0") private HouseBuyNotifySer houseBuyNotifySer; @Reference(version = "1.0.0") private HouseBuySer houseBuySer; /** * 2.7查询类接口 */ @Reference(version = "1.0.0") private QuerySer querySer; /** * 服务启动后调用 */ @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext().getParent() != null) { return; } logger.info("--buyTimeout--启动时,启动三个线程处理订单超时问题-start"); // 处理house_buy中的订单超时问题-start // 由3个线程,处理消息队列的的消息 for (int i = 0; i < 3; ++i) { threadPoolTaskExecutor.execute(new RunnableMessage()); } logger.info("--buyTimeout--启动时,启动三个线程处理订单超时问题-end"); // 处理house_buy中的订单超时问题-end } /** * 处理消息的线程<br> **/ class RunnableMessage implements Runnable { @Override public void run() { logger.info("--buyTimeout--启动1个线程处理订单超时问题--Thread.Id=" + Thread.currentThread().getId()); MNSClient client = cloudAccount.getMNSClient(); CloudQueue queue = client.getQueueRef(QueueCode.buyTimeout); boolean isExit = false;// 是否退出-消费消息 while (!isExit) { Message msg = queue.popMessage(); try { if (msg != null) { dealMessage(msg, queue); } } catch (Exception e) { logger.info("--buyTimeout--停止该线程处理订单超时问题--Thread.Id=" + Thread.currentThread().getId()); isExit = true;// 退出, threadPoolTaskExecutor.execute(new RunnableMessage()); } // 随机休眠0--300毫秒之间 try { Thread.currentThread(); Thread.sleep(new Random().nextInt(300)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } /** * 处理消息<br> * 处理完成后,删除消息<br> **/ private void dealMessage(Message msg, CloudQueue queue) { if (msg != null) { // house_buy中的id问题 String msgBody = msg.getMessageBody().toString(); msgBody = msgBody.replace("\"", ""); String id = msgBody; if (StringUtils.isNotEmpty(id)) { HouseBuy hb = houseBuySer.getById(id); // 订单没有处理 if (hb != null && HouseBuyBuyStatus.paynot.getValue().equals(hb.getBuyStatus())) { logger.info("处理一个超时的订单:house_buy中的id:" + id); /** * 2.7.13投资人投标申请查询<br> * 功能说明:查询单笔投资人投标申请。 */ BidApplyQueryReq bidApplyQueryReq = new BidApplyQueryReq(); // accountId 电子账号 A 19 M 投资人电子账号 bidApplyQueryReq.setAccountId(hb.getAccountId()); // orgOrderId 原订单号 A 30 M 原购买债权订单号 bidApplyQueryReq.setOrgOrderId(hb.getBuyOrderno()); // acqRes 请求方保留 A 200 C // bidApplyQueryReq.setAcqRes("acqRes"); BidApplyQueryRsp bidApplyQueryRsp = querySer.bidApplyQuery(bidApplyQueryReq); BidApplyRsp bidApplyRsp = new BidApplyRsp(); BeanUtils.copyProperties(bidApplyRsp, bidApplyQueryRsp); bidApplyRsp.setOrderId(hb.getBuyOrderno()); houseBuyNotifySer.houseBuyNotifyJx(bidApplyRsp); } } // 删除消息 queue.deleteMessage(msg.getReceiptHandle()); } } }
    转载请注明原文地址: https://ju.6miu.com/read-2149.html

    最新回复(0)