前两天被问到有关Zookeeper的一些问题,但确没有很好的回答上来。 之前也接触过Zookeeper,比如使用其Watcher实现一些通知, 或者Leader选举功能,也可以作为如Dubbo服务框架的注册中心等, 但是对Zookeeper的运作原理等是不太了解的,也没有认真仔细研究过, 特此希望通过阅读和记录文章,深入理解一番Zookeeper,觉得其中很多分布式的思想,会对自己有很大的帮助。
git clone [email protected]:apache/zookeeper.git
cd zookeeper
ant eclipse
就像螺丝刀对螺丝,螺丝刀可以转动螺丝,但这并不能完全说明它的功能,更重要的是它可以组装我们的家具和电子设备等。 而Zookeeper则可以在 分布式系统中帮助我们协调任务。
在典型的Master-Worker架构中, Worker需要告诉Master自己可以接受任务, 然后Master将分配任务给Worker。 又例如,在多线程上下文中有用的同步原语在分布式系统中同样也有用,但有一个重要的区别, 在无共享架构中,不同的机器除了共享网络外,不会共享其他任何东西。 虽然有很多消息传递算法可以实现同步原语,但是通过一些提供共享存储和有序属性的组件来实现会更容易一些, Zookeeper就是这样做的。
协调也不总是同步原语的形式, 比如Leader选举或者锁, 而元数据配置通常用来实现传递别人应该做什么,比如,Master-Worker架构中, Worker需要知道分配给它的任务,即便Master崩溃了。
1. Apache HBase。HBase中,Zookeeper用于选举集群Master,跟踪可用的Server,和保存集群元数据。
2. Apache Kafka。Kafka中,Zookeeper用于崩溃检测,实现Topic发现,和维护Topic的生产和消费状态。
3. Apache Solr。Solr中,Zookeeper用于存储集群的元数据信息及协调元数据的更新。
4. Yahoo!Fetching Server。Fetching Service中,Zookeeper用于Master选举,崩溃检测,元数据保存。
5. Facebook Messages。Messages中,Zookeeper用于实现分片和故障迁移的控制器,和服务发现。
...
1. 强一致,有序性和持久化操作。
2. 典型的同步原语实现。
3. 在分布式系统中,更容易处理往往不正确的并发行为。
Zookeeper不适合用于大容量存储。对于大容量存储 ,我们完全可以考虑使用数据库或者分布式文件系统等,重要的是,我应该将业务数据 和 用于协调和控制的数据分离。Zookeeper本身并没有提供如 Master选举,跟踪活动进程的功能,而是提供一些实现这些功能的工具。
1. Master崩溃。Master一旦崩溃,系统将不能在分配新任务或者重新分配来自Worker的任务。
2. Worker崩溃。Worker一旦崩溃,那么分配给它的任务将不能被完成。
3. 通信失败。Master和Worker之间不能交换信息,Worker将不能学习分配给它的新任务。
1. Master选举。让可用的Master分配任务给Worker的过程是至关重要的。
2. 崩溃检测。Master必须能检测Worker什么时候崩溃或者不可连接。
3. 组成员管理。Master必须能知道哪些Worker可以执行任务。
4. 元数据管理。Master和Worker必须能以可靠的方式存储任务分配和执行状态的信息。
so easily, in fact, that some developers use it without completely understanding some of the cases that require the developer to make decisions that ZooKeeper cannot make by itself. One of the purposes of writing this book is to make sure that developers understand what they need to do to use ZooKeeper effectively and why they need to do it that way.
1. /workers节点下的是所有worker节点,图中表明foo.com:2181的worker节点可用。
若worker不可用了,将从/workers下移除。
2. /tasks节点下的是所有创建的和等待被执行的任务节点,每当客户端向Master提交任务时,将在/tasks下添加新的任务节点。
3. /assign节点下的是分配给woker节点的任务。
create /path data
创建节点/path, 并存储数据data
delete /path
删除节点/path
exists /path
检查节点/path是否存在
setData /path data
设置节点/path的数据
getData /path
获取节点/path的数据
getChildren /path
获取节点/path的子节点
持久节点只能通过delete调用才能删除, 可用于存储一些应用数据(即使它的创建者不存在了,但这些数据依然需要保存),如Master-Worker中的任务分配信息。 而临时节点,会在客户端崩溃或断开连接时被删除,或者通过delete调用。比如, 在Master-Worker中的Master和Worker节点均采用临时节点,当Master崩溃或连接断开, 该节点将自动删除,既而才能进行Master选举,临时节点暂时还不支持有子节点,即便以后支持, 其子节点也必须是临时节点。
一个Znode可以被设置为sequential,连续节点将被分配一个唯一且连续的整数,
该整数会追加到节点的path后,如客户端创建一个path为/task/task-的连续节点,最终节点的path会变成/task/task-1。
综上,Zookeeper节点支持四种模式:
持久节点,临时节点,持久连续节点,临时连续节点。
1. 客户端c1对节点/tasks设置了Watch
2. 客户端c2在节点/tasks添加了新节点
3. 客户端c1收到了通知
4. 客户端c1设置新的Watch, 但在设置之前, 客户端c3在节点/tasks添加了新的节点
5. 此时客户端c1就丢失了来自客户端c3的更新通知
Quorum模式下,Zookeeper复制树节点数据到所有的Server。 但是不是说需要等待所有数据复制到所有Server后,才能响应客户端请求继续,往往quorum会比较小,比如,我们的Zookeeper集群有 5个Server,而quorum=3,那么只要有3个Server已经有复制的数据了,就能让客户端继续操作,而其他两个Server最终会同步到数据。 大多数情况下,我们容忍小于N/2个Server崩溃,集群建议用奇数个Server组成。
tar -xvzf zookeeper-3.4.5.tar.gz
cd zookeeper-3.4.6
# zookeeper配置文件
mv conf/zoo_sample.cfg conf/zoo.cfg
# 设置zoo.cfg的dataDir
dataDir=~/path/to/data
# 启动zookeeper,start-foreground表示ZK运行在前台
bin/zkServer.sh [start-foreground]
# 运行客户端
bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[dubbo, zookeeper, pampas, pubsub]
[zk: localhost:2181(CONNECTED) 1] create /workers ""
Created /workers
[zk: localhost:2181(CONNECTED) 2] ls /
[dubbo, zookeeper, workers, pampas, pubsub]
[zk: localhost:2181(CONNECTED) 3] delete /workers
[zk: localhost:2181(CONNECTED) 4] ls /
[dubbo, zookeeper, pampas, pubsub]
[zk: localhost:2181(CONNECTED) 5] quit
# 停止Server
bin/zkServer.sh stop
1. Client初始化连接,状态转为CONNECTING(①)
2. Client与Server成功建立连接,状态转为CONNECTED(②)
3. Client丢失了与Server的连接或者没有接受到Server的响应,状态转为CONNECTING(③)
4. Client连上另外的Server或连接上了之前的Server,状态转为CONNECTED(②)
5. 若会话过期(是Server负责声明会话过期,而不是Client ),状态转为CLOSED(⑤),状态转为CLOSED
6. Client也可以主动关系会话(④),状态转为CLOSED
在Client创建会话时,可以指定超时时间T,若Server在T内没有收到客户端任何消息,将认为会话过期。 而对于Client,若在在T/3时没有收到Server的响应,将发送一个心跳信息给Server, 在2/3T时刻还是没有收到Server的响应,则会尝试重连其他Server(这样就还有T/3时间查找另外的Server)。
在Client重连不同的Server时,该Server的状态必须同该Client观察到的Server状态一致(或者更新),这一点很重要。 Client不能连接上状态过时的Server(没有看到该Client已经看到的服务器状态变化)。在Zookeeper实现中, 系统会为每一个更新连接分配一个有序的事务标识符(zxid),这样就能保证Client不会重连上比自己过时的Server,如图
# 心跳间隔时间(ms)
tickTime=2000
# Follower与Leader初始化时能容忍的时间(initLimit * tickTime)
initLimit=10
# Follower与Leader请求响应时能容忍的时间(syncLimit * tickTime)
syncLimit=5
# server1数据目录,每个server各自配置
dataDir=/path/to/server1/data
# 客户端连接端口
clientPort=2182
# server列表:
# .n(标记server)=(server地址):(投票时所用端口):(选举Leader时所用端口)
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2990:3990
server.3=127.0.0.1:2991:3991
server1/zookeeper-3.4.6/bin/zkServer.sh start-foreground
# 输出
2015-03-02 19:56:47,485 [myid:1] - INFO [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2182:QuorumPeer@714] - LOOKING
2015-03-02 19:56:47,486 [myid:1] - INFO [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2182:FastLeaderElection@815] - New election. My id = 1, proposed zxid=0x1a00000006
2015-03-02 19:56:47,488 [myid:1] - INFO [WorkerReceiver[myid=1]:FastLeaderElection@597] - Notification: 1 (message format version), 1 (n.leader), 0x1a00000006 (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x1a (n.peerEpoch) LOOKING (my state)
2015-03-02 19:56:47,510 [myid:1] - WARN [WorkerSender[myid=1]:QuorumCnxManager@382] - Cannot open channel to 2 at election address /127.0.0.1:3990
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.toSend(QuorumCnxManager.java:341)
at org.apache.zookeeper.server.quorum.FastLeaderElection$Messenger$WorkerSender.process(FastLeaderElection.java:449)
at org.apache.zookeeper.server.quorum.FastLeaderElection$Messenger$WorkerSender.run(FastLeaderElection.java:430)
at java.lang.Thread.run(Thread.java:744)
2015-03-02 19:56:47,560 [myid:1] - WARN [WorkerSender[myid=1]:QuorumCnxManager@382] - Cannot open channel to 3 at election address /127.0.0.1:3991
2015-03-02 20:02:41,487 [myid:1] - INFO [/127.0.0.1:3889:QuorumCnxManager$Listener@511] - Received connection request /127.0.0.1:65469
2015-03-02 20:02:41,492 [myid:1] - INFO [WorkerReceiver[myid=1]:FastLeaderElection@597] - Notification: 1 (message format version), 3 (n.leader), 0x1a00000006 (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x1a (n.peerEpoch) FOLLOWING (my state)
server1/zookeeper-3.4.6/bin/zkCli.sh -server 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184
虽然Zookeeper没有原生提供锁操作,但是通过Zookeeper提供的一些API, 较容易实现分布式锁。我们可以利用临时节点来实现,多个进程都尝试创键临时节点/lock, 但最终只会有一个进程P能创建成功,而其他没能创建成功的进程,可以在节点/lock上Watch(相当于等待锁释放), 一旦进程P处理完事务,断开连接,节点/lock被自动删除,其他进程将得到通知,进而继续创建节点/lock,以争得锁资源。 (这里使用临时节点,是为了防止获得锁的进程突然崩溃而没有释放锁,导致死锁发生)。
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 3] create -e /master "master1.example.com:2223"
Created /master
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 4] ls /
[zookeeper, master]
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 5] get /master
"master1.example.com:2223"
cZxid = 0x1b00000002
ctime = Mon Mar 02 20:39:06 CST 2015
mZxid = 0x1b00000002
mtime = Mon Mar 02 20:39:06 CST 2015
pZxid = 0x1b00000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x34bda5de5120000
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 1] create -e /master "master2.example.com:2223"
Node already exists: /master
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 1] create -e /master "master2.example.com:2223"
Node already exists: /master
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 2] stat /master true
cZxid = 0x1b00000002
ctime = Mon Mar 02 20:39:06 CST 2015
mZxid = 0x1b00000002
mtime = Mon Mar 02 20:39:06 CST 2015
pZxid = 0x1b00000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x34bda5de5120000
dataLength = 26
numChildren = 0
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 0] ls /
[zookeeper, master]
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 1] create -e /master "master2.example.com:2223"
Node already exists: /master
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 2] stat /master true
cZxid = 0x1b00000002
ctime = Mon Mar 02 20:39:06 CST 2015
mZxid = 0x1b00000002
mtime = Mon Mar 02 20:39:06 CST 2015
pZxid = 0x1b00000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x34bda5de5120000
dataLength = 26
numChildren = 0
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 3]
WATCHER::
WatchedEvent state:SyncConnected type:NodeDeleted path:/master
ls /
[zookeeper]
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 4] create -e /master "master2.example.com:2223"
Created /master
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 5] ls /
[zookeeper, master]
zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 6] create /workers ""
Created /workers
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 7] create /tasks ""
Created /tasks
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 8] create /assign ""
Created /assign
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 9] ls /
[zookeeper, workers, tasks, master, assign]
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 10] ls /workers true
[]
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 11] ls /tasks true
[]
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 0] create -e /workers/worker1.example.com "worker1.example.com:2224"
Created /workers/worker1.example.com
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 12]
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/workers
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 1] create /assign/worker1.example.com ""
Created /assign/worker1.example.com
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 2] ls /assign/worker1.example.com true
[]
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 3]
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 0] create -s /tasks/task- "cmd"
Created /tasks/task-0000000000
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 1] ls /tasks/task-0000000000 true
[]
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/tasks
ls /tasks
[task-0000000000]
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 13] ls /workers
[worker1.example.com]
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 14] create /assign/worker1.example.com/task-0000000000 ""
Created /assign/worker1.example.com/task-0000000000
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/assign/worker1.example.com
ls /assign/worker1.example.com
[task-0000000000]
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 4] create /tasks/task-0000000000/status "done"
Created /tasks/task-0000000000/status
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/tasks/task-0000000000
get /tasks/task-0000000000
"cmd"
cZxid = 0x1b0000000e
ctime = Mon Mar 02 23:26:43 CST 2015
mZxid = 0x1b0000000e
mtime = Mon Mar 02 23:26:43 CST 2015
pZxid = 0x1b00000010
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 1
[zk: 127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184(CONNECTED) 3] get /tasks/task-0000000000/status
"done"
cZxid = 0x1b00000010
ctime = Mon Mar 02 23:52:18 CST 2015
mZxid = 0x1b00000010
mtime = Mon Mar 02 23:52:18 CST 2015
pZxid = 0x1b00000010
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
以上就是整个Master-Worker架构的主要工作机制,虽然只是一个模拟过程, 但是对我们理解Master-Worker工作原理是很有帮助的,对以后要研究代码实现,也是一个很好的铺垫。