现在公司开发的项目中用到了工作流,在进入本公司之前本屌丝还没有接触过工作流,所以对工作流的工作原理一直很感兴趣,一直在默默的学习,终于有一天本屌突然看懂了源码(大雾,现在我觉得看懂了可能只是骗我自己,好可怕),而且今天终于有时间来整理一下工作流,以备大家指正。 本屌公司之前的CTO是阿里的大牛,所以整个公司的项目框架我们可以看出很多阿里的风格,能接触这么厉害的技术,本屌心里好开心,废话不说了,回归正题: 项目中应用Spring MVC+Maven,简单粗暴,工作流的作用呢,就是确保数据流程完整,保持状态一致正确,有倒计时功能的状态可以自动更新状态,发布事件。如果有监听事件的处理器就可以调用相关动作了。
那么工作流除了业务环境中的手动推进,今天我们主要讲的就是超时事件,当前状态可能只能保持10分钟或者配置文件中的特定时间,超过此事件状态失效,状态流转为time_out并且发布状态超时事件。我们准备讲一个实际状态,审核状态,如果审核中的状态持续了1个小时而没有人手动审核的话,超时。
第一。我们定义了业务的状态流转模型
我们可以看到等待复核状态后面有三个相应状态,标红的复核超时就是工作流推动的。
第二,把模型转换成XML配置
我们在项目中按照业务流程把需要工作流推动状态的几个模型的状态流转转换为工作流的模板定义。看XML
<wf_def> <name>transit_task</name> <desc>运输流程</desc> <next_action_name>start</next_action_name> <type_val_in_id>WF03</type_val_in_id> <wf_action_list> <wf_action> <pvg_required> <key>*</key> <user>*</user> </pvg_required> <need_lock>true</need_lock> <name>start</name> <desc>开始</desc> <executer_name>BLANK_ACTION</executer_name> <next_status_list> <next_status> <caseExpr>*</caseExpr> <status>in_transit</status> </next_status> </next_status_list> </wf_action>
</wf_action> <!-- <wf_action> <pvg_required> <key>*</key> <user>*</user> </pvg_required> <need_lock>true</need_lock> <name>jyy_arrive_proof</name> <desc>交易员确认到货</desc> <executer_name>BLANK_ACTION</executer_name> <next_status_list> <next_status> <caseExpr>*</caseExpr> <status>jyy_arrived</status> </next_status> </next_status_list> </wf_action> --> </wf_action_list> <wf_status_list> <!-- 取消开始运输 <wf_status> <name>wait_transit</name> <desc>待运输</desc> <next_action_list> <next_action> <name>start_transit</name> </next_action> </next_action_list> <action_relation>XOR</action_relation> <finish>false</finish> </wf_status> --><wf_status> <name>force_discard</name> <desc>强制作废</desc> <finish>true</finish> </wf_status> </wf_status_list>
屌大的一眼看明白了这个XML配置的主要信息是什么了,一个是wf_status_list,一个是wf_action_list。他们的list顺序就代表状态的流转顺序,不能打乱。
第三。加载我们的工作流实例配置XML
下面这个xml配置就是启动ConfigLoader扫描我们手动配置的工作流的模板的a.b.c.xml文件(即 上面我们说的wf_status_list 和wf_action_list);
<bean id="wf_configLoader" class="com.opengroup.hongshi.wf.biz.def.config.ConfigLoader" scope="singleton" init-method="init" > <constructor-arg> <list> <value>wf_def_config/a.xml</value> <value>wf_def_config/b.xml</value> <value>wf_def_config/c.xml</value> </list> </constructor-arg> </bean>
我们看ConfigLoader的代码
/** * 初始化 */ public void init() { try { XStream xstream = new XStream();//这个类厉害了。到现在我还没搞明白这个类是干什么的 xstream.processAnnotations(WfDef.class);//WfDef这个类里面包装了所有的a或者b或者c工作流XML里面的wf_status_list 和wf_action_list, xstream.processAnnotations(WfAction.class); xstream.processAnnotations(WfStatus.class); xstream.processAnnotations(NextStatus.class); xstream.processAnnotations(NextAction.class); xstream.processAnnotations(PvgRequired.class);//我们就假装这个xstream 把这几个对象里面的标签全部预存起来了,从xml里面读取之后再全部回填到对象里面 xstream.autodetectAnnotations(true); for (String file : files) {//files是我们构造函数里面的 a b c 那三个xml String fileContent = ClasspathFileUtil.readAsStr(file);//残忍,xml被读成了String fileContent = StringUtil.fillReplacement(fileContent, wfProperties);//真残忍,String中的占位符都被实际的配置内容替换掉了 WfDef wfDef = (WfDef) xstream.fromXML(fileContent);//吊炸天的一步操作,当初读取的所有XML都赋值到该对象的相应标签里面了 wfDef.init(); if (wfDef.isAllValid()) { WfDefRegister.register(wfDef); } else { throw new CriticalSystemError("工作流定义的领域对象值校验不通过"); }
最近本屌丝正好在学习core java,第二卷就讲到XML的读写,关于XML也只是皮毛,有很多实际项目中的应用已经超乎书内部分讲的了,所以我们只讲配置。
Spring MVC 最喜欢的事情是什么,就是配置XML,虽然已经有关于注释的配置,但是我们项目还是有很多配置要手动配置XML,为什么呢,因为XML配置明显易懂,可以集中管理。缺点呢就是打字费手。看XML配置
第四,工作流的实现机制
那么工作流到底是怎么检测到超时实例的呢。
那就要从实现机制说起了,Spring有个很神奇的功能叫做Quartz.相信很多盆友都了解,关于Quartz以后单独讲,我们知道他就是一个定时器任务,可以在我们配置的任何时间启动,实现相应的功能。我们项目中的工作流就是依托了Quartz实现了工作流的超时动作。当我们的服务启动之后,quartz就很勤奋的一直在后台准备着,一旦到了该他启动的时候,他就会准时跑起来做自己的工作,很像准时的哨兵,到点准时巡逻。抓到超时没有完成作业的家伙一顿大棒槌砸你小胸胸(大雾!)
<!-- 检查工作流实例有效性的工作类,主要是处理失效问题的timer --> <bean id="wf_quartzWfValidCheck_job" class="com.opengroup.hongshi.wf.biz.quartz.QuartzWfValidTimeCheck"></bean> <!-- 定义调用对象和调用对象的方法 --> <bean id="wf_quartzWfValidCheck_jobtask" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"> <!-- 调用的类 --> <property name="targetObject"> <ref bean="wf_quartzWfValidCheck_job" /> </property> <!-- 调用类中的方法 --> <property name="targetMethod"> <value>work</value> </property> </bean>
第一个Bean,即QuartzWfValidTimeCheck是我们自己写的检查工作流超时的类。显然第一个bean被系统装配到第二个工厂bean里面了(关于工厂模式我们单开一章)。每到我们配置的时间,工厂Bean就会生成一个QuartzWfValidTimeCheck来实现我们要求的功能。实际代码如下:
/** * 执行真实业务 */ public void doBiz() { List<WfInstance> list = wfInstanceRepository.queryExpiredNonFinish();//这一句是关键,检查数据库中所有的未完结状态的工作流是否超时。 if (list != null) { for (final WfInstance anInstance : list) { try { LogUtil.info(logger, "do timeout for:" + anInstance.getId()); WfProcessRequestDTO request = new WfProcessRequestDTO(); request.setProcessUid("SYSTEM_TIMER"); request.setActionName(anInstance.getCurrentStatus() + "_timeout");// request.setWfInstanceId(anInstance.getId()); wfProcessProcessor.runBiz(request);//这一句也是关键,把当前的实例状态更改为anInstance.getCurrentStatus() + "_timeout" 状态 } catch (BizException e) { LogUtil.error(WfLoggers.ERROR_LOGGER, "业务异常:[" + e.getCode() + "][" + e.getMsg() + "]", e); } catch (Exception e) { LogUtil.error(WfLoggers.ERROR_LOGGER, "系统异常:", e); } } } }
第五,定时器调用的工作流推进器
标红的 wfProcessProcessor 对象是我们用Spring 注入的,他是一个继承了AbstractWfProcessor的工作流推进器。提到这个AbstractWfProcessor就厉害了。他是一个抽象类,是业务处理的基类。(抽象类的概念我们再复习一遍,有抽象方法的类叫抽象类,抽象类可以有实现方法,继承了抽象类的子类要实现抽象类的方法,是不是跟接口很像呢),这个基类其实已经实现了大部分的数据验证,日志处理,事件发布功能,单独留了一个 process()抽象方法,这个就是更新数据的操作了,看源码
public Object runBiz(Serializable params) { //第二个事务,做具体的工作流逻辑 return transactionTemplate.execute(new TransactionCallback<Object>() { @Override public Object doInTransaction(TransactionStatus status) { String id = extractId(); LogUtil.info(logger, "start for wf[" + this.getClass().getName() + "]:" + id); //创建模型 WfInstance model = createModel(id); //设置模型到上下文 WfContext.setModel(model); //取业务校验器,并校验 WfValidator validator = getBizValidator(); if (validator != null) { validator.validate(); } //保存请求入参 storeInputForm(); //执行业务 LogUtil.info(logger, "start process wf[" + this.getClass().getName() + "]:" + id); process(); LogUtil.info(logger, "end process wf[" + this.getClass().getName() + "]:" + id); //发同步事件 eventPublisher.publish(constructSyncEvent()); //发异步事件 eventPublisher.publish(constructAsynEvent()); //检查父工作流 // checkParentFinishWf(); return model; } }); }
那么我们看到了抽象类完成了大部分的操作,包括事件发布, 我们待会细讲事件发布
第六。继承了抽象工作流基类的实例推进器
那看我们自己注入的工作流推进器是怎么实现process的。
@Override protected void process() { WfProcessRequestDTO request = (WfProcessRequestDTO) WfContext.getRequest(); LogUtil.info(logger, "process process:" + JSON.toJSONString(request)); WfInstance wfInstance = WfContext.getModel(); wfInstance.setGmtModified(new Date()); wfInstance.setLastModifiedUid(request.getProcessUid()); WfDef wfDef = wfInstance.getWfDef(); WfAction wfAction = wfDef.getActionPool().get(request.getActionName()); PvgRequired pvgRequired = wfAction.getPvgRequired();//标红的这三句厉害了Spring启动加载配置的时候,已经把我们配置的所有工作流的内容已经全部加载完毕,这个地方就是验证符不符合工作流的流动顺序 if (hasPvg(pvgRequired, request.getProcessUid())) { String actionResult = wfAction.execute(wfInstance.convertDTO(), request.getWfInputFormDTO()); String nextStatus = routeToStatus(wfAction, actionResult); wfInstance.setCurrentStatus(nextStatus); WfStatus wfStatus = wfDef.getStatusPool().get(nextStatus); if (wfStatus == null) { throw new BizException(ErrorCodeConst.WF_NO_SUCH_STATUS_ERROR, "没有找到这样的状态[" + nextStatus + "]"); } if (request.getNextStatusEndDate() == null) { LogUtil.info(logger, "NextStatusEndDate is null[" + wfInstance.getId() + "][" + nextStatus + "]"); putDefindEndDate(wfInstance, wfStatus); } else { LogUtil.info(logger, "NextStatusEndDate isnot null[" + wfInstance.getId() + "][" + nextStatus + "]"); wfInstance.setEndDate(request.getNextStatusEndDate()); } wfInstance.setFinish(wfStatus.isFinish()); wfInstanceRepository.modifyStatus(wfInstance);//这个地方就是更新wf数据库中工作流的实例的状态。 } else { throw new BizException(ErrorCodeConst.WF_USER_NO_PERMISSION, "当前的用户没有权限"); } }
显然这个时候,wf数据库中的工作流的实例已经更新完状态了,但是我们生产数据库中的业务实例状态还没有更改,这怎么办呢(兄弟们别怕,赶紧抱紧我,大雾),上面我们已经写到抽象基类里面的事件发布代码,有同步事件和异步事件,这个地方大家可能明白了,通过监听事件,我们可以更改我们相应业务实例的状态。
第七,事件发布器
事件发布器之前我一直没理解,觉得这个机制好神奇,你只是做了一件事情,别人就知道你干过什么。就像是监听!
那我们就来看下事件发布和监听事件。
发布时间我们在上面的AbstractWfProcessor的方法中标注了 eventPublisher.publish(constructSyncEvent());就是这个方法,把超时动作包装成我们约定的消息发布出去,
跟进代码public class EventPublisher implements IEventPublisher { /** * 发布事件 * @see com.opengroup.middleware.event.IEventPublisher#publish(com.opengroup.middleware.event.UniformEvent) */ public void publish(UniformEvent event) { switch (event.getUniformEventType()) { case JVM_SYNC: //jvm内部的同步消息 JvmSyncEventPublisher.publish(event); break; case JVM_ASYN: //jvm内部的异步消息 JvmAsynEventPublisher.publish(event); break; default: throw new UniformEventException("不被支持的消息体类型:" + event.getUniformEventType()); } } 发现进入了framework框架模块,跟进去publish方法
/** * 发布消息。 等于把该消息对应的 {@link IEventHandler}全部执行一遍doHandler方法 * @see com.opengroup.middleware.event.IEventPublisher#publish(com.opengroup.middleware.event.UniformEvent) */ public static void publish(UniformEvent event) { LogUtil.debug(logger, "开始发布消息", event); Set<IEventHandler> set = JvmSyncHandlerRegister.fetchHandlers(event.getTopic(), event.getEventCode());//Jvm三个大字震古烁今,明显就告诉我们这个类与Jvm同生共死 if (set != null) { for (IEventHandler handler : set) { //先在内存中复制一份全新的对象,以保证对象之间不会相互串改。 UniformEvent newUe = BeanUtil.copy(event, UniformEvent.class); LogUtil.debug(logger, "执行doHandler", handler.getClass()); try { PerfLog.printStart(handler.getClass().getName() + ".doHandler(SNY_E)"); handler.doHandler(newUe); } finally { PerfLog.printEnd(handler.getClass().getName() + ".doHandler(SNY_E)"); }
重点标红,这里有一个JVMSyncHandlerRegister的静态方法,跟进代码
public static Set<IEventHandler> fetchHandlers(String topic, String eventCode) { Set<IEventHandler> result = new HashSet<IEventHandler>(); Set<IEventHandler> baseTopic = baseTopicHanlders.get(topic); if (baseTopic != null) { result.addAll(baseTopic); } if (baseEventHanlders.get(topic) != null) { Set<IEventHandler> baseEventCode = baseEventHanlders.get(topic).get(eventCode); if (baseEventCode != null) { result.addAll(baseEventCode); } } return result; }
我们看到了这两个方法,
/** * 只监听topic的handler */ private static Map<String, Set<IEventHandler>> baseTopicHanlders = new HashMap<String, Set<IEventHandler>>(); /** * 监听topic+eventCode的handler */ private static Map<String, Map<String, Set<IEventHandler>>> baseEventHanlders = new HashMap<String, Map<String, Set<IEventHandler>>>();
第八,时间处理器定义
<!-- 事件处理器定义 开始--> <bean id="carrier_auctionBidEndEventHandler_target" class="com.opengroup.hongshi.carrier.biz.event.AuctionBidEndEventHandler"></bean> <bean id="carrier_auctionBidEndEventHandler" class="com.opengroup.middleware.event.EventHandler" init-method="init"> <property name="topic" value="TP_WF_SYNC_auction"></property> <property name="eventCode" value="EC_WF_in_audit"></property> <property name="eventType" value="JVM_SYNC"></property> <property name="target" ref="carrier_auctionBidEndEventHandler_target"></property> </bean>
在jvm加载Spring的时候,EventHandler这个类就开始实例化了,我们跟进代码
*/ public void init() { switch (eventType) { case JVM_ASYN: JvmAsynHandlerRegister.registeHandler(topic, eventCode, target); break; case JVM_SYNC: JvmSyncHandlerRegister.registeHandler(topic, eventCode, target); break; default: throw new CriticalSystemError("该类型的事件类型还不支持[" + this.eventType + "]"); } }
流程结束。
从上面我们可以看到,JVM启动,Spring加载配置的时候,1,把工作流的实例配置xml通过ConfigLoader加载到了WfDefRegister中,2,把事件监听通过EventHandler注册到JvmSyncHandlerRegister中。3,把quartz加载,定时执行超时任务,wfProcessProcessor发布消息, JvmSyncEventPublisher.publish(event);。4,JvmSyncHandlerRegister相应的处理器执行
