異步線程池的實現(一)-------具體實現方法
本篇是這個內容的第一篇,主要是寫:遇到的問題,和自己摸索實現的方法。後面還會有一篇是總結性地寫線程池的相關內容(偏理論的)。
一、背景介紹
朋友的項目開發到一定程度之後,又遇到了一些問題:在某些流程中的一些節點,由於是串聯執行的。上一步要等下一步執行完畢;或者提交數據之後要等待後臺其他系統處理完成之後,才能返回結果。這樣就會導致,請求發起方不得不一直等待結果,用戶體驗很不好;從項目優化來說,模塊與模塊之間構成了強耦合,這也是不利於以後擴展的,更不用說訪問量上來之後,肯定會抓瞎的問題。所以,我就著手開始,利用異步線程池來解決這個問題。
剛開始的時候,我準備只是在節點處另外起線程去執行異步操作。但是,考慮到以後的擴展,同時利用“池化”技術,更加高效地重復利用線程,節省資源。在這裏就選定了,使用線程池的方法。
二、實現步驟
實現總共分為四步:
第一步,在啟動服務的時候初始化線程池;
第二步,建立有隊列的線程池;
第三步,將業務邏輯方法與線程池聯系起來;
第四步,調整原有代碼邏輯結構,將可以異步的操作放入第三步的業務邏輯方法,並將請求放入線程池的隊列中,等待執行。
三、具體實現
首先,第一步我們在web項目的起源之處web.xml中加入這麽一行
1 <listener> 2 3 <listener-class>com.jptec.kevin.thread.listener.InitThreadPoolListener</listener-class> 4 5 </listener>
這裏的路徑實際上就是,在啟動項目之後,會加載的初始化函數。這個函數主要的作用就是:將線程池啟動起來。實現代碼如下:
1 public class InitThreadPoolListener implements ServletContextListener { 2 3 @Override 4 public void contextInitialized(ServletContextEvent sce) { 5 6 new TestThreadPool().runThread();7 } 8 9 @Override 10 public void contextDestroyed(ServletContextEvent sce) { 11 } 12 13 }
好了,第一步就算完工了。
然後,我們開始第二步,建立有隊列的線程池(這裏有很多,理論上的內容,會放在第二篇中詳細說)。在這裏主要是,定義了一個ArrayBlockingQueue隊列(先進先出,有限阻塞),使用Executor定義了一個線程池。具體代碼如下:
1 public class TestThreadPool { 2 3 protected final static Logger log = LoggerFactory.getLogger(TestThreadPool.class); 4 5 // 線程休眠時間(秒) 6 // 存放需要發送的信息 7 public static BlockingQueue<Runnable> addressBqueue = new ArrayBlockingQueue<Runnable>( 8 10000); 9 10 public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 200, 25, TimeUnit.SECONDS, 11 addressBqueue); 12 13 public TestThreadPool() { 14 } 15 16 17 public void runThread() { 18 19 try { 20 executor.prestartCoreThread(); 21 log.info("隊列大小:" + executor.getQueue().size()); 22 23 } catch (Exception e) { 24 log.error("啟動子線程異常", e); 25 } 26 27 } 28 29 }
完成第二步之後,我們繼續第三步。我們有了線程池,那麽實際代碼如何將請求放入其中,並等待執行呢。於是,這裏分為兩個類,一個是負責業務代碼中調用的,負責向隊列中插入請求,一個是單個線程的實現類。具體實現如下:
插入請求實現:
1 /** 2 * 線程隊列 3 */ 4 public class TestQueue { 5 protected final static Logger log = LoggerFactory.getLogger(TestQueue.class); 6 7 public static boolean put(String userId,String tradeId, String amount, String flag, String term) { 8 log.debug("添加入隊列開始... 額度申請用戶tradeId=[{}]", tradeId); 9 try { 10 TestThreadPool.executor.execute(new TestThread( tradeId, amount, flag, term, userId)); 11 log.debug("添加入隊列結束..."); 12 } catch (Exception e) { 13 log.error("添加入隊列異常...", e); 14 return false; 15 } 16 return true; 17 } 18 19 }
單個線程實現(第八行的引用在下文細說):
1 /** 2 * 發送信息線程處理類 3 */ 4 public class TestThread implements Runnable { 5 6 protected final static Logger log = LoggerFactory.getLogger(TestThread.class); 7 8 private TradeService tradeService = (TradeService) SpringHandle.getBean("tradeService"); 9 10 String tradeId; 11 String amount; 12 String flag; 13 String term; 14 String userId; 15 16 17 18 /** 19 * <p>Title: </p> 20 * <p>Description: </p> 21 * @param tradeId 22 * @param amount 23 * @param flag 24 * @param term 25 * @param userId 26 */ 27 28 public TestThread(String tradeId, String amount, String flag, String term, 29 String userId) { 30 super(); 31 this.tradeId = tradeId; 32 this.amount = amount; 33 this.flag = flag; 34 this.term = term; 35 this.userId = userId; 36 } 37 38 @Override 39 public void run() { 40 log.info("線程開始tradeId={}", tradeId); 41 log.info("線程名:={}", Thread.currentThread().getId()); 42 log.info("隊列大小:" + TestThreadPool.executor.getPoolSize() + "," 43 + TestThreadPool.executor.getCompletedTaskCount()); 44 putTradeConfirm(userId,tradeId, amount, flag, term); 45 try { 46 Thread.sleep(1000L); 47 } catch (InterruptedException e) { 48 e.printStackTrace(); 49 } 50 } 51 52 private void putTradeConfirm(String userId,String tradeId, String amount, String flag, String term) { 53 54 tradeService.getMatchFundInfo(userId,amount, tradeId, flag, term); 55 56 } 57 58 }
這裏需要註意的是,我需要獲得一個Service的實例來調用具體的方法。但是,註釋的方法不起作用,於是在朋友的幫助下,使用了輔助類。具體實現如下:
1 @Component 2 public final class SpringHandle implements BeanFactoryPostProcessor { 3 4 private static ConfigurableListableBeanFactory beanFactory; // Spring應用上下文環境 5 6 public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { 7 SpringHandle.beanFactory = beanFactory; 8 } 9 10 /** 11 * 獲取對象 12 * 13 * @param name 14 * @return Object 一個以所給名字註冊的bean的實例 15 * @throws org.springframework.beans.BeansException 16 * 17 */ 18 @SuppressWarnings("unchecked") 19 public static <T> T getBean(String name) throws BeansException { 20 return (T) beanFactory.getBean(name); 21 } 22 23 /** 24 * 獲取類型為requiredType的對象 25 * 26 * @param clz 27 * @return 28 * @throws org.springframework.beans.BeansException 29 * 30 */ 31 public static <T> T getBean(Class<T> clz) throws BeansException { 32 T result = (T) beanFactory.getBean(clz); 33 return result; 34 } 35 36 /** 37 * 如果BeanFactory包含一個與所給名稱匹配的bean定義,則返回true 38 * 39 * @param name 40 * @return boolean 41 */ 42 public static boolean containsBean(String name) { 43 return beanFactory.containsBean(name); 44 } 45 46 /** 47 * 判斷以給定名字註冊的bean定義是一個singleton還是一個prototype。 48 * 如果與給定名字相應的bean定義沒有被找到,將會拋出一個異常(NoSuchBeanDefinitionException) 49 * 50 * @param name 51 * @return boolean 52 * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException 53 * 54 */ 55 public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException { 56 return beanFactory.isSingleton(name); 57 } 58 59 /** 60 * @param name 61 * @return Class 註冊對象的類型 62 * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException 63 * 64 */ 65 public static Class<?> getType(String name) throws NoSuchBeanDefinitionException { 66 return beanFactory.getType(name); 67 } 68 69 /** 70 * 如果給定的bean名字在bean定義中有別名,則返回這些別名 71 * 72 * @param name 73 * @return 74 * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException 75 * 76 */ 77 public static String[] getAliases(String name) throws NoSuchBeanDefinitionException { 78 return beanFactory.getAliases(name); 79 } 80 81 }
最後,在具體業務邏輯中,調用插入請求的方法,即可。
TradeGetFundInfoQueue.put(userId, tradeId, quota, repaymentType, timeLimit);
四、測試函數
由於在項目中,所以我寫了另外一個測試函數(這個測試函數,會在下一篇文章中再次遇到),放在這裏。供大家參考:
1 public class TestThreadPool { 2 3 public static BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( 4 10000); 5 6 public static void main(String[] args) { 7 for (int i = 0; i < 2; i++) { 8 queue.add(new TestThread("初始化")); 9 } 10 11 final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 15, TimeUnit.SECONDS, queue); 12 13 executor.prestartCoreThread(); 14 15 16 new Thread(new Runnable() { 17 @Override 18 public void run() { 19 while (true) { 20 System.out.println("getActiveCount=" + executor.getActiveCount() 21 + ";getKeepAliveTime=" + executor.getKeepAliveTime(TimeUnit.SECONDS) 22 + ";getCompletedTaskCount=" + executor.getCompletedTaskCount() 23 + ";getCorePoolSize=" + executor.getCorePoolSize() 24 + ";getLargestPoolSize=" + executor.getLargestPoolSize() 25 + ";getMaximumPoolSize=" + executor.getMaximumPoolSize() 26 + ";getPoolSize=" + executor.getPoolSize() 27 + ";getTaskCount=" + executor.getTaskCount() 28 + ";getQueue().size()=" + executor.getQueue().size() 29 ); 30 try { 31 Thread.currentThread().sleep(200L); 32 } catch (InterruptedException e) { 33 e.printStackTrace(); 34 } 35 } 36 } 37 }).start(); 38 39 new Thread(new Runnable() { 40 @Override 41 public void run() { 42 int i = 0; 43 while (true) { 44 queue.add(new TestThread("生產者")); 45 try { 46 Thread.currentThread().sleep(100L); 47 } catch (InterruptedException e) { 48 e.printStackTrace(); 49 } 50 i++; 51 if (i > 100) break; 52 } 53 } 54 }).start(); 55 } 56 } 57 58 class TestThread implements Runnable { 59 public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 60 private String name; //創建者 61 private Date addDate; //添加到隊列的日期 62 63 TestThread(String name) { 64 this.name = name; 65 this.addDate = new Date(); 66 } 67 68 @Override 69 public void run() { 70 System.out.println(Thread.currentThread().getName() + 71 ":創建者=" + name + ",創建時間=" + sdf.format(addDate) + ",執行時間=" + sdf.format(new Date()) + ",當前隊列大小=" + TestThreadPool.queue.size()); 72 73 System.out.println(TestThreadPool.queue.peek()); 74 try { 75 Thread.currentThread().sleep(1000L); 76 } catch (InterruptedException e) { 77 e.printStackTrace(); 78 } 79 } 80 }
測試的結果大致是這個樣子的:
最後,希望這篇文章對你有幫助,感謝朋友的幫助!
異步線程池的實現(一)-------具體實現方法