1. 程式人生 > >異步線程池的實現(一)-------具體實現方法

異步線程池的實現(一)-------具體實現方法

fun format 測試 路徑 線程池。 用戶體驗 deb tar clas

本篇是這個內容的第一篇,主要是寫:遇到的問題,和自己摸索實現的方法。後面還會有一篇是總結性地寫線程池的相關內容(偏理論的)。

一、背景介紹

朋友的項目開發到一定程度之後,又遇到了一些問題:在某些流程中的一些節點,由於是串聯執行的。上一步要等下一步執行完畢;或者提交數據之後要等待後臺其他系統處理完成之後,才能返回結果。這樣就會導致,請求發起方不得不一直等待結果,用戶體驗很不好;從項目優化來說,模塊與模塊之間構成了強耦合,這也是不利於以後擴展的,更不用說訪問量上來之後,肯定會抓瞎的問題。所以,我就著手開始,利用異步線程池來解決這個問題。

剛開始的時候,我準備只是在節點處另外起線程去執行異步操作。但是,考慮到以後的擴展,同時利用“池化”技術,更加高效地重復利用線程,節省資源。在這裏就選定了,使用線程池的方法。

二、實現步驟

實現總共分為四步:

第一步,在啟動服務的時候初始化線程池;

第二步,建立有隊列的線程池;

第三步,將業務邏輯方法與線程池聯系起來;

第四步,調整原有代碼邏輯結構,將可以異步的操作放入第三步的業務邏輯方法,並將請求放入線程池的隊列中,等待執行。

三、具體實現

首先,第一步我們在web項目的起源之處web.xml中加入這麽一行

1 <listener>
2 
3                    <listener-class>com.jptec.kevin.thread.listener.InitThreadPoolListener</
listener-class> 4 5 </listener>

這裏的路徑實際上就是,在啟動項目之後,會加載的初始化函數。這個函數主要的作用就是:將線程池啟動起來。實現代碼如下:

技術分享

 1 public class InitThreadPoolListener implements ServletContextListener {
 2 
 3     @Override
 4     public void contextInitialized(ServletContextEvent sce) {
 5 
 6         new TestThreadPool().runThread();
7 } 8 9 @Override 10 public void contextDestroyed(ServletContextEvent sce) { 11 } 12 13 }

好了,第一步就算完工了。

然後,我們開始第二步,建立有隊列的線程池(這裏有很多,理論上的內容,會放在第二篇中詳細說)。在這裏主要是,定義了一個ArrayBlockingQueue隊列(先進先出,有限阻塞),使用Executor定義了一個線程池。具體代碼如下:

 1 public class TestThreadPool {
 2 
 3     protected final static Logger log = LoggerFactory.getLogger(TestThreadPool.class);
 4 
 5     // 線程休眠時間(秒)
 6     // 存放需要發送的信息
 7     public static BlockingQueue<Runnable> addressBqueue = new ArrayBlockingQueue<Runnable>(
 8                 10000);
 9         
10     public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 200, 25, TimeUnit.SECONDS,
11             addressBqueue);
12 
13     public TestThreadPool() {
14     }
15     
16 
17     public void runThread() {
18 
19         try {
20             executor.prestartCoreThread();
21             log.info("隊列大小:" + executor.getQueue().size());
22 
23         } catch (Exception e) {
24             log.error("啟動子線程異常", e);
25         }
26 
27     }
28 
29 }

完成第二步之後,我們繼續第三步。我們有了線程池,那麽實際代碼如何將請求放入其中,並等待執行呢。於是,這裏分為兩個類,一個是負責業務代碼中調用的,負責向隊列中插入請求,一個是單個線程的實現類。具體實現如下:

插入請求實現:

 1 /**
 2  * 線程隊列
 3  */
 4 public class TestQueue {
 5     protected final static Logger log = LoggerFactory.getLogger(TestQueue.class);
 6 
 7     public static boolean put(String userId,String tradeId, String amount, String flag, String term) {
 8             log.debug("添加入隊列開始... 額度申請用戶tradeId=[{}]", tradeId);
 9             try {
10                 TestThreadPool.executor.execute(new TestThread( tradeId, amount,  flag, term, userId));
11             log.debug("添加入隊列結束...");
12         } catch (Exception e) {
13             log.error("添加入隊列異常...", e);
14             return false;
15         }
16         return true;
17     }
18 
19 }

單個線程實現(第八行的引用在下文細說):

 1 /**
 2  * 發送信息線程處理類
 3  */
 4 public class TestThread implements Runnable {
 5 
 6     protected final static Logger log = LoggerFactory.getLogger(TestThread.class);
 7 
 8     private TradeService tradeService = (TradeService) SpringHandle.getBean("tradeService");
 9 
10     String tradeId;
11     String amount;
12     String flag;
13     String term;
14     String userId;
15 
16     
17 
18     /** 
19      * <p>Title: </p>
20      * <p>Description: </p>
21      * @param tradeId
22      * @param amount
23      * @param flag
24      * @param term
25      * @param userId 
26      */ 
27     
28     public TestThread(String tradeId, String amount, String flag, String term,
29             String userId) {
30         super();
31         this.tradeId = tradeId;
32         this.amount = amount;
33         this.flag = flag;
34         this.term = term;
35         this.userId = userId;
36     }
37 
38     @Override
39     public void run() {
40         log.info("線程開始tradeId={}", tradeId);
41         log.info("線程名:={}", Thread.currentThread().getId());
42         log.info("隊列大小:" + TestThreadPool.executor.getPoolSize() + ","
43                 + TestThreadPool.executor.getCompletedTaskCount());
44         putTradeConfirm(userId,tradeId, amount, flag, term);
45         try {
46             Thread.sleep(1000L);
47         } catch (InterruptedException e) {
48             e.printStackTrace();
49         }
50     }
51 
52     private void putTradeConfirm(String userId,String tradeId, String amount, String flag, String term) {
53 
54         tradeService.getMatchFundInfo(userId,amount, tradeId, flag, term);
55 
56     }
57 
58 }

這裏需要註意的是,我需要獲得一個Service的實例來調用具體的方法。但是,註釋的方法不起作用,於是在朋友的幫助下,使用了輔助類。具體實現如下:

 1 @Component
 2 public final class SpringHandle implements BeanFactoryPostProcessor {
 3 
 4     private static ConfigurableListableBeanFactory beanFactory; // Spring應用上下文環境
 5 
 6     public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
 7         SpringHandle.beanFactory = beanFactory;
 8     }
 9 
10     /**
11      * 獲取對象
12      * 
13      * @param name
14      * @return Object 一個以所給名字註冊的bean的實例
15      * @throws org.springframework.beans.BeansException
16      * 
17      */
18     @SuppressWarnings("unchecked")
19     public static <T> T getBean(String name) throws BeansException {
20         return (T) beanFactory.getBean(name);
21     }
22 
23     /**
24      * 獲取類型為requiredType的對象
25      * 
26      * @param clz
27      * @return
28      * @throws org.springframework.beans.BeansException
29      * 
30      */
31     public static <T> T getBean(Class<T> clz) throws BeansException {
32         T result = (T) beanFactory.getBean(clz);
33         return result;
34     }
35 
36     /**
37      * 如果BeanFactory包含一個與所給名稱匹配的bean定義,則返回true
38      * 
39      * @param name
40      * @return boolean
41      */
42     public static boolean containsBean(String name) {
43         return beanFactory.containsBean(name);
44     }
45 
46     /**
47      * 判斷以給定名字註冊的bean定義是一個singleton還是一個prototype。
48      * 如果與給定名字相應的bean定義沒有被找到,將會拋出一個異常(NoSuchBeanDefinitionException)
49      * 
50      * @param name
51      * @return boolean
52      * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
53      * 
54      */
55     public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
56         return beanFactory.isSingleton(name);
57     }
58 
59     /**
60      * @param name
61      * @return Class 註冊對象的類型
62      * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
63      * 
64      */
65     public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
66         return beanFactory.getType(name);
67     }
68 
69     /**
70      * 如果給定的bean名字在bean定義中有別名,則返回這些別名
71      * 
72      * @param name
73      * @return
74      * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
75      * 
76      */
77     public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
78         return beanFactory.getAliases(name);
79     }
80 
81 }

最後,在具體業務邏輯中,調用插入請求的方法,即可。

TradeGetFundInfoQueue.put(userId, tradeId, quota, repaymentType, timeLimit);

四、測試函數

由於在項目中,所以我寫了另外一個測試函數(這個測試函數,會在下一篇文章中再次遇到),放在這裏。供大家參考:

 1 public class TestThreadPool {
 2 
 3     public static BlockingQueue<Runnable> queue = new  ArrayBlockingQueue<Runnable>(
 4             10000);
 5  
 6     public static void main(String[] args) {
 7         for (int i = 0; i < 2; i++) {
 8             queue.add(new TestThread("初始化"));
 9         }
10  
11         final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 15, TimeUnit.SECONDS, queue);
12 
13         executor.prestartCoreThread();
14 
15  
16         new Thread(new Runnable() {
17             @Override
18             public void run() {
19                 while (true) {
20                     System.out.println("getActiveCount=" + executor.getActiveCount()
21                                     + ";getKeepAliveTime=" + executor.getKeepAliveTime(TimeUnit.SECONDS)
22                                     + ";getCompletedTaskCount=" + executor.getCompletedTaskCount()
23                                     + ";getCorePoolSize=" + executor.getCorePoolSize()
24                                     + ";getLargestPoolSize=" + executor.getLargestPoolSize()
25                                     + ";getMaximumPoolSize=" + executor.getMaximumPoolSize()
26                                     + ";getPoolSize=" + executor.getPoolSize()
27                                     + ";getTaskCount=" + executor.getTaskCount()
28                                     + ";getQueue().size()=" + executor.getQueue().size()
29                     );
30                     try {
31                         Thread.currentThread().sleep(200L);
32                     } catch (InterruptedException e) {
33                         e.printStackTrace();
34                     }
35                 }
36             }
37         }).start();
38  
39         new Thread(new Runnable() {
40             @Override
41             public void run() {
42                 int i = 0;
43                 while (true) {
44                     queue.add(new TestThread("生產者"));
45                     try {
46                         Thread.currentThread().sleep(100L);
47                     } catch (InterruptedException e) {
48                         e.printStackTrace();
49                     }
50                     i++;
51                     if (i > 100) break;
52                 }
53             }
54         }).start();
55     }
56 }
57  
58 class TestThread implements Runnable {
59     public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
60     private String name;        //創建者
61     private Date addDate;       //添加到隊列的日期
62  
63     TestThread(String name) {
64         this.name = name;
65         this.addDate = new Date();
66     }
67  
68     @Override
69     public void run() {
70         System.out.println(Thread.currentThread().getName() +
71                 ":創建者=" + name + ",創建時間=" + sdf.format(addDate) + ",執行時間=" + sdf.format(new Date()) + ",當前隊列大小=" + TestThreadPool.queue.size());
72  
73         System.out.println(TestThreadPool.queue.peek());
74         try {
75             Thread.currentThread().sleep(1000L);
76         } catch (InterruptedException e) {
77             e.printStackTrace();
78         }
79     }
80 }

測試的結果大致是這個樣子的:

技術分享

最後,希望這篇文章對你有幫助,感謝朋友的幫助!

異步線程池的實現(一)-------具體實現方法