1. 程式人生 > >Java併發程式設計-擼一個數據庫連線池

Java併發程式設計-擼一個數據庫連線池

章節目錄

  • 等待超時模式的使用場景
  • 可以掌握的技能
    • 等待/通知 消費者/生產者模式
    • CountDownLatch、AtomicInteger、靜態內部類、LinkedList、動態代理的使用

1.等待超時模式

場景

當我們呼叫方法時,這個方法返回的資源比較重要,比如獲取資料庫連線池中連線控制代碼。但是這個資源的返回隨著業務量的增加,那麼獲取資源(連線池連線)的時間就會增加,那麼呼叫一個方法時就要等待一段時間(一般來說是給定一個時間段),如果該方法能夠在一段時間內獲取到結果,那麼將結果立刻返回,反之,超時返回預設結果。

等待/通知的經典範式,即加鎖、條件迴圈、處理邏輯3個步驟,這種正規化沒辦法做到超時等待,對經典範式做很小的改動,就可以實現超時等待。

等待超時模式虛擬碼:

   public synchronized Object get(long mills) throws InterruptedException {
       Object result = null;
       long future = System.currentTimeMills() + mills;
       long remaining = mills;
       while (result == null && remaining > 0) {
            wait(remaining);//釋放鎖,阻塞 mills 毫秒
            remaining = future - System.currentTimeMills();
       }

       return result;//如果超時之後獲取到result則不返回null
   }

超時等待的作用就是不會永遠阻塞呼叫者,但是 超時之後被喚醒,知識將執行緒從等待佇列移動至阻塞佇列,繼續向下進行返回result還是要重新獲取鎖,如果一直獲取不到鎖,那麼result也不會列印。只是增加了靈活性。

2.可以掌握的技能

實戰
使用等待超時模式擼一個簡單資料庫連線池,在示例中模擬:

  • 從連線池獲取連線 RunnerThread
  • 使用連線 RunnerThread
  • 釋放連線 RunnerThread
    注意:客戶端獲取(消費)連線的過程被設定為等待超時、等待/通知兩種模式
    ConnectionPool.java-資料庫連線池
package org.seckill.DBConnection;

import java.sql.Connection;
import java.util.LinkedList;

/**
 * 資料庫連線池物件
 */
public class ConnectionPool {

    //連結串列list(池)維護 connection 連線物件
    private LinkedList<Connection> pool = new LinkedList<Connection>();

    //構造方法 初始化池中連線
    public ConnectionPool(int initialSize) {
        if (initialSize > 0) {
            for (int i = 0; i < initialSize; i++) {
                pool.addLast(ConnectionDriver.createConnection());//建立initialSize個代理Connection物件
            }
        }
    }

    //釋放connection ,相當於-生產者
    public void releaseConnection(Connection connection) {
        if (connection != null) {//有效歸還連線
            synchronized (pool) {
                pool.addLast(connection);
                //生產者動作完畢後,需要喚醒所有消費者
                pool.notifyAll();
            }
        }
    }

    //獲取connection控制代碼,相當於消費者,採用超時等待與等待/通知兩種策略
    public Connection fetchConnection(long mills) throws InterruptedException {
        synchronized (pool) {
            if (mills < 0) {//非超時等待模式,採用等待/通知模式
                while (pool.isEmpty()) {
                    pool.wait();//本示例中不演示這種模式下獲取連線的情景
                }

                return pool.removeFirst();
            } else {//超時等待模式
                long future = System.currentTimeMillis() + mills;
                long remaining = mills;
                while (pool.isEmpty() && remaining > 0) {
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }

                Connection connection = null;

                if (!pool.isEmpty()) {
                    connection = pool.removeFirst();//返回頭結點物件
                }

                return connection;
            }
        }
    }

}

ConnectionDriver.java-動態生成Connection代理物件

package org.seckill.DBConnection;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;

/**
 * 資料庫連線驅動,
 * 動態代理獲取實現java.sql.Connection 介面的代理物件
 */
public class ConnectionDriver {

    static class ConnectionHandler implements InvocationHandler {
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getName() == "commit") {
                Thread.sleep(100);
            }
            return null;
        }

    }

    //獲取Connection的動態代理類
    public static final Connection createConnection() {
        return (Connection) Proxy.newProxyInstance(
                ConnectionDriver.class.getClassLoader(),//類載入器
                new Class<?>[]{Connection.class},//Connection實現的介面列表,包含Connection介面
                new ConnectionHandler());//與代理物件繫結的handler
    }
}

ConnectionPoolTest.java--測試類

package org.seckill.DBConnection;

import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionPoolTest {
    //執行緒池中初始化10個連線
    static ConnectionPool connectionPool = new ConnectionPool(10);
    //保證所有的ConnectionRunner 能夠同時開始
    static CountDownLatch start = new CountDownLatch(1);
    //main執行緒將等待所有的Connection Runner結束後才開始執行
    static CountDownLatch end;

    public static void main(String[] args) throws Exception {
        //ConnectionRunner 執行緒數量,可以修改執行緒數量進行觀察
        int threadCount = 50;
        end = new CountDownLatch(threadCount);
        int count = 20;//每個執行緒進行20次fetchConnetion動作
        AtomicInteger got = new AtomicInteger();
        AtomicInteger notGot = new AtomicInteger();

        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "ConnectionRunnerThread");
            thread.start();
        }

        start.countDown();//使所有執行緒同時執行
        end.await();//主執行緒等待所有執行緒執行完
        System.out.println("總的請求次數" + threadCount * count);
        System.out.println("獲取到的連線總數" + got);
        System.out.println("未獲取到的連線總數" + notGot);


    }

    static class ConnectionRunner implements Runnable {
        int count;//每個執行緒fetchConnetion的次數
        AtomicInteger got;//記錄fetchConnection 成功的次數
        AtomicInteger notGot;//記錄fetchConnetion 未成功的次數

        public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        public void run() {
            try {
                start.await();//等待 所有ConnectionRunner 初始化成功且處於Runnable狀態,同時開始執行,由主執行緒控制的
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            while (count > 0) {
                try {
                    //從連線池中獲取連線,如果1000ms內無法獲取到,將會返回null。
                    Connection connection = connectionPool.fetchConnection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
                            connection.commit();
                        } finally {
                            //歸還連線
                            connectionPool.releaseConnection(connection);
                            got.incrementAndGet();//對獲取次數狀態進行更改
                        }
                    } else {
                        notGot.incrementAndGet();//對未獲取次數狀態進行更改
                    }
                } catch (Exception e) {

                } finally {
                    count--;//執行次數遞減
                }
            }

            end.countDown();
        }
    }
}

執行結果

1.設定RunnerConnection threadCount數為10

2836699-fdc708f68490eda0.png threadCount = 10

2.設定RunnerConnection threadCount數為20

2836699-bfe297d3518b9ef0.png threadCount = 20

3.設定RunnerConnection threadCount數為50

2836699-b8e2f3bcf06d3ff7.png threadCount = 50

4.設定RunnerConnection threadCount數為100

2836699-aa4d162c2898de1a.png threadCount = 50

可以看到隨著 runnerConnection 連線執行緒數的遞增,連線的穩定性是越來越低的。但使用者呼叫不會長時間阻塞到connect fetch 上,而是按時返回。