Zookeeper深入理解(二)(编程实践之高级API:Curator)
2015 年 03 月 20 日
zookeeper

    Curator是基于Zookeeper Client的一套高级API集, 其主要目的是隐藏Zookeeper复杂的连接管理细节,其实现了一些基本的工具,如缓存等,像 createdeletegetData等操作,允许我们链式调用, 被称为fluent,同时也提供了如 命名空间自动重连等功能。本文将使用 Curator来实现之前的Master

  • Curator客户端

  • 在使用Curator客户端前,我们需要先通过工厂方法创建客户端
  • /**
     * connectString: zookeeper服务器列表
     * retryPolicy: 当发生连接失败时,采取的策略
     */
    CuratorFramework zkc = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
        
  • Fluent API

  • Fluent API允许我们执行链式调用。比如之前, 我们创建节点的操作为
  • zk.create("/mypath", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        
  • 而通过Fluent API大概为
  • zkc.create().withMode(CreateMode.PERSISTENT).forPath("/mypath", new byte[0]);
        
  • create()方法将返回一个 CreateBuilder实例, 后续调用将会对CreateBuilder进行扩展, Builders也支持deletesetDatacheckExistsgetChildren等操作。 同时,我们也可以通过inBackground()方法进行异步调用
  • zkc.create().inBackground().withMode(CreateMode.PERSISTENT).forPath("/mypath", new byte[0]);
        
  • 我们也可以通过watched()方法进行进行监听
  • zkc.getData().inBackground().watched().forPath("/mypath");
        
  • 监听器

  • 监听器将处理Curator库生成的事件,我们仅需要将通过Curator客户端注册这些监听器即可。 我们需要实现一个CuratorListener
  • CuratorListener masterListener = new CuratorListener() {
    
        public void eventReceived(CuratorFramework client, CuratorEvent event) {
            try {
                switch (event.getType()) {
                    case CHILDREN:
                        //...
                        break;
                    case CREATE:
                        //...
                        break;
                    case DELETE:
                        //...
                        break;
                    case WATCHED:
                        //...
                        break;
                }
            } catch(Exception e){
                //...
            }
        }
    }
        
  • 然后我们需要注册监听器
  • client = CuratorFrameworkFactory.newClient(hostPort, retryPolicy);
    client.getCuratorListenable().addListener(masterListener);
        
  • 有一个专门用于监听后台线程发生异常时的监听器UnhandledErrorListener
  • UnhandledErrorListener errorsListener = new UnhandledErrorListener() {
        public void unhandledError(String message, Throwable e) {
            LOG.error("Unrecoverable error: " + message, e);
            try {
                //...
            } catch (IOException ioe) {
                LOG.warn( "Exception when closing.", ioe );
            }
        }
    };
    client.getUnhandledErrorListenable().addListener(errorsListener);
        
  • Curator中的状态改变

  • Curator暴露了一些不同于Zookeeper的状态集,比如有 SUSPENDED状态, 使用LOST状态表示会话过期,下面是Curator中的连接状态机
  • 还有一个额外的READ_ONLY状态, 但和我们的案例无关,当Zookeeper只读模式被开启,并且Client连接的Server进入只读模式时, 连接将变成只读的,只读的Server将不能参与投票, 这时,Client将丢失任何Zookeeper集群的状态更新。 如果Client发出了更新操作,并且集群中的部分Server能够达到投票数, 那么这种更新操作是可以的。
  • 两个边界问题

  • 有两个有意思的错误,Curator处理得很好。 第一个是创建有序节点时出现的错误, 第二个则是删除节点时发生的错误。
  • 有序节点: 如果Client在获取到Server返回的有序节点的名称之前,Server崩溃或者Client连接断开了, 那么Client将不能知道创建的节点的名称,这时可能Client需要重新发起调用,为了解决这个问题, CreateBuilder提供了 withProtection方法, 来告诉Curator客户端用一个唯一标识符作为有序节点的前缀,当Client创建有序节点失败,重试的时候, 就能根据该标识符判断有序节点是否创建成功。

    删除保证: 同样,当Client执行delete操作后断开了连接, 那么将不能知道该节点是否被成功删除, 当该节点的存在性关乎到某些资源的请求和释放时, 这就比较关键了,Curator客户端提供了一个 DeleteBuilder接口来保证节点被成功删除(只要Curator客户端实例是有效的)。

  • 代码实例

  • Curator提供了多种工具,可见这里。这里主要介绍 LeaderLatchLeaderSelectorPathChildrenCache
  • Leader Latch

  • LeaderLatch可用于选举Master,如
  • leaderLatch = new LeaderLatch(client, "/master", myId);
        
  • 之后我们还需要注册一个LeaderLatchListener监听器, 实现isLeadernotLeader 方法
  • /**
     * 当Client变成Leader时调用
     */
    public void isLeader() {            
        try{
            // 启动wokers
            workersCache.getListenable().addListener(workersCacheListener);
            workersCache.start();
            // 恢复未分配的任务
            (new RecoveredAssignments(client.getZookeeperClient().getZooKeeper())).recover( new RecoveryCallback() {
                public void recoveryComplete (int rc, List<String> tasks) {
                    try{
                        if(rc == RecoveryCallback.FAILED) {
                            LOG.warn("Recovery of assigned tasks failed.");
                        } else {
                            LOG.info( "Assigning recovered tasks" );
                            // 等待恢复的任务数,设置一个Latch
                            recoveryLatch = new CountDownLatch(tasks.size());
                            // 分配任务
                            assignTasks(tasks);
                        }
    
                        new Thread( new Runnable() {
                            public void run() {
                                try{
                                    // 等待所有任务恢复完成
                                    recoveryLatch.await();
                                    // 启动任务,并监听任务缓存
                                    tasksCache.getListenable().addListener(tasksCacheListener);
                                    tasksCache.start();
                                } catch (Exception e) {
                                    LOG.warn("Exception while assigning and getting tasks.", e  );
                                }
                            }
                        }).start();
                    } catch (Exception e) {
                        LOG.error("Exception while executing the recovery callback", e);
                    }
                }
            });
        } catch (Exception e) {
            LOG.error("Exception when starting leadership", e);
        }
    }
    
    /**
     * Client未能成为Leader时调用
     */
    @Override
    public void notLeader() {
        LOG.info( "Lost leadership" );
        try{
            close();
        } catch (IOException e) {
            LOG.warn("Exception while closing", e);
        }
    }
        
  • 上述代码片段我们放在实现了 LeaderLatchListener接口的 CuratorMasterLatch中, 所以我们还需要注册将自己到LeaderLatch的监听器中
  •  public void runForMaster() throws Exception {
        // 注册监听器
        client.getCuratorListenable().addListener(masterListener);
        client.getUnhandledErrorListenable().addListener(errorsListener);
    
        LOG.info( "Starting master selection: " + myId);
        // 开始Master选举
        leaderLatch.addListener(this);
        leaderLatch.start();
    }
        
  • Leader Selector

  • LeaderSelector也是用来选举Master的。 但其使用的是监听器接口是LeaderSelectorListener, 我们需要实现takeLeadershipstateChanged方法
  • leaderSelector = new LeaderSelector(client, "/master", this);
        
  • 实现takeLeadershipstateChanged
  •   
    /**
     * 获取到领导权后被调用
     */
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        // 启动Worker
        workersCache.getListenable().addListener(workersCacheListener);
        workersCache.start();
    
        // 恢复之前Master未分配的任务
        (new RecoveredAssignments(client.getZookeeperClient().getZooKeeper())).recover( new RecoveryCallback() {
            public void recoveryComplete (int rc, List<String> tasks) {
                try{
                    if(rc == RecoveryCallback.FAILED) {
                        LOG.warn("Recovery of assigned tasks failed.");
                    } else {
                        LOG.info( "Assigning recovered tasks" );
                        recoveryLatch = new CountDownLatch(tasks.size());
                        assignTasks(tasks);
                    }
    
    
                    new Thread( new Runnable() {
                        public void run() {
                            try{
                                //等待所有任务恢复完成
                                recoveryLatch.await();
                                // 启动任务
                                tasksCache.getListenable().addListener(tasksCacheListener);
                                tasksCache.start();
                            } catch (Exception e) {
                                LOG.warn("Exception while assigning and getting tasks.", e  );
                            }
                        }
                    }).start();
    
                    // 表明Client获取到了Leader权
                    leaderLatch.countDown();
                } catch (Exception e) {
                    LOG.error("Exception while executing the recovery callback", e);
                }
            }
        });
    
        // 阻止Master关闭前,就释放了Leader权
        closeLatch.await();
    
    }
    
    /**
     * 连接状态发生变化时被调用
     */
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        switch(newState){
            case CONNECTED:
                //Nothing to do in this case.
                break;
            case RECONNECTED:
                // 再次成为Leader
                break;
            case SUSPENDED:
                
                break;
            case LOST:
                try{
                    // 断开连接,就关闭自己
                    close(); 
                } catch (IOException e) {
                    LOG.warn( "Exception while closing", e );
                }
                break;
            case READ_ONLY:
                // ...
                break;
        } 
    }
        
  • 类似需要runForMaster
  • public void runForMaster() {
        client.getCuratorListenable().addListener(masterListener);
        client.getUnhandledErrorListenable().addListener(errorsListener);
        leaderSelector.setId(myId);
        leaderSelector.start();
    }
        
  • Children Cache

  • 代码中我们使用PathChildrenCache获取worker列表和task列表。 PathChildrenCache会保存一份本地缓存,当有变化时, 会通过我们注册的监听器通知我们。
  • 监听Worker列表有Worker删除时
  •     PathChildrenCacheListener workersCacheListener = new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
                if(event.getType() == Type.CHILD_REMOVED) {
                    try{
                        // 将分配给该Worker的task列表重新进行分配
                        getAbsentWorkerTasks(event.getData().getPath().replaceFirst("/workers/", ""));
                    } catch (Exception e) {
                        LOG.error("Exception while trying to re-assign tasks", e);
                    }
                } 
            }
        };
        
  • 对于任务列表,我们需要监听有新的任务提交时,需要进行分配
  • PathChildrenCacheListener tasksCacheListener = new PathChildrenCacheListener() {
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
            if(event.getType() == Type.CHILD_ADDED) {
                try{
                    assignTask(event.getData().getPath().replaceFirst("/tasks/", ""),
                            event.getData().getData());
                } catch (Exception e) {
                    LOG.error("Exception when assigning task.", e);
                }   
            }
        }
    };
        
  • 以上代码可见 CuratorMasterLatchCuratorMasterSelectorCuratorMaster
好人,一生平安。