基於TCP/IP協議的socket通訊server
阿新 • • 發佈:2017-12-07
while resource err close 通訊 ice inpu utils 緩沖
思路:
socket必須要隨項目啟動時啟動,所以需用監聽器(實現Servlet的ServletContextListener,並在web.xml中與Spring的ContextLoaderListener一起配置);需要保持長連接,要用死循環,所以必須另外起線程,不能阻礙主線程運行
1.在項目的web.xml中配置listener
<!-- Spring's context loader must be declared first so the application context exists
     before the custom listener starts its background threads. -->
<listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!-- Custom listener that starts the socket server when the web application starts. -->
<listener>
    <listener-class>com.ra.car.utils.MyListener</listener-class>
</listener>
2.因為是一個獨立的線程,所以需要調用的註入類不能通過@Resource或@Autowired註入,需要通過應用上下文獲取
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-4.0.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
           http://www.springframework.org/schema/task
           http://www.springframework.org/schema/task/spring-task-4.0.xsd">

    <!-- Scan the packages that contain the Service implementation classes. -->
    <context:component-scan base-package="com.ra.*.service.impl"/>

    <!-- Beans registered under explicit ids so that code running outside Spring-managed
         threads can look them up by name from the application context. -->
    <bean id="DataCallBackService" class="com.ra.truck.service.impl.DataCallBackServiceImpl"/>
    <bean id="RdTrackInfoService" class="com.ra.truck.service.impl.RdTrackInfoServiceImpl"/>
    <bean id="OutInterfaceService" class="com.ra.truck.service.impl.OutInterfaceImpl"/>
    <bean id="RdPhotoInfoService" class="com.ra.truck.service.impl.RdPhotoInfoServiceImpl"/>
    <bean id="MessagePackegerService" class="com.ra.truck.service.impl.MessagePackegerServiceImpl"/>
    <!--<bean id="redis" class="com.ra.redis.service.impl.JedisClientCluster"/>-->
</beans>
3.創建listener監聽器類
package com.ra.car.utils; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.ra.car.rabbitMQ.PBWRabbitMQCustomer; import com.ra.car.rabbitMQ.RabbitMQCustomer; /** * listener監聽器類* */ public class MyListener implements ServletContextListener { protected static final Logger logge = LoggerFactory .getLogger(MyListener.class); @Override public void contextInitialized(ServletContextEvent arg0) { //必須單獨啟線程去跑listener Mythread myThread = new Mythread(); //創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程 // ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // cachedThreadPool.execute(myThread); Thread thread = new Thread(myThread); thread.start(); //啟動MQTT // MQTTSubMsg client = new MQTTSubMsg(); // client.start(); RabbitMQCustomer customer=new RabbitMQCustomer(); Thread threadCustomer = new Thread(customer); threadCustomer.start(); PBWRabbitMQCustomer pbwcustomer=new PBWRabbitMQCustomer(); Thread pbwT = new Thread(pbwcustomer); pbwT.start(); } @Override public void contextDestroyed(ServletContextEvent arg0) { logge.info("進入ListenerUtil的contextDestroyed方法........."); } }
package com.ra.car.utils;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accept loop of the TCP socket server: listens on port 8888 and hands every
 * accepted connection to a {@link JavaTCPServer}.
 */
public class Mythread implements Runnable {

    protected static final Logger logge = LoggerFactory.getLogger(Mythread.class);

    /** TCP port the server listens on. */
    private static final int PORT = 8888;

    /** Pause between two accepts, in milliseconds (value used by the original stress test). */
    private static final long ACCEPT_PAUSE_MILLIS = 10L;

    /**
     * Opens the server socket and accepts connections until the thread is
     * interrupted or the server socket fails. The server socket is always
     * closed on exit.
     */
    @Override
    public void run() {
        logge.info("進入ListenerUtil的contextInitialized方法.........");
        try (ServerSocket serverSocket = new ServerSocket(PORT)) {
            logge.info("socket通信服務端已啟動,等待客戶端連接.......");
            logge.info("我是111111111111111");
            while (!Thread.currentThread().isInterrupted()) {
                // Blocks until a client connects.
                Socket socket = serverSocket.accept();
                dispatch(socket);
                try {
                    Thread.sleep(ACCEPT_PAUSE_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the server.
                    Thread.currentThread().interrupt();
                }
            }
        } catch (IOException e) {
            logge.error("通信服務器啟動失敗!", e);
        }
    }

    /**
     * Hands one accepted connection to its handler. A failure while setting up
     * a single connection is logged and that socket is closed, so one bad
     * client cannot terminate the accept loop.
     */
    private void dispatch(Socket socket) {
        try {
            new JavaTCPServer(socket).run();
        } catch (RuntimeException e) {
            logge.error("處理客戶端連接出現異常", e);
            try {
                socket.close();
            } catch (IOException closeFailure) {
                logge.error("關閉socket出現異常", closeFailure);
            }
        }
    }

    /**
     * Formats a Unix timestamp given in seconds as {@code yyyy-MM-dd HH:mm:ss}
     * in the JVM's default time zone.
     *
     * @param s the timestamp in seconds, as a decimal string
     * @return the formatted date-time
     * @throws NumberFormatException if {@code s} is not a parsable long
     */
    public static String stampToDate(String s) {
        long timestampMillis = Long.parseLong(s) * 1000;
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timestampMillis));
    }
}
package com.ra.car.utils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dispatches one accepted client socket to a {@link MyThread2} handler that
 * runs on a shared worker pool.
 */
public class JavaTCPServer {

    protected static final Logger logger = LoggerFactory.getLogger(JavaTCPServer.class);

    /**
     * Pool shared by all connections. Creating a pool per connection would
     * leave one never-shut-down executor behind for every client.
     */
    private static final ExecutorService CONNECTION_POOL = Executors.newCachedThreadPool();

    private final Socket socket;

    /**
     * @param socket the accepted client connection to serve
     */
    public JavaTCPServer(Socket socket) {
        this.socket = socket;
    }

    /**
     * Creates the handler for this connection and submits it to the shared
     * pool. If the handler cannot be created, the failure is logged and the
     * socket is closed instead of submitting a null task.
     */
    public void run() {
        MyThread2 handler;
        try {
            handler = new MyThread2(socket);
        } catch (IOException e) {
            logger.error("創建客戶端處理線程失敗,關閉連接", e);
            closeSocket();
            return;
        }
        CONNECTION_POOL.execute(handler);
    }

    /** Closes the client socket, logging (not propagating) any failure. */
    private void closeSocket() {
        try {
            socket.close();
        } catch (IOException e) {
            logger.error("關閉socket出現異常", e);
        }
    }
}
package com.ra.car.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.ra.truck.model.RdDeviceCallBackDataDomain;
import com.ra.truck.service.DataCallBackService;
import com.ra.truck.service.RdPhotoInfoService;
import com.ra.truck.service.RdTrackInfoService;
import com.ra.truck.service.outInterface.OutInterfaceService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.ContextLoader;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Serves a single client connection: polls the socket for incoming data,
 * parses each message, and answers with a one-line JSON acknowledgement.
 * The connection is closed after the client has been silent for more than
 * {@link #IDLE_TIMEOUT_SECONDS} seconds, or as soon as the socket fails.
 *
 * <p>This class runs outside Spring-managed threads, so the services it needs
 * are looked up by bean name from the current web application context instead
 * of being injected.
 */
public class MyThread2 implements Runnable {

    protected static final Logger logger = LoggerFactory.getLogger(MyThread2.class);

    /** Seconds of client silence after which the server closes the connection. */
    private static final int IDLE_TIMEOUT_SECONDS = 60;

    /** Pause between two polls of an idle connection, so the loop does not spin the CPU. */
    private static final long IDLE_POLL_MILLIS = 50L;

    /** Wait after the first bytes arrive, to let the rest of the message come in. */
    private static final long MESSAGE_SETTLE_MILLIS = 200L;

    private Socket socket;
    private InputStream inputStream;
    private OutputStream outputStream;
    private PrintWriter printWriter;
    private int totalCount; // total number of signals
    private int adasCount;  // number of ADAS signals transferred
    private int gpsCount;   // number of GPS signals transferred
    private DataCallBackService dataCallBackService; // data call-back service
    private SimpleDateFormat df;

    /**
     * @param socket the accepted client connection
     * @throws IOException if the socket's streams cannot be obtained
     */
    public MyThread2(Socket socket) throws IOException {
        this.socket = socket;
        inputStream = socket.getInputStream();
        outputStream = socket.getOutputStream();
        // Requests are decoded as UTF-8, so replies are encoded as UTF-8 too
        // rather than with the platform default charset.
        printWriter = new PrintWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
        dataCallBackService = (DataCallBackService) ContextLoader
                .getCurrentWebApplicationContext().getBean("DataCallBackService");
        df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    }

    /** Serves the client until it goes idle or the connection fails, then closes the socket. */
    @Override
    public void run() {
        try {
            serveClient();
        } finally {
            closeConnection();
        }
    }

    /**
     * Poll/read/reply loop. Returns when the client has been idle too long,
     * the socket fails, or the thread is interrupted.
     */
    private void serveClient() {
        long lastActivityMillis = System.currentTimeMillis();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (inputStream.available() <= 0) {
                    long idleMillis = System.currentTimeMillis() - lastActivityMillis;
                    if (idleMillis / 1000 > IDLE_TIMEOUT_SECONDS) {
                        logger.info(IDLE_TIMEOUT_SECONDS + "s沒有發送數據,服務端主動關閉連接");
                        return;
                    }
                    Thread.sleep(IDLE_POLL_MILLIS);
                    continue;
                }

                lastActivityMillis = System.currentTimeMillis();
                Thread.sleep(MESSAGE_SETTLE_MILLIS);
                byte[] payload = readAvailable();

                logger.info("**********當前服務器正在被連接**********");
                logger.info("正在連接的客戶端IP為:" + socket.getInetAddress().getHostAddress());
                logger.info("當前時間為:" + df.format(new Date()));
                String data = new String(payload, StandardCharsets.UTF_8);
                logger.info("傳輸過來的info:" + data);

                // Heartbeats are JSON messages without an id; reply() omits a blank id.
                String id = jsonStringToObject(data);
                reply(id, true);
            } catch (InterruptedException e) {
                logger.error("*****線程休眠出現異常*****", e);
                // Restore the flag and stop serving: the pool is shutting this worker down.
                Thread.currentThread().interrupt();
                return;
            } catch (IOException e) {
                // The socket itself is broken; retrying would loop forever.
                logger.error("數據傳輸出現異常", e);
                reply(null, false);
                return;
            } catch (RuntimeException e) {
                // A single bad message: report the failure and keep the connection open.
                logger.error("數據傳輸出現異常", e);
                reply(null, false);
            }
        }
    }

    /**
     * Reads exactly the number of bytes currently reported as available.
     *
     * @return the bytes read
     * @throws IOException if reading fails or the stream ends before all
     *                     reported bytes were read
     */
    private byte[] readAvailable() throws IOException {
        int expected = inputStream.available();
        byte[] buffer = new byte[expected];
        int filled = 0;
        while (filled < expected) {
            int read = inputStream.read(buffer, filled, expected - filled);
            if (read < 0) {
                throw new IOException("client closed the connection while a message was being read");
            }
            filled += read;
        }
        return buffer;
    }

    /**
     * Sends the one-line JSON acknowledgement to the client.
     *
     * @param id      the message id to echo back; omitted from the reply when blank
     * @param success whether the message was processed successfully
     */
    private void reply(String id, boolean success) {
        Map<String, Object> body = new HashMap<String, Object>();
        if (StringUtils.isNotBlank(id)) {
            body.put("id", id);
        }
        body.put("resultCode", success ? "1" : "0");
        body.put("result", success ? "success" : "fail");
        printWriter.print(JSON.toJSONString(body) + "\n");
        printWriter.flush();
    }

    /** Closes the writer and the socket, and logs which client was disconnected and when. */
    private void closeConnection() {
        String clientIp = socket.getInetAddress().getHostAddress();
        printWriter.close();
        try {
            outputStream.close();
            inputStream.close();
            socket.close();
        } catch (IOException e) {
            logger.error("關閉socket出現異常", e);
        }
        logger.info("被斷開的客戶端IP為:" + clientIp);
        logger.info("被斷開的時間為:" + df.format(new Date()));
    }

    /**
     * Parses one message received from the client. The caller uses the return
     * value as the message id to echo back in the acknowledgement.
     *
     * <p>NOTE(review): the original article omits this method's body and leaves
     * only the placeholder {@code return xx;}. The real parsing logic must be
     * supplied before this class compiles.
     *
     * @param data the received message text
     * @return the value the caller echoes back as {@code id}
     */
    private String jsonStringToObject(String data) {
        // Data-parsing logic goes here (omitted in the original article).
        return xx;
    }

    /**
     * Converts a Unix timestamp given in seconds to a {@link Date}.
     *
     * @param s the timestamp in seconds, as a decimal string
     * @return the corresponding date
     * @throws NumberFormatException if {@code s} is not a parsable long
     */
    public static Date stampToDate(String s) {
        long timestampMillis = Long.parseLong(s) * 1000;
        return new Date(timestampMillis);
    }
}
基於TCP/IP協議的socket通訊server