curator原始碼(一) 初始化、啟動和關閉。
Curator框架是zookeeper客戶端框架,官網有句話說的很簡潔:curator對於zookeeper就像Guava對於java。
重複策略,例項化,眾多實用的食譜選單(分散式鎖,計數器,佇列,柵欄,訊號量,路徑快取)。
初始化
1.直接呼叫CuratorFrameworkFactory的newClient方法
/**
* 建立客戶端
* @param connectString zk地址
* @param sessionTimeoutMs session超時
* @param connectionTimeoutMs 路徑超時
* @param retryPolicy 重複策略
* @return client
*/
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs,
int connectionTimeoutMs, RetryPolicy retryPolicy)
{
return builder().
connectString(connectString).
sessionTimeoutMs(sessionTimeoutMs).
connectionTimeoutMs(connectionTimeoutMs).
retryPolicy(retryPolicy).
build();
}
/**
* 返回一個用來建立CuratorFramework新的builder
* @return new builder
*/
public static Builder builder()
{
return new Builder();
}
返回的Builder是CuratorFrameworkFactory的內部類,主要用於流式的建立CuratorFramework,裡面包含所需引數
private EnsembleProvider ensembleProvider;
private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;
private int maxCloseWaitMs = DEFAULT_CLOSE_WAIT_MS;
private RetryPolicy retryPolicy;
private ThreadFactory threadFactory = null;
private String namespace;
private List<AuthInfo> authInfos = null;
private byte[] defaultData = LOCAL_ADDRESS;
private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER;
private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
private boolean canBeReadOnly = false;
private boolean useContainerParentsIfAvailable = true;
ensembleProvider(配置提供者) 可以通過在Builder裡的以下2個方法構造。設定伺服器的連結地址,
格式: host:port,host2:port2…..
主要由此提供連結地址,供後期zookeeper裡使用。
public Builder connectString(String connectString)
{
ensembleProvider = new FixedEnsembleProvider(connectString);
return this;
}
public Builder ensembleProvider(EnsembleProvider ensembleProvider)
{
this.ensembleProvider = ensembleProvider;
return this;
}
2.通過Builder的build函式建立客戶端。
/**
* 根據builder裡的值建立新的CuratorFramework
* @return new CuratorFramework
*/
public CuratorFramework build()
{
return new CuratorFrameworkImpl(this);
}
3.CuratorFrameworkImpl定義
CuratorFrameworkImpl為CuratorFramework介面的一個實現,平時主要用到的就是此client。
該建構函式主要還是使用Builder裡的預設配置的一些引數,這些引數可以通過CuratorFrameworkFactory李的Builder去流式建立。
設定如下引數,如ZookeeperFactory 工廠,CuratorZookeeperClient【重點,客戶端的工作主要靠它】,listeners 監聽,backgroundOperations 後臺執行行為,namespace 名稱空間(用於放置在路徑的字首),
threadFactory CuratorFrameworkImpl的執行緒工廠,connectionStateManager 連結狀態管理器, compressionProvider 壓縮器等等。
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()),
null, null, null, null, null, watchedEvent, null);
processEvent(event);
}
}, builder.getRetryPolicy(), builder.canBeReadOnly());
listeners = new ListenerContainer<CuratorListener>();
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
backgroundOperations = new DelayQueue<OperationAndData<?>>();
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
maxCloseWaitMs = builder.getMaxCloseWaitMs();
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
byte[] builderDefaultData = builder.getDefaultData();
defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
authInfos = buildAuths(builder);
failedDeleteManager = new FailedDeleteManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
}
3.1 ZookeeperFactory的構建
//1.從builder裡拿出ZookeeperFactory
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
//2.builder裡的預設ZookeeperFactory
private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
private static final DefaultZookeeperFactory DEFAULT_ZOOKEEPER_FACTORY = new DefaultZookeeperFactory();
//3.ZookeeperFactory的預設實現類,預設zookeeper工廠
//實際上就是new一個org.apache.zookeeper.ZooKeeper,原生態的Zookeeper。
public class DefaultZookeeperFactory implements ZookeeperFactory
{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
{
return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
}
}
//4.包裝一層,加上鑑權資訊
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
{
return new ZookeeperFactory()
{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout,
Watcher watcher, boolean canBeReadOnly) throws Exception
{
ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
for ( AuthInfo auth : authInfos )
{
zooKeeper.addAuthInfo(auth.getScheme(), auth.getAuth());
}
return zooKeeper;
}
};
}
3.2 ConnectionStateManager的構建
設定其service為單例ExecutorService。主要用在監控連結狀態。
public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory)
{
this.client = client;
if ( threadFactory == null )
{
threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
}
service = Executors.newSingleThreadExecutor(threadFactory);
}
4.CuratorZookeeperClient定義
在CuratorFrameworkImpl初始化的時候構建
1.初始化,需要新建Watcher
this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null);
processEvent(event);//呼叫本類的方法
}
}, builder.getRetryPolicy(), builder.canBeReadOnly());
2.listeners採用函數語言程式設計,此處的監聽執行就是在ConnectionState裡的process呼叫的。
private void processEvent(final CuratorEvent curatorEvent)
{
if ( curatorEvent.getType() == CuratorEventType.WATCHED )
{
validateConnection(curatorEvent.getWatchedEvent().getState());
}
listeners.forEach(new Function<CuratorListener, Void>()
{
@Override
public Void apply(CuratorListener listener)
{
try
{
TimeTrace trace = client.startTracer("EventListener");
listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
trace.commit();
}
catch ( Exception e )
{
logError("Event listener threw exception", e);
}
return null;
}
});
}
3.主要有2個欄位ConnectionState連線狀態,retryPolicy 重複策略就在如下定義。
private final ConnectionState state;
private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
初始化主要做了如下工作,校驗重複策略不能為空,校驗配置連結地址提供者不能為空,初始化ConnectionState,設定重複策略。
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly)
{
if ( sessionTimeoutMs < connectionTimeoutMs )
{
log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
}
retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
this.connectionTimeoutMs = connectionTimeoutMs;
state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly);
setRetryPolicy(retryPolicy);
}
5.ConnectionState定義
ConnectionState實現了Watcher和Closeable介面。
設定了連結地址的配置策略,session過期時間,連結超時時間,設定日誌追蹤驅動器(使用的預設的DefaultTracerDriver),
將傳遞進來的watcher放入parentWatchers中,最後定義HandleHolder。
private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs,
int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly)
{
this.ensembleProvider = ensembleProvider;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.tracer = tracer;
if ( parentWatcher != null )
{
parentWatchers.offer(parentWatcher);
}
//這個地方的this就是ConnectionState實現Watcher介面的原因。
zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
//主要用來執行CuratorFrameworkImpl裡的監聽。
//第一、根據當前ConnectionState的isConnected與事件的狀態,
//去檢查當前的事件的KeeperState,在checkState過程中,
//若為SyncConnected和ConnectedReadOnly表示為連結狀態,其他則為斷鏈狀態;
//同時若過期Expired,會重新連結zookeeper,
//不過期的會去判斷當前連結地址是否發生變化,若發生也會重新連結zookeeper。
//同時若連結狀態與之前的不同再修改狀態。(詳細的在以後講去了)
//第二、將parentWatchers裡的所有watcher全部呼叫一次。
@Override
public void process(WatchedEvent event)
{
if ( LOG_EVENTS )
{
log.debug("ConnectState watcher: " + event);
}
if ( event.getType() == Watcher.Event.EventType.None )
{
boolean wasConnected = isConnected.get();
boolean newIsConnected = checkState(event.getState(), wasConnected);
if ( newIsConnected != wasConnected )
{
isConnected.set(newIsConnected);
connectionStartMs = System.currentTimeMillis();
}
}
for ( Watcher parentWatcher : parentWatchers )
{
TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
parentWatcher.process(event);
timeTrace.commit();
}
}
private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
boolean isConnected = wasConnected;
boolean checkNewConnectionString = true;
switch ( state )
{
default:
case Disconnected:
{
isConnected = false;
break;
}
case SyncConnected:
case ConnectedReadOnly:
{
isConnected = true;
break;
}
case AuthFailed:
{
isConnected = false;
log.error("Authentication failed");
break;
}
//若過期Expired,會重新連結zookeeper
case Expired:
{
isConnected = false;
checkNewConnectionString = false;
handleExpiredSession();
break;
}
case SaslAuthenticated:
{
// NOP
break;
}
}
if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() )
{
handleNewConnectionString();
}
return isConnected;
}
6.HandleHolder定義
簡單的設定5個欄位
HandleHolder(ZookeeperFactory zookeeperFactory,
Watcher watcher, EnsembleProvider ensembleProvider,
int sessionTimeout, boolean canBeReadOnly)
{
this.zookeeperFactory = zookeeperFactory;
this.watcher = watcher;
this.ensembleProvider = ensembleProvider;
this.sessionTimeout = sessionTimeout;
this.canBeReadOnly = canBeReadOnly;
}
總結
- 主要使用到CuratorFrameworkImpl,CuratorZookeeperClient,ConnectionState,HandleHolder4個類,一步一步往後定義;
- 同時使用ConnectionStateManager進行連結狀態監控;
啟動
CuratorFrameworkImpl啟動
先校驗狀態是否為LATENT並設定為STARTED。
啟動connectionStateManager。
連結狀態管理器中增加一個監聽器,用於在連結狀態時將client的logAsErrorConnectionErrors設為true。
再啟動CuratorZookeeperClient。
executorService定義成2個執行緒的執行器,1個為監聽,1個為後臺。並執行後臺的迴圈操作。
public void start()
{
log.info("Starting");
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
{
throw new IllegalStateException("Cannot be started more than once");
}
try
{
connectionStateManager.start(); // ordering dependency - must be called before client.start()
final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
{
logAsErrorConnectionErrors.set(true);
}
}
};
this.getConnectionStateListenable().addListener(listener);
client.start();
executorService = Executors.newFixedThreadPool(2, threadFactory); // 1 for listeners, 1 for background ops
executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
backgroundOperationsLoop();
return null;
}
});
}
catch ( Exception e )
{
handleBackgroundOperationException(null, e);
}
}
ConnectionStateManager啟動
啟動一個ConnectionStateManager的執行緒執行者,通過listeners的函數語言程式設計去執行監聽狀態變化。
/**
* Start the manager
*/
public void start()
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
processEvents();
return null;
}
}
);
}
private void processEvents()
{
try
{
while ( !Thread.currentThread().isInterrupted() )
{ //take方法可以阻塞
final ConnectionState newState = eventQueue.take();
if ( listeners.size() == 0 )
{
log.warn("There are no ConnectionStateListeners registered.");
}
listeners.forEach
(
new Function<ConnectionStateListener, Void>()
{
@Override
public Void apply(ConnectionStateListener listener)
{
listener.stateChanged(client, newState);
return null;
}
}
);
}
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
}
}
CuratorZookeeperClient啟動
需要校驗CuratorZookeeperClient是否已啟動,若啟動拋異常。
再去啟動ConnectionState。
public void start() throws Exception
{
log.debug("Starting");
if ( !started.compareAndSet(false, true) )
{
IllegalStateException ise = new IllegalStateException("Already started");
throw ise;
}
state.start();
}
ConnectionState啟動
先啟動連結地址配置器。
再重設。
在重設中,原子性的增加例項次數1次;
同時將連結狀態設為false,
再去啟動HandleHolder,
同時最重要一步zooKeeper.getZooKeeper()初始化原生態的連結;
void start() throws Exception
{
log.debug("Starting");
ensembleProvider.start();
reset();
}
private synchronized void reset() throws Exception
{
log.debug("reset");
instanceIndex.incrementAndGet();
isConnected.set(false);
connectionStartMs = System.currentTimeMillis();
zooKeeper.closeAndReset();
zooKeeper.getZooKeeper(); // initiate connection
}
HandleHolder啟動
可以看出借用內部介面Helper來完成操作的。
Helper介面有2方法,一個獲取原生的zookeeper,一個是獲取連結地址路徑。
private interface Helper
{
ZooKeeper getZooKeeper() throws Exception;
String getConnectionString();
}
重點還是在closeAndReset方法上。
主要還是在helper上的初始化,
當第一次呼叫getZooKeeper 時用synchronized 包裹,並會對連結地址和zookeeper定義,同時再次定義helper物件,並將此次定義好的兩個值作為返回值去實現此前的2個方法。此刻以後每次呼叫getZooKeeper 時均從子helper裡的方法。避免同時有客戶端new zookeeper。
void closeAndReset() throws Exception
{
internalClose();
// first helper is synchronized when getZooKeeper is called. Subsequent calls
// are not synchronized.
helper = new Helper()
{
private volatile ZooKeeper zooKeeperHandle = null;
private volatile String connectionString = null;
@Override
public ZooKeeper getZooKeeper() throws Exception
{
synchronized(this)
{
if ( zooKeeperHandle == null )
{
connectionString = ensembleProvider.getConnectionString();
zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
}
helper = new Helper()
{
@Override
public ZooKeeper getZooKeeper() throws Exception
{
return zooKeeperHandle;
}
@Override
public String getConnectionString()
{
return connectionString;
}
};
return zooKeeperHandle;
}
}
@Override
public String getConnectionString()
{
return connectionString;
}
};
}
ZooKeeper getZooKeeper() throws Exception
{
return (helper != null) ? helper.getZooKeeper() : null;
}