1. 程式人生 > >WCF實現長連接

WCF實現長連接

resolv model new void 通過 找到 container 主動 ||

由於WCF的機制,連接池會在連接建立一定時間後超時,即使設置了超時時間非常長,也可能被服務端系統主動回收。之前做項目時碰到了這個問題,所以項目上考慮采用長連接,自動管理連接池,當連接超時後,自動重建,保持會話,這樣在業務層就不需要再去處理連接超時的問題。具體的思路是,在程序啟動時,先將需要使用長連接的連接放到長連接容器中,並設置連接的最大數量,在使用時,輪詢使用連接,當使用時捕獲到異常時,自動切換到下一個連接,並重建上一個連接。代碼如下:

AliveConnection類功能是保持連接,具體執行WCF調用,在檢測到連接不可用時進行重建。

class AliveConnection<T> where
T : System.ServiceModel.ICommunicationObject, new() { private string _endpointConfigName = string.Empty; private T _instance = default(T); /// <summary> /// 正在執行其他過程時,設置為正忙。執行完成後閑置 /// 連接出錯後,正在重新連接創建時設置為正忙,解除正忙狀態有倆種情況: /// 1.第一次重建連接成功後; /// 2.在線程中重試成功後;
/// </summary> public bool IsBusy { get; set; } internal AliveConnection(string endpointConfigName) { if (string.IsNullOrEmpty(endpointConfigName.Trim())) throw new ArgumentException("終結點不能配置為空。"); _endpointConfigName = endpointConfigName;
//_instance = CreateConnection(); } internal bool Execute(Action<T> expression) { try { Open(); expression(_instance); return true; } catch (System.ServiceModel.CommunicationException e) { return false; } } internal bool Execute<TResult>(Func<T, TResult> expression,out TResult result) { result = default(TResult); try { Open(); result = expression(_instance); return true; } catch (System.ServiceModel.CommunicationException e) { return false; } } private void Open() { if (_instance == null)//使用時才創建 { _instance = CreateConnection(); _instance.Faulted += Faulted; } if (_instance.State != System.ServiceModel.CommunicationState.Opened) _instance.Open(); } private void Faulted(object sender, EventArgs e) { lock (_instance) { IsBusy = true; //失敗後鎖住並重新建立連接 _instance = CreateConnection(); } } private T CreateConnection() { try { var instance = (T)Activator.CreateInstance(typeof(T), _endpointConfigName); IsBusy = false; return instance; } catch (Exception e) { IsBusy = true; RetryWhenFailed(); throw new Exception("創建連接失敗,請檢測終結點配置和服務。", e); } } //創建一個線程來不間斷重試創建連接 private void RetryWhenFailed() { int retryTimes = 0; Task.Factory.StartNew(() => { while (true) { //如果拋出異常,表示創建失敗,繼續重試,如果睡眠時間大於60秒,則睡眠時間不再增加 try { retryTimes++; _instance = (T)Activator.CreateInstance(typeof(T), _endpointConfigName); IsBusy = false; break; } catch { int sleepMillionSeconds = retryTimes * 5 * 1000; sleepMillionSeconds = sleepMillionSeconds > 60 * 1000 ? 60 * 1000 : sleepMillionSeconds; System.Threading.Thread.Sleep(sleepMillionSeconds); continue; } } }); } }

另外我們需要一個類,來做連接的初始化,並做輪詢,並且暴露執行的函數。

 /// <summary>
    /// WCF長連接容器
    /// </summary>
    /// <typeparam name="T">待創建的WCF服務類型</typeparam>
    public class AliveConnectionContainer<T>
        where T : System.ServiceModel.ICommunicationObject, new()
    {
        #region fields
        private List<AliveConnection<T>> _connections = null;//所有連接
        private int _currentConnectionIndex = 0;//當前使用的連接的索引
        #endregion

        #region Octor
        /// <summary>
        /// 通過終結點配置名稱,創建長連接。如果終結點數不等於連接數,則輪詢跟節點配置列表,最終創建達到連接數的連接。
        /// </summary>
        /// <param name="maxConnection">需要創建的長連接數</param>
        /// <param name="endpointConfigNames">所有的終結點配置名稱,對應配置節點裏bind的name</param>
        /// <exception>如果終結點配置為空,則拋出異常,如果創建失敗,也會拋出異常</exception>
        public AliveConnectionContainer(int maxConnection, params string[] endpointConfigNames)
        {
            _connections = new List<AliveConnection<T>>(maxConnection);

            int tmpIndex = 0;
            for (int index = 0; index < maxConnection; index++)
            {
                if (tmpIndex >= endpointConfigNames.Count()) tmpIndex = 0;
                _connections.Add(new AliveConnection<T>(endpointConfigNames[tmpIndex]));
            }
        }
        #endregion

        #region public method
        /// <summary>
        /// 執行服務調用,會一直輪詢執行直到執行成功
        /// </summary>
        /// <param name="expression">需要調用的處理方法</param>
        public void Execute(Action<T> expression)
        {
            Func<bool> executeExpression = () => Instance.Execute(expression);
            Execute(executeExpression);
        }    
        /// <summary>
        /// 執行服務調用,會一直輪詢執行直到執行成功
        /// </summary>
        /// <param name="expression">需要調用的處理方法</param>
        public TResult Execute<TResult>(Func<T, TResult> expression)
        {
            TResult result = default(TResult);
            Func<bool> executeExpression = () => Instance.Execute(expression,out result);
            Execute(executeExpression);
            return result;
        }

        private void Execute(Func<bool> expression)
        {
            bool success = false;
            int failedCount = 0;
            try
            {
                while (true)
                {
                    success = expression();
                    if (!success) failedCount++;
                    else break;

                    if (failedCount >= _connections.Count) throw new Exception("沒有可用的服務,請檢測服務運行狀態。");
                }
            }
            catch (Exception e)
            {
                throw new Exception("執行WCF服務調用失敗。", e);
            }
        }

        #endregion

        #region private method
        private AliveConnection<T> Instance
        {
            get
            {
                if (_connections == null || _connections.Count == 0) throw new Exception("沒有可用的連接,請先設置連接。");
                AliveConnection<T> result;
                while (!(result = GetInstance()).IsBusy) break;//輪詢直到找到空閑的連接
                return result;
            }
        }

        private AliveConnection<T> GetInstance()
        {
            if (_currentConnectionIndex >= _connections.Count) _currentConnectionIndex = 0;
            return _connections[_currentConnectionIndex++];
        }
        #endregion
    }

使用靜態類,做全局的WCF連接註冊和檢索使用

/// <summary>
    /// 長連接服務的管理類
    /// </summary>
    /// <example>
    /// AliveConnectionManager.Register<Service>(endpoints,10);
    /// var client = AliveConnectionManager.Resolve<Service>();
    /// List<Movie> movies;
    /// client.Execute(service => movies = service.GetMovies());
    /// </example>
    public static class AliveConnectionManager
    {
        private static Dictionary<Type, object> _container = new Dictionary<Type, object>();
        /// <summary>
        /// 根據輸入的終結點列表,在app.config文件中查找對應的終結點,並建立連接
        /// </summary>
        /// <typeparam name="T">要註冊的WCF的服務類型</typeparam>
        /// <param name="maxConnection">連接數</param>
        /// <param name="endpointConfigNames">配置的終結點列表</param>
        public static void Register<T>(int maxConnection, params string[] endpointConfigNames)
            where T : System.ServiceModel.ICommunicationObject, new()
        {
            if (_container.ContainsKey(typeof(T))) throw new ArgumentException(string.Format("容器中已經添加過{0}的長連接服務。", typeof(T)));
            _container.Add(typeof(T), new AliveConnectionContainer<T>(maxConnection, endpointConfigNames));
        }

        /// <summary>
        /// 獲取類型為T的長連接服務
        /// </summary>
        /// <typeparam name="T">待獲取的WCF的服務類型</typeparam>
        /// <returns>對應類型為T的服務</returns>
        public static AliveConnectionContainer<T> Resolve<T>() where T : System.ServiceModel.ICommunicationObject, new()
        {
            Type type = typeof(T);
            if (!_container.ContainsKey(type)) throw new ArgumentException(string.Format("沒有找到類型為{0}的長連接服務。", type));
            return _container[type] as AliveConnectionContainer<T>;
        }
    }

服務註冊代碼

 AliveConnectionManager.Register<Service>(endpoints,10);

調用服務

var client = AliveConnectionManager.Resolve<Service>();
 List<Movie> movies;
client.Execute(service => movies = service.GetMovies());

Service即我們在客戶端引入的WCF服務

WCF實現長連接