我正在使用Apache Curator图书馆在Zookeeper上进行领导选举.我的应用程序代码部署在各种机器中,我需要从一台机器执行我的代码,这就是为什么我在zookeeper上进行领导选举,以便我可以检查我是否是领导者,然后执行此代码.
以下是我的LeaderElectionExecutor类,它确保每个应用程序都有一个策展人实例
public class LeaderElectionExecutor { private ZookeeperClient zookClient; private static final String LEADER_NODE = "/testleader"; private static class Holder { static final LeaderElectionExecutor INSTANCE = new LeaderElectionExecutor(); } public static LeaderElectionExecutor getInstance() { return Holder.INSTANCE; } private LeaderElectionExecutor() { try { String hostname = Utils.getHostName(); String nodes = "host1:2181,host2:2181; zookClient = new ZookeeperClient(nodes,LEADER_NODE,hostname); zookClient.start(); // added sleep specifically for the leader to get selected // since I cannot call isLeader method immediately after starting the latch TimeUnit.MINUTES.sleep(1); } catch (Exception ex) { // logging error System.exit(1); } } public ZookeeperClient getZookClient() { return zookClient; } }
以下是我的ZookeeperClient代码 –
// can this class be improved in any ways? public class ZookeeperClient { private CuratorFramework client; private String latchPath; private String id; private LeaderLatch leaderLatch; public ZookeeperClient(String connString,String latchPath,String id) { client = CuratorFrameworkFactory.newClient(connString,new ExponentialBackoffRetry(1000,Integer.MAX_VALUE)); this.id = id; this.latchPath = latchPath; } public void start() throws Exception { client.start(); leaderLatch = new LeaderLatch(client,latchPath,id); leaderLatch.start(); } public boolean isLeader() { return leaderLatch.hasLeadership(); } public Participant currentLeader() throws Exception { return leaderLatch.getLeader(); } public void close() throws IOException { leaderLatch.close(); client.close(); } public CuratorFramework getClient() { return client; } public String getLatchPath() { return latchPath; } public String getId() { return id; } public LeaderLatch getLeaderLatch() { return leaderLatch; } }
现在在我的应用程序中,我正在使用这样的代码 –
public void method01() { ZookeeperClient zookClient = LeaderElectionExecutor.getInstance().getZookClient(); if (zookClient.isLeader()) { // do something } } public void method02() { ZookeeperClient zookClient = LeaderElectionExecutor.getInstance().getZookClient(); if (zookClient.isLeader()) { // do something } }
问题陈述:-
在策展人库中 – 调用isLeader()在启动锁定后立即不起作用.领导选择需要时间.而且由于这个原因,我已经在我的LeaderElectionExecutor代码中添加了1分钟的睡眠,但是我觉得这不是正确的方法.
有没有更好的办法呢?记住这一点,我需要一种方法来检查我是否是领导者,然后执行这段代码.我不能在一个方法中做所有事情,所以我需要调用来自不同类和方法的isLeader方法来检查我是否是领导者,然后执行这段代码.
我正在使用Zookeeper 3.4.5和Curator 1.7.1版本.
解决方法
一旦我解决了一个非常类似于你的问题.这是我怎么做到的.
首先,我有我的对象由Spring管理.所以我有一个LeaderLatch可以通过容器注入.使用LeaderLatch的组件之一是LeadershipWatcher,一个Runnable接口的实现,将领导事件发送到其他组件.这些最后一个组件是我命名为LeadershipObserver的接口的实现. LeadershipWatcher的实现主要是如下代码:
@Component public class LeadershipWatcher implements Runnable { private final LeaderLatch leaderLatch; private final Collection<LeadershipObserver> leadershipObservers; /* constructor with @Inject */ @Override public void run() { try { leaderLatch.await(); for (LeadershipObserver observer : leadershipObservers) { observer.granted(); } } catch (InterruptedException e) { for (LeadershipObserver observer : leadershipObservers) { observer.interrupted(); } } } }
由于这只是一个草图,我建议您加强此代码,也许应用命令模式调用观察者,甚至将观察者提交到线程池,如果他们的作业阻塞或长时间运行cpu密集型任务.