在上一篇《Ambari-server源码分析:agent-AgentResource类》 http://blog.csdn.net/chengyuqiang/article/details/61914712 的基础上,再来看另一个核心类:HeartBeatHandler。
该类位于org.apache.ambari.server.agent包下,如下图。
Ambari-server 的AgentResource类提供处理Ambari-agent请求的REST接口Ambari-server 的HeartBeatHander提供真正的处理服务,处理来自Ambari-agent的心跳,将信息传递到其他模块,并处理队列以发送心跳响应,将响应结果返回给AgentResource package org.apache.ambari.server.agent; import ...; /** * This class handles the heartbeats coming from the agent, passes on the information * to other modules and processes the queue to send heartbeat response. * 此类处理来自代理的心跳,将信息传递到其他模块,并处理队列以发送心跳响应。 */ @Singleton public class HeartBeatHandler { /** * Logger. */ private static final Logger LOG = LoggerFactory.getLogger(HeartBeatHandler.class); private static final Pattern DOT_PATTERN = Pattern.compile("\\."); private final Clusters clusterFsm; private final ActionQueue actionQueue; private final ActionManager actionManager; private HeartbeatMonitor heartbeatMonitor; private HeartbeatProcessor heartbeatProcessor; @Inject private Injector injector; @Inject private Configuration config; @Inject private AmbariMetaInfo ambariMetaInfo; @Inject private ConfigHelper configHelper; @Inject private AlertDefinitionHash alertDefinitionHash; @Inject private RecoveryConfigHelper recoveryConfigHelper; /** * KerberosIdentityDataFileReaderFactory used to create KerberosIdentityDataFileReader instances */ @Inject private KerberosIdentityDataFileReaderFactory kerberosIdentityDataFileReaderFactory; private Map<String, Long> hostResponseIds = new ConcurrentHashMap<String, Long>(); private Map<String, HeartBeatResponse> hostResponses = new ConcurrentHashMap<String, HeartBeatResponse>(); //构造器依赖注入 @Inject public HeartBeatHandler(Clusters fsm, ActionQueue aq, ActionManager am, Injector injector) { clusterFsm = fsm; actionQueue = aq; actionManager = am; heartbeatMonitor = new HeartbeatMonitor(fsm, aq, am, 60000, injector); heartbeatProcessor = new HeartbeatProcessor(fsm, am, heartbeatMonitor, injector); //TODO modify to match pattern injector.injectMembers(this); } public void start() { //启动心跳处理器 heartbeatProcessor.startAsync(); //启动心跳监控器 heartbeatMonitor.start(); } void setHeartbeatMonitor(HeartbeatMonitor heartbeatMonitor) { this.heartbeatMonitor = heartbeatMonitor; } public void setHeartbeatProcessor(HeartbeatProcessor heartbeatProcessor) { this.heartbeatProcessor = heartbeatProcessor; } public HeartbeatProcessor getHeartbeatProcessor() { return heartbeatProcessor; } //处理心跳 public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat) throws AmbariException { long now = System.currentTimeMillis(); if (heartbeat.getAgentEnv() != null && heartbeat.getAgentEnv().getHostHealth() != null) { //报告处理心跳的开始时间 heartbeat.getAgentEnv().getHostHealth().setServerTimeStampAtReporting(now); } String hostname = heartbeat.getHostname(); Long currentResponseId = hostResponseIds.get(hostname); HeartBeatResponse response; //服务器重新启动或未知主机 if (currentResponseId == null) { //Server restarted, or unknown host. LOG.error("CurrentResponseId unknown for " + hostname + " - send register command"); // 无responseId, 新请求,就进行注册, responseId =0 return createRegisterCommand(); } LOG.debug("Received heartbeat from host" + ", hostname=" + hostname + ", currentResponseId=" + currentResponseId + ", receivedResponseId=" + heartbeat.getResponseId()); //接收到旧响应ID - 响应丢失 - 返回缓存响应 if (heartbeat.getResponseId() == currentResponseId - 1) { LOG.warn("Old responseId received - response was lost - returning cached response"); return hostResponses.get(hostname); } else if (heartbeat.getResponseId() != currentResponseId) { LOG.error("Error in responseId sequence - sending agent restart command"); // 心跳是历史记录,那么就要求其重启,重新注册,responseId 不变 return createRestartCommand(currentResponseId); } response = new HeartBeatResponse(); //responseId 加 1 , 返回一个新的responseId,下次心跳又要把这个responseId带回来。 response.setResponseId(++currentResponseId); Host hostObject; try { hostObject = clusterFsm.getHost(hostname); } catch (HostNotFoundException e) { LOG.error("Host: {} not found. Agent is still heartbeating.", hostname); if (LOG.isDebugEnabled()) { LOG.debug("Host associated with the agent heratbeat might have been " + "deleted", e); } // For now return empty response with only response id. return response; } //失去心跳,要求重新注册, responseId=0 if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) { // After loosing heartbeat agent should reregister LOG.warn("Host is in HEARTBEAT_LOST state - sending register command"); return createRegisterCommand(); } hostResponseIds.put(hostname, currentResponseId); hostResponses.put(hostname, response); // If the host is waiting for component status updates, notify it //如果主机正在等待组件状态更新,请通知它 //节点已经进行了注册,但是该节点还没有汇报相关状态信息,等待服务状态更新 if (heartbeat.componentStatus.size() > 0 && hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) { try { LOG.debug("Got component status updates"); //更新服务状态机 hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now)); } catch (InvalidStateTransitionException e) { LOG.warn("Failed to notify the host about component status updates", e); } } if (heartbeat.getRecoveryReport() != null) { RecoveryReport rr = heartbeat.getRecoveryReport(); processRecoveryReport(rr, hostname); } try { if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) { //向状态机发送更新事件,更新节点至正常状态 hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now, heartbeat.getAgentEnv(), heartbeat.getMounts())); } else { // 把节点列入不健康 hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now, null)); } } catch (InvalidStateTransitionException ex) { LOG.warn("Asking agent to re-register due to " + ex.getMessage(), ex); hostObject.setState(HostState.INIT); return createRegisterCommand(); } /** * A host can belong to only one cluster. Though getClustersForHost(hostname) * returns a set of clusters, it will have only one entry. *主机只能属于一个集群。 通过getClustersForHost(hostname)返回一组集群,它只有一个条目。 * * TODO: Handle the case when a host is a part of multiple clusters. * 处理 主机是多个集群的一部分时的 情况。 */ Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname); if (clusters.size() > 0) { String clusterName = clusters.iterator().next().getClusterName(); if (recoveryConfigHelper.isConfigStale(clusterName, hostname, heartbeat.getRecoveryTimestamp())) { RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname); response.setRecoveryConfig(rc); if (response.getRecoveryConfig() != null) { LOG.info("Recovery configuration set to {}", response.getRecoveryConfig().toString()); } } } heartbeatProcessor.addHeartbeat(heartbeat); // Send commands if node is active if (hostObject.getState().equals(HostState.HEALTHY)) { sendCommands(hostname, response); annotateResponse(hostname, response); } return response; } protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException { LOG.debug("Received recovery report: " + recoveryReport.toString()); Host host = clusterFsm.getHost(hostname); host.setRecoveryReport(recoveryReport); } /** * Adds commands from action queue to a heartbeat response. * 将操作队列中的命令添加到心跳响应。 */ protected void sendCommands(String hostname, HeartBeatResponse response) throws AmbariException { List<AgentCommand> cmds = actionQueue.dequeueAll(hostname); if (cmds != null && !cmds.isEmpty()) { for (AgentCommand ac : cmds) { try { if (LOG.isDebugEnabled()) { LOG.debug("Sending command string = " + StageUtils.jaxbToString(ac)); } } catch (Exception e) { throw new AmbariException("Could not get jaxb string for command", e); } switch (ac.getCommandType()) {//根据命令类型处理 //背景执行命令 case BACKGROUND_EXECUTION_COMMAND: //执行命令 case EXECUTION_COMMAND: { //将AgentCommand强制转换为ExecutionCommand ExecutionCommand ec = (ExecutionCommand) ac; LOG.info("HeartBeatHandler.sendCommands: sending ExecutionCommand for host {}, role {}, roleCommand {}, and command ID {}, task ID {}", ec.getHostname(), ec.getRole(), ec.getRoleCommand(), ec.getCommandId(), ec.getTaskId()); Map<String, String> hlp = ec.getHostLevelParams(); if (hlp != null) { String customCommand = hlp.get("custom_command"); if ("SET_KEYTAB".equalsIgnoreCase(customCommand) || "REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) { LOG.info(String.format("%s called", customCommand)); try { injectKeytab(ec, customCommand, hostname); } catch (IOException e) { throw new AmbariException("Could not inject keytab into command", e); } } } response.addExecutionCommand((ExecutionCommand) ac); break; } case STATUS_COMMAND: { response.addStatusCommand((StatusCommand) ac); break; } case CANCEL_COMMAND: { response.addCancelCommand((CancelCommand) ac); break; } case ALERT_DEFINITION_COMMAND: { response.addAlertDefinitionCommand((AlertDefinitionCommand) ac); break; } case ALERT_EXECUTION_COMMAND: { response.addAlertExecutionCommand((AlertExecutionCommand) ac); break; } default: LOG.error("There is no action for agent command =" + ac.getCommandType().name()); } } } } public String getOsType(String os, String osRelease) { String osType = ""; if (os != null) { osType = os; } if (osRelease != null) { String[] release = DOT_PATTERN.split(osRelease); if (release.length > 0) { osType += release[0]; } } return osType.toLowerCase(); } protected HeartBeatResponse createRegisterCommand() { HeartBeatResponse response = new HeartBeatResponse(); RegistrationCommand regCmd = new RegistrationCommand(); response.setResponseId(0); response.setRegistrationCommand(regCmd); return response; } protected HeartBeatResponse createRestartCommand(Long currentResponseId) { HeartBeatResponse response = new HeartBeatResponse(); response.setRestartAgent(true); response.setResponseId(currentResponseId); return response; } //注册响应 public RegistrationResponse handleRegistration(Register register) throws InvalidStateTransitionException, AmbariException { String hostname = register.getHostname(); int currentPingPort = register.getCurrentPingPort(); long now = System.currentTimeMillis(); String agentVersion = register.getAgentVersion(); String serverVersion = ambariMetaInfo.getServerVersion(); if (!VersionUtils.areVersionsEqual(serverVersion, agentVersion, true)) { LOG.warn("Received registration request from host with non compatible" + " agent version" + ", hostname=" + hostname + ", agentVersion=" + agentVersion + ", serverVersion=" + serverVersion); throw new AmbariException("Cannot register host with non compatible" + " agent version" + ", hostname=" + hostname + ", agentVersion=" + agentVersion + ", serverVersion=" + serverVersion); } String agentOsType = getOsType(register.getHardwareProfile().getOS(), register.getHardwareProfile().getOSRelease()); LOG.info("agentOsType = " + agentOsType); if (!ambariMetaInfo.isOsSupported(agentOsType)) { LOG.warn("Received registration request from host with not supported" + " os type" + ", hostname=" + hostname + ", serverOsType=" + config.getServerOsType() + ", agentOsType=" + agentOsType); throw new AmbariException("Cannot register host with not supported" + " os type" + ", hostname=" + hostname + ", serverOsType=" + config.getServerOsType() + ", agentOsType=" + agentOsType); } Host hostObject; try { hostObject = clusterFsm.getHost(hostname); } catch (HostNotFoundException ex) { clusterFsm.addHost(hostname); hostObject = clusterFsm.getHost(hostname); } // Resetting host state hostObject.setState(HostState.INIT); // Set ping port for agent hostObject.setCurrentPingPort(currentPingPort); // Get status of service components List<StatusCommand> cmds = heartbeatMonitor.generateStatusCommands(hostname); // Add request for component version for (StatusCommand command : cmds) { command.getCommandParams().put("request_version", String.valueOf(true)); } // Save the prefix of the log file paths hostObject.setPrefix(register.getPrefix()); hostObject.handleEvent(new HostRegistrationRequestEvent(hostname, null != register.getPublicHostname() ? register.getPublicHostname() : hostname, new AgentVersion(register.getAgentVersion()), now, register.getHardwareProfile(), register.getAgentEnv())); RegistrationResponse response = new RegistrationResponse(); if (cmds.isEmpty()) { //No status commands needed let the fsm know that status step is done hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now)); } response.setStatusCommands(cmds); response.setResponseStatus(RegistrationStatus.OK); // force the registering agent host to receive its list of alert definitions List<AlertDefinitionCommand> alertDefinitionCommands = getRegistrationAlertDefinitionCommands(hostname); response.setAlertDefinitionCommands(alertDefinitionCommands); response.setAgentConfig(config.getAgentConfigsMap()); if (response.getAgentConfig() != null) { LOG.debug("Agent configuration map set to " + response.getAgentConfig()); } /** * A host can belong to only one cluster. Though getClustersForHost(hostname) * returns a set of clusters, it will have only one entry. * * TODO: Handle the case when a host is a part of multiple clusters. */ Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname); if (clusters.size() > 0) { String clusterName = clusters.iterator().next().getClusterName(); RecoveryConfig rc = recoveryConfigHelper.getRecoveryConfig(clusterName, hostname); response.setRecoveryConfig(rc); if (response.getRecoveryConfig() != null) { LOG.info("Recovery configuration set to " + response.getRecoveryConfig().toString()); } } Long requestId = 0L; hostResponseIds.put(hostname, requestId); response.setResponseId(requestId); return response; } /** * Annotate the response with some housekeeping details. * hasMappedComponents - indicates if any components are mapped to the host * hasPendingTasks - indicates if any tasks are pending for the host (they may not be sent yet) * clusterSize - indicates the number of hosts that form the cluster * @param hostname * @param response * @throws org.apache.ambari.server.AmbariException */ private void annotateResponse(String hostname, HeartBeatResponse response) throws AmbariException { for (Cluster cl : clusterFsm.getClustersForHost(hostname)) { response.setClusterSize(cl.getClusterSize()); List<ServiceComponentHost> scHosts = cl.getServiceComponentHosts(hostname); if (scHosts != null && scHosts.size() > 0) { response.setHasMappedComponents(true); break; } } if (actionQueue.hasPendingTask(hostname)) { LOG.debug("Host " + hostname + " has pending tasks"); response.setHasPendingTasks(true); } } /** * Response contains information about HDP Stack in use * @param clusterName * @return @ComponentsResponse * @throws org.apache.ambari.server.AmbariException */ public ComponentsResponse handleComponents(String clusterName) throws AmbariException { ComponentsResponse response = new ComponentsResponse(); Cluster cluster = clusterFsm.getCluster(clusterName); StackId stackId = cluster.getCurrentStackVersion(); if (stackId == null) { throw new AmbariException("Cannot provide stack components map. " + "Stack hasn't been selected yet."); } StackInfo stack = ambariMetaInfo.getStack(stackId.getStackName(), stackId.getStackVersion()); response.setClusterName(clusterName); response.setStackName(stackId.getStackName()); response.setStackVersion(stackId.getStackVersion()); response.setComponents(getComponentsMap(stack)); return response; } private Map<String, Map<String, String>> getComponentsMap(StackInfo stack) { Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>(); for (ServiceInfo service : stack.getServices()) { Map<String, String> components = new HashMap<String, String>(); for (ComponentInfo component : service.getComponents()) { components.put(component.getName(), component.getCategory()); } result.put(service.getName(), components); } return result; } /** * Gets the {@link AlertDefinitionCommand} instances that need to be sent for * each cluster that the registering host is a member of. * * @param hostname * @return * @throws AmbariException */ private List<AlertDefinitionCommand> getRegistrationAlertDefinitionCommands( String hostname) throws AmbariException { Set<Cluster> hostClusters = clusterFsm.getClustersForHost(hostname); if (null == hostClusters || hostClusters.size() == 0) { return null; } List<AlertDefinitionCommand> commands = new ArrayList<AlertDefinitionCommand>(); // for every cluster this host is a member of, build the command for (Cluster cluster : hostClusters) { String clusterName = cluster.getClusterName(); alertDefinitionHash.invalidate(clusterName, hostname); List<AlertDefinition> definitions = alertDefinitionHash.getAlertDefinitions( clusterName, hostname); String hash = alertDefinitionHash.getHash(clusterName, hostname); AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName, hostname, hash, definitions); command.addConfigs(configHelper, cluster); commands.add(command); } return commands; } /** * Insert Kerberos keytab details into the ExecutionCommand for the SET_KEYTAB custom command if * any keytab details and associated data exists for the target host. * * @param ec the ExecutionCommand to update * @param command a name of the relevant keytab command * @param targetHost a name of the host the relevant command is destined for * @throws AmbariException */ void injectKeytab(ExecutionCommand ec, String command, String targetHost) throws AmbariException { String dataDir = ec.getCommandParams().get(KerberosServerAction.DATA_DIRECTORY); if (dataDir != null) { KerberosIdentityDataFileReader reader = null; List<Map<String, String>> kcp = ec.getKerberosCommandParams(); try { reader = kerberosIdentityDataFileReaderFactory.createKerberosIdentityDataFileReader(new File(dataDir, KerberosIdentityDataFileReader.DATA_FILE_NAME)); for (Map<String, String> record : reader) { String hostName = record.get(KerberosIdentityDataFileReader.HOSTNAME); if (targetHost.equalsIgnoreCase(hostName)) { if ("SET_KEYTAB".equalsIgnoreCase(command)) { String keytabFilePath = record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH); if (keytabFilePath != null) { String sha1Keytab = DigestUtils.sha1Hex(keytabFilePath); File keytabFile = new File(dataDir + File.separator + hostName + File.separator + sha1Keytab); if (keytabFile.canRead()) { Map<String, String> keytabMap = new HashMap<String, String>(); String principal = record.get(KerberosIdentityDataFileReader.PRINCIPAL); String isService = record.get(KerberosIdentityDataFileReader.SERVICE); keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName); keytabMap.put(KerberosIdentityDataFileReader.SERVICE, isService); keytabMap.put(KerberosIdentityDataFileReader.COMPONENT, record.get(KerberosIdentityDataFileReader.COMPONENT)); keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, principal); keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, keytabFilePath); keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME)); keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS)); keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME)); keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS)); BufferedInputStream bufferedIn = new BufferedInputStream(new FileInputStream(keytabFile)); byte[] keytabContent = null; try { keytabContent = IOUtils.toByteArray(bufferedIn); } finally { bufferedIn.close(); } String keytabContentBase64 = Base64.encodeBase64String(keytabContent); keytabMap.put(KerberosServerAction.KEYTAB_CONTENT_BASE64, keytabContentBase64); kcp.add(keytabMap); } } } else if ("REMOVE_KEYTAB".equalsIgnoreCase(command)) { Map<String, String> keytabMap = new HashMap<String, String>(); keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName); keytabMap.put(KerberosIdentityDataFileReader.SERVICE, record.get(KerberosIdentityDataFileReader.SERVICE)); keytabMap.put(KerberosIdentityDataFileReader.COMPONENT, record.get(KerberosIdentityDataFileReader.COMPONENT)); keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, record.get(KerberosIdentityDataFileReader.PRINCIPAL)); keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH)); kcp.add(keytabMap); } } } } catch (IOException e) { throw new AmbariException("Could not inject keytabs to enable kerberos"); } finally { if (reader != null) { try { reader.close(); } catch (Throwable t) { // ignored } } } ec.setKerberosCommandParams(kcp); } } }