java socket client简单的检测断开并重连发送队列消息实现

    xiaoxiao2022-06-29  58

    package com.pam.push; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.util.LinkedList; import java.util.Properties; import java.util.logging.Logger; import org.springframework.stereotype.Component; import com.pam.enums.Constants; import com.pam.utils.PropertiesUtil; @Component(value="pushClient") public class PushClient { Logger logger = Logger.getLogger("Client"); private String host; private Integer port; private Socket client; private OutputStream os; private LinkedList<Object> msgList = new LinkedList<Object>(); private Thread sendMessageThread; public PushClient() { super(); logger.info("\n\n#############加载PushClient\n"); sendMessageThread = new Thread(new Runnable() { @Override public void run() { Properties properties = null; while (true) { try { if (null != client) { try {//发送检测是否断开 client.sendUrgentData(0xFF); } catch (Exception e) {//断开产生异常,关闭对象 os.close(); client.close(); os = null; client = null; e.printStackTrace(); } if (msgList.size() == 0) { Thread.sleep(3000); logger.info("\n\n待发送的消息条数为0\n"); continue; } logger.info("\n\n待发送的消息条数:"+msgList.size()+"\n"); for (int i = 0; i < msgList.size(); i++) { Thread.sleep(500); logger.info("\n\n发送第"+(i+1)+"条消息:"+msgList.get(i).toString()+"\n"); os.write(msgList.get(i).toString().getBytes()); os.flush(); } msgList.clear(); } else { logger.info("\n\n重新连接中...\n"); properties = PropertiesUtil.getProperties(Constants.PROPERTIES_PATH); String push_ip = PropertiesUtil.getAttribute(properties, Constants.PUSH_IP); Integer push_port = Integer.valueOf(PropertiesUtil.getAttribute(properties, Constants.PUSH_PORT)); logger.info("\n\n连接IP:"+push_ip+"\n连接PORT:"+push_port); client = new Socket(push_ip, push_port); os = client.getOutputStream(); } } catch (Exception e) { try { logger.info("\n\n连接失败,继续连接\n"); Thread.sleep(10*1000); } catch (InterruptedException e1) { e1.printStackTrace(); } } } } }); sendMessageThread.start(); } /** * 出 * @return */ public Object getMsg() { synchronized (this) { if (msgList != null && msgList.size() > 0) { return msgList.removeFirst(); } return null; } } /** * 入 * @param obj * @return */ public Object addMsg(Object obj) { synchronized (this) { msgList.addLast(obj); } return obj; } /** * 发送消息 * @param data */ public void sendMessage(String data) { addMsg(data); } /** * 将一个字符串转化为输入流 * @param sInputString * @return */ public static InputStream getStringStream(String sInputString){ if(null != sInputString && !sInputString.trim().equals("")){ try { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(sInputString.getBytes()); return byteArrayInputStream; } catch (Exception e) { e.printStackTrace(); } } return null; } public Logger getLogger() { return logger; } public void setLogger(Logger logger) { this.logger = logger; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public Integer getPort() { return port; } public void setPort(Integer port) { this.port = port; } public Socket getClient() { return client; } public void setClient(Socket client) { this.client = client; } public OutputStream getOs() { return os; } public void setOs(OutputStream os) { this.os = os; } public LinkedList<Object> getMsgList() { return msgList; } public void setMsgList(LinkedList<Object> msgList) { this.msgList = msgList; } public Thread getSendMessageThread() { return sendMessageThread; } public void setSendMessageThread(Thread sendMessageThread) { this.sendMessageThread = sendMessageThread; } }

    其中constants是我自己定义的一些常量数据,而propertiesutil是一个操作properties文件的工具类。

    该类在spring web项目启动时会初始化加载,线程就会启动。

    转载请注明原文地址: https://ju.6miu.com/read-1124928.html

    最新回复(0)