本文将讲述如何使用Zookeeper Client API进行编程,
以在分布式应用中实现协调功能,
Zookeeper支持Java和C的API,本文主要使用Java语言。
-
使用Zookeeper API
-
首先添加Maven依赖
-
创建Zookeeper会话
-
Zookeeper API会围绕一个Zookeeper 句柄,通过该句柄来调用各种API,该句柄也就是
Zookeeper会话。如果会话连接断开了,该会话会被迁移到另一台Server上(如图),只要会话仍然存活着,
该句柄将一直有效,Client就会持续保持和Server的连接,若句柄被关掉,Client将告诉Server关掉会话。
-
Zookeeper提供了一个创建句柄的构造器
-
实现Watcher
-
为了收到Zookeeper的通知,我们需要实现Watcher接口
-
比如,我们可以实现一个Master
-
运行上面的程序则会看到如下日志
-
然而,当我们这时将server停掉,Client会尝试重新连接,并且Client也没有收到任何事件
-
再次启动server,Client再次接收到SyncConnected事件
-
既然Zookeeper Client能够自己尝试尽可能连接上Server,所以对于我们,尽量不要亲自去管理连接。
-
我们可以通过telnet观察下Zookeeper状态,可以看到两个Client,一个是之前Master程序,一个就是当前telnet会话。
-
同样,我们还可以通过dump查看一些会话信息,可以看到有一个会话(Master)
-
如果现在我们关掉Master,其实会话并不会马上销毁,而会等到超时。最终才没有了会话
-
当然我们也可以手动关闭连接,而不是等到超时
-
获取领导权(Mastering)
-
现在我们希望Master能够获取到领导权,
之前的文章讲述过,我们需要创建一个临时节点如/master,若该节点已经存在,
说明Master已经存在,否则自己将成为Master。
-
这里,我们创建节点时,指明了节点的权限控制
OPEN_ACL_UNSAFE,这并不安全,但对于例子足以,
在创建节点时,我们捕获了NodeExistsException异常,
表明Master已经存在,而ConnectionLossException异常,
该异常通常由于网络错误,Server崩溃导致,这时Client并不知道请求是否被Server处理,我们不做处理,
而是让Client API尝试重新连接。InterruptedException
异常是由客户端线程导致,可交由外部去处理。接着我们通过getData()
来获取节点数据
-
我们启动两个进程
-
将看到输出
-
异步获取领导权(Mastering)
-
Zookeeper的API都会有一个异步的版本,如创建节点
-
该方法会立即返回,只是将Server请求放入队列中,请求过程放在另一个线程中,
当收到Server响应时,这些响应将在一个专门的回调线程中处理(
因此,你不应该在回调中做太重的操作,或者执行阻塞操作,这会严重影响后续的回调执行),
此方法不会抛出异常,任何请求错误将编码为回调的第一个参数,回调形式为
-
将上面的实现改为异步回调后
-
构建元数据
-
Master-Worker架构中,我需要初始化一些元数据,如
/workers,
/tasks,
/assign
-
注册工作者(Worker)
-
接下来我们会在/workers建立起可用的Worker节点,Worker实现
-
为了保证状态更新的最终一致性,上面Worker实现中需要着重注意的是updateStatus()方法,需要使用
synchronized
和status == this.status判断,否则会出现本地和服务器状态不一致的情况,
比如下面的场景
1. Worker开始执行任务task-1,因此调用
setStatus("working")(即this.status = "working")。
2. Client API发起setData()调用,但是此时发生网络错误。
3. 于是Client API认为与Server失去连接,但在调用statusUpdateCallback回调之前,
Worker完成了任务,变得空闲。
4. 因此调用setStatus("idle")(即this.status = "idle")。
5. 然而,现在Client开始处理第二步中产生的网络错误回调statusUpdateCallback,
若不判断status == this.status,Worker的状态将被设置为
"working",这就会造成Client和Server状态不一致。
-
似乎觉得将synchronized放到setStatus()前更合理一些,
会不会出现setStatus("working")后,执行第一条this.status = status之前,
CPU时间片切换,Worker就完成了任务,并setStatus("idle"),即
setData("idle")调用在setData("working")之前。
-
任务排队
-
最后的组件则是Client,
我们将把任务创建在/tasks节点下,
并且为每个任务分配唯一的序号。
-
创建任务节点时,如果创建失败,我们就尝试重新创建,但当有多个应用都在创建该任务时,
有可能同一个任务会被创建多次,若要控制某个任务最多创建一次,可以为任务分配唯一的ID。
-
管理员Client
-
我们还需要一个管理员Client来查看系统的状态,通常使用getData()和getChildren()方法。
-
处理状态变化
-
应用需要关注Zookeeper的状态变化,这并不少见,比如
Master-Worker架构中,
备份master需要知道主master是否崩溃,
Workers需要知道是否有新的任务分配给自己,在Zookeeper中,
可以通过Watcher机制来实现状态变化监听。
-
一次性触发
-
Watches是连接
Znode和事件类型
(Znode数据变化,或节点被删除)的一种一次性触发器。
Watches是同Client的连接会话想关联的,一旦会话过期了,
该会话创建的Watches将被移除。
Watches可以在多个Server之前跨连接,
比如,当Client与集群中的一个Server断开,然后重连到另一个Server,Client将发送未处理的Watches
到该Server,当注册一个Watch时,该Server会检查该Watch
监听的节点是否发生变化,如发生变化,将通知该Client,否则Client就将该Watch注册到新的Server上。
-
一次性触发器会丢失事件吗?
-
答案是肯定的。然而,这种问题值得讨论。
但对于在收到通知
和注册新的Watch
之间丢失事件,我们都可以通过主动读取Zookeeper状态来解决。
比如,Worker收到有新的任务分配通知后,
可以通过getChildren()获取所有分配的任务(即使在收到通知后,又有新任务分配),
再对任务分配节点进行Watch。
-
如何设置Watches
-
Zookeeper API所有读操作--
getData(),
getChildren(),
exists()
都提供Watch的设置,
为了实现watch机制,
我们需要实现Watcher接口,
实现process()方法
-
WatchedEvent主要属性
-
事件状态KeeperState分为
-
事件类型EventType分为
-
通用模式
-
在Zookeeper应用中,一个比较通用的代码片段
-
Master-Worker案例
-
Master-Worker架构中涉及到几个主要状态变化的地方
1. 领导权(Master)改变
2. Master监听Worker列表改变
3. Master监听新的任务提交
4. Worker监听任务分配
5. Client等待任务执行结果
-
领导权(Master)改变,
当主Master崩溃时,
备份Master需要立即转变为
主Master,
所以备份Master需要监听
主Master节点删除时,
自己就要尝试创建创建主Master节点。
为了设置监听,我们创建一个叫masterExistsWatcher的监听器
-
下图将更清晰阐述创建master时,可能出现的操作
(a):成功创建主Master,不许做其他事情。
(b):如果由于Master存在而创建失败,将对master节点进行监听。
(c):在
runForMaster()和正在执行exists()
之间,master节点有可能被删除,这种情况下,回调masterCreateCb
就可能认为master节点仍然存在,于是仅仅调用
masterExists()来监听master节点的存在性,
进而会继续在回调masterExistsCallback中,
执行runForMaster()尝试创建master节点,
若创建master节点成功,masterExistsWatcher将得到通知,
但这并没有什么意义,因为是自己创建master造成了变化。
(d):若(c)中创建master节点失败,
我们又继续监听Master的存在性。
-
Master监听Worker列表变化
-
Master需要监听Worker列表的变化,以便分配新任务或者重新分配未完成的任务。
可以通过getChildren()监听Worker列表变化
-
Master分配新任务
-
当Master一旦发现有新的任务提交时,则会选择一个Worker,并把该任务分配给这个Worker,
所以我们也需要监听任务的变化
-
Worker等待任务分配
-
之前我们已经将Worker注册自己到Zookeeper集群中
-
Worker还需要监听分配给自己的新任务
-
客户端监听任务执行结果
-
当Client提交任务后,需要知道任务被执行了及其状态。
当Worker执行一个任务时,
会在/status下创建任务执行状态节点。
-
另一种方式: Multiop
-
Multiop并不是Zookeeper的原生设计,
但是在3.4.0时加入到了Zookeeper中。
Multiop可以原子地执行多条Zookeeper操作,
这些操作要么都执行成功,或者都执行失败。通常,我们首先创建操作列表,然后执行
multi,最后获取操作结果列表,如
-
当然也有对应的异步调用方式
-
同样,可以使用Transaction来执行多个操作
-
其中commit()也支持异步调用
-
对于之前Master分配任务时,需要删除
/tasks下的任务节点,并在
/assign下创建任务分配节点,
这里就可以使用multi来完成。
-
使用Watches替代显示的缓存管理器
-
通常我们不愿意Client需要获取Zookeeper数据时,每次都通过
API去读取,
这既费时又消耗服务端资源,最好是在Client做一些数据缓存,
当服务端数据发生变化时,则通知Client作数据更新操作(这也是典型的观察者模式)
-
时序保证
-
当实现Zookeeper应用时,有几个关键的问题需要注意
-
写时序
-
Zookeeper状态是跨集群中所有的Server进行复制的,所有Server的状态改变时序应该是保持一致的。
比如集群中一个Server先创建了一个节点/z,
然后删除了一个节点/z',那么集群中的其他Server也会以相同的时序更新状态。
-
读时序
-
在实际中,多个Client可能在不同的时刻看到Zookeeper状态的变化,比如
1. Client c1更新了节点/z的数据,并收到了确认信息。
2. Client c1直接通过TCP发送信息给Client c2,通知c2节点/z状态发生了改变。
3. Client c2读取节点/z的状态,但是确观察到c1更新之前的状态
-
就像如图所示的问题
-
要解决这样的问题,我们应该尽量通过
Watcher来同步状态的更新,
而不是让Client之间直接通信。
-
通知时序
-
Zookeeper通知时序是根据状态更新的时序来触发通知的。
-
Watches的羊群效应和可伸缩性
-
如果1000个Client都监听这某个节点,那么当对应的事件被触发时,
将会为这1000个Client发出通知,这无疑会增大延迟操作。
所以对某节点的的某事件监听,最好Client不要太多。