Curator 是基于 ZooKeeper Client 封装的一套高级 API,其主要目的是隐藏 ZooKeeper 复杂的连接管理细节。它实现了一些常用的工具,如锁、缓存等;对于 create、delete、getData 等操作,Curator 允许我们以链式调用的方式书写,这种风格被称为流畅式(fluent)API;同时它也提供了命名空间、自动重连等功能。本文将使用 Curator 来实现之前的 Master。
/**
 * Creates the Curator client used by the examples that follow.
 * connectString: list of ZooKeeper servers
 * retryPolicy: strategy applied when a connection attempt fails
 */
CuratorFramework zkc = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
// Plain ZooKeeper handle: data, ACL and create mode are passed as positional arguments.
zk.create("/mypath", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
// The corresponding persistent-node creation written as a Curator fluent builder chain.
zkc.create().withMode(CreateMode.PERSISTENT).forPath("/mypath", new byte[0]);
// Same chain with inBackground() added, so the create is issued asynchronously.
zkc.create().inBackground().withMode(CreateMode.PERSISTENT).forPath("/mypath", new byte[0]);
// Background read of the node's data that also sets a watch on the path.
zkc.getData().inBackground().watched().forPath("/mypath");
CuratorListener masterListener = new CuratorListener() {
public void eventReceived(CuratorFramework client, CuratorEvent event) {
try {
switch (event.getType()) {
case CHILDREN:
//...
break;
case CREATE:
//...
break;
case DELETE:
//...
break;
case WATCHED:
//...
break;
}
} catch(Exception e){
//...
}
}
}
// Create the client, then register masterListener on it.
client = CuratorFrameworkFactory.newClient(hostPort, retryPolicy);
client.getCuratorListenable().addListener(masterListener);
/**
 * Listener for errors delivered through the client's unhandled-error listenable.
 * The error is logged with its cause; the follow-up action is elided in this snippet.
 */
UnhandledErrorListener errorsListener = new UnhandledErrorListener() {
    public void unhandledError(String message, Throwable e) {
        LOG.error("Unrecoverable error: " + message, e);
        try {
            // ... (elided; the warning below suggests a close call belongs here — confirm)
        } catch (IOException closeFailure) {
            LOG.warn( "Exception when closing.", closeFailure );
        }
    }
};

// Register the listener on the client.
client.getUnhandledErrorListenable().addListener(errorsListener);
有序节点:如果在 Client 获取到 Server 返回的有序节点名称之前,Server 崩溃或者 Client 的连接断开了,那么 Client 就无法得知所创建节点的名称,也无法确定节点是否已经创建;此时若直接重新发起调用,可能会重复创建有序节点。为了解决这个问题,CreateBuilder 提供了 withProtection() 方法,让 Curator 客户端用一个唯一标识符作为有序节点名的前缀;当创建请求失败需要重试时,客户端就能根据该标识符判断有序节点是否已经创建成功。
删除保证:同样,当 Client 执行 delete 操作后连接断开,它将无法得知该节点是否已被成功删除;当该节点的存在与否关系到某些资源的申请和释放时,这一点就比较关键了。为此,Curator 客户端在 DeleteBuilder 接口中提供了 guaranteed() 方法,来保证节点最终被成功删除(只要 Curator 客户端实例是有效的)。
// Leader latch on the "/master" path; myId is passed as this participant's id.
leaderLatch = new LeaderLatch(client, "/master", myId);
/**
 * Invoked when this client becomes the leader.
 *
 * Starts the workers cache, then runs the recovery of task assignments. When
 * the recovery callback fires, recovered tasks (if any) are assigned and a
 * separate thread waits on the recovery latch before starting the tasks cache.
 * Every failure is logged here; nothing is propagated to the caller.
 */
@Override
public void isLeader() {
    try {
        // Start the workers cache and listen to its changes.
        workersCache.getListenable().addListener(workersCacheListener);
        workersCache.start();

        // Recover the tasks that had not been assigned yet.
        (new RecoveredAssignments(client.getZookeeperClient().getZooKeeper())).recover(new RecoveryCallback() {
            @Override
            public void recoveryComplete(int rc, List<String> tasks) {
                try {
                    if (rc == RecoveryCallback.FAILED) {
                        // NOTE(review): recoveryLatch is not reassigned on this path, so the
                        // thread below waits on whatever latch the field already holds —
                        // confirm it is initialized elsewhere.
                        LOG.warn("Recovery of assigned tasks failed.");
                    } else {
                        LOG.info( "Assigning recovered tasks" );
                        // One latch count per recovered task.
                        recoveryLatch = new CountDownLatch(tasks.size());
                        assignTasks(tasks);
                    }

                    // Wait off the callback thread so the callback itself does not block.
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // Wait until all recovered tasks are done.
                                recoveryLatch.await();
                                // Only then start the tasks cache and listen to it.
                                tasksCache.getListenable().addListener(tasksCacheListener);
                                tasksCache.start();
                            } catch (InterruptedException e) {
                                // Restore the interrupt status instead of swallowing it.
                                Thread.currentThread().interrupt();
                                LOG.warn("Exception while assigning and getting tasks.", e );
                            } catch (Exception e) {
                                LOG.warn("Exception while assigning and getting tasks.", e );
                            }
                        }
                    }).start();
                } catch (Exception e) {
                    LOG.error("Exception while executing the recovery callback", e);
                }
            }
        });
    } catch (Exception e) {
        LOG.error("Exception when starting leadership", e);
    }
}
/**
 * Invoked when this client no longer holds leadership.
 * Logs the loss and closes this instance; an I/O failure while closing is
 * logged as a warning and not propagated.
 */
@Override
public void notLeader() {
    LOG.info( "Lost leadership" );

    try {
        close();
    } catch (IOException closeFailure) {
        LOG.warn("Exception while closing", closeFailure);
    }
}
/**
 * Registers the client listeners and enters the master election through the
 * leader latch.
 *
 * @throws Exception if any of the underlying calls fails
 */
public void runForMaster() throws Exception {
    // Register the event listener and the unhandled-error listener on the client.
    client.getCuratorListenable().addListener(masterListener);
    client.getUnhandledErrorListenable().addListener(errorsListener);

    LOG.info( "Starting master selection: " + myId);

    // Start the election; this object is registered as the latch's listener.
    leaderLatch.addListener(this);
    leaderLatch.start();
}
// Leader selector on the "/master" path; this object is passed as the selector's listener.
leaderSelector = new LeaderSelector(client, "/master", this);
/**
 * Invoked once leadership has been acquired.
 *
 * Starts the workers cache, then runs the recovery of task assignments. When
 * the recovery callback fires, recovered tasks (if any) are assigned, a
 * separate thread waits on the recovery latch before starting the tasks cache,
 * and leaderLatch is counted down. The method then blocks on closeLatch so that
 * leadership is not released before the master is closed.
 *
 * @param client the Curator client, used here to obtain the ZooKeeper handle for recovery
 * @throws Exception if starting the workers cache or launching the recovery fails,
 *         or if the wait on closeLatch is interrupted
 */
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
    // Start the workers cache and listen to its changes.
    workersCache.getListenable().addListener(workersCacheListener);
    workersCache.start();

    // Recover the tasks the previous master had not assigned.
    (new RecoveredAssignments(client.getZookeeperClient().getZooKeeper())).recover(new RecoveryCallback() {
        @Override
        public void recoveryComplete(int rc, List<String> tasks) {
            try {
                if (rc == RecoveryCallback.FAILED) {
                    LOG.warn("Recovery of assigned tasks failed.");
                } else {
                    LOG.info( "Assigning recovered tasks" );
                    // One latch count per recovered task.
                    recoveryLatch = new CountDownLatch(tasks.size());
                    assignTasks(tasks);
                }

                // Wait off the callback thread so the callback itself does not block.
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Wait until all recovered tasks are done.
                            recoveryLatch.await();
                            // Only then start the tasks cache and listen to it.
                            tasksCache.getListenable().addListener(tasksCacheListener);
                            tasksCache.start();
                        } catch (InterruptedException e) {
                            // Restore the interrupt status instead of swallowing it.
                            Thread.currentThread().interrupt();
                            LOG.warn("Exception while assigning and getting tasks.", e );
                        } catch (Exception e) {
                            LOG.warn("Exception while assigning and getting tasks.", e );
                        }
                    }
                }).start();

                // Signal that this client has obtained leadership.
                leaderLatch.countDown();
            } catch (Exception e) {
                LOG.error("Exception while executing the recovery callback", e);
            }
        }
    });

    // Block here so leadership is not released before the master is closed.
    closeLatch.await();
}
/**
 * Invoked when the state of the connection changes.
 * Only LOST triggers an action in this snippet: this instance is closed, and an
 * I/O failure while closing is logged as a warning.
 */
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
    switch (newState) {
        case CONNECTED:
            // Nothing to do in this case.
            break;
        case RECONNECTED:
            // Become leader again (placeholder; nothing is implemented here).
            break;
        case SUSPENDED:
            break;
        case LOST:
            try {
                // The connection is gone, so shut this instance down.
                close();
            } catch (IOException closeFailure) {
                LOG.warn( "Exception while closing", closeFailure );
            }
            break;
        case READ_ONLY:
            // ...
            break;
    }
}
/**
 * Registers the client listeners and enters the master election through the
 * leader selector.
 */
public void runForMaster() {
    // Register the event listener and the unhandled-error listener on the client.
    client.getCuratorListenable().addListener(masterListener);
    client.getUnhandledErrorListenable().addListener(errorsListener);

    // Set this participant's id on the selector, then start it.
    leaderSelector.setId(myId);
    leaderSelector.start();
}
/**
 * Listener for child events of a path-children cache.
 * Only CHILD_REMOVED is handled: the worker name is taken from the removed
 * child's path and handed to getAbsentWorkerTasks so that the tasks assigned
 * to that worker can be reassigned. Failures are logged.
 */
PathChildrenCacheListener workersCacheListener = new PathChildrenCacheListener() {
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
        if (event.getType() != Type.CHILD_REMOVED) {
            return;
        }

        try {
            String removedPath = event.getData().getPath();
            // Strip the parent prefix to obtain the bare worker name.
            String workerName = removedPath.replaceFirst("/workers/", "");
            getAbsentWorkerTasks(workerName);
        } catch (Exception e) {
            LOG.error("Exception while trying to re-assign tasks", e);
        }
    }
};
/**
 * Listener for child events of a path-children cache.
 * Only CHILD_ADDED is handled: the task name is taken from the added child's
 * path and passed to assignTask together with the child's data. Failures are
 * logged.
 */
PathChildrenCacheListener tasksCacheListener = new PathChildrenCacheListener() {
    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
        if (event.getType() != Type.CHILD_ADDED) {
            return;
        }

        try {
            // Strip the parent prefix to obtain the bare task name.
            String taskName = event.getData().getPath().replaceFirst("/tasks/", "");
            assignTask(taskName, event.getData().getData());
        } catch (Exception e) {
            LOG.error("Exception when assigning task.", e);
        }
    }
};