1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
| public class PathCacheExample {
public static void main(String[] args) throws InterruptedException {
ZKClient zkClient1 = register("Server1");
TimeUnit.SECONDS.sleep(3); ZKClient zkClient2 = register("Server2"); ZKClient zkClient3 = register("Server3");
TimeUnit.SECONDS.sleep(3); }
public static ZKClient register(String serverName) throws InterruptedException { ZKClient zkClient = new ZKClient(serverName); Thread thread = new Thread(zkClient); thread.start(); return zkClient; } }
/**
 * Demo participant that connects to ZooKeeper through Curator, watches the
 * children of {@link #PATH} with a {@link PathChildrenCache}, and, when run,
 * creates, updates and finally deletes its own child node under that path.
 *
 * <p>Failures of individual ZooKeeper operations are printed to standard error
 * and otherwise ignored, so the demo keeps going on a best-effort basis.
 */
class ZKClient implements Runnable {
    /** Parent znode under which every client creates its own child. */
    private static final String PATH = "/ServersCache5";

    // Both stay null if the constructor fails before assigning them.
    private CuratorFramework client = null;
    private PathChildrenCache cache = null;
    private final String servername;

    /**
     * Releases the cache and then the Curator client.
     *
     * <p>The cache is closed first because it was built on top of the client;
     * closing the client first would leave the cache working against a closed
     * connection.
     */
    public void closeAllService() {
        closePathChildrenCache();
        closeCuratorFramework();
    }

    /** Closes the Curator client via {@code CloseableUtils.closeQuietly}. */
    public void closeCuratorFramework() {
        CloseableUtils.closeQuietly(client);
    }

    /** Closes the path-children cache via {@code CloseableUtils.closeQuietly}. */
    public void closePathChildrenCache() {
        CloseableUtils.closeQuietly(cache);
    }

    /**
     * Connects to ZooKeeper at {@code 0.0.0.0:2181} and starts watching
     * {@link #PATH}. Any exception during setup is printed and swallowed, in
     * which case {@code client} and/or {@code cache} may be left unset.
     *
     * @param serverName name used both as this client's child-node name and as
     *        the prefix of its log lines
     */
    public ZKClient(String serverName) {
        this.servername = serverName;
        try {
            client = CuratorFrameworkFactory.newClient("0.0.0.0:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();
            // Third argument is Curator's cacheData flag.
            cache = new PathChildrenCache(client, PATH, true);
            // Attach the listener before starting so no early event is missed.
            addListener(cache);
            cache.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates this client's node, writes its data once more (so that watchers
     * see an update), waits three seconds, then deletes the node.
     *
     * <p>If the wait is interrupted the node is still deleted, and the thread's
     * interrupt status is restored afterwards.
     */
    @Override
    public void run() {
        create(client, servername, servername);
        setValue(client, servername, servername);
        boolean interrupted = false;
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            interrupted = true;
            e.printStackTrace();
        }
        remove(client, servername);
        if (interrupted) {
            // Restore the flag only after cleanup so the delete is not cut short.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers a listener that logs child added / updated / removed events.
     * All other event types are ignored.
     *
     * @param cache cache to attach the listener to
     */
    private void addListener(PathChildrenCache cache) {
        PathChildrenCacheListener listener = (curator, event) -> {
            switch (event.getType()) {
                case CHILD_ADDED:
                    printNodeStateChange("added", event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    printNodeStateChange("changed", event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    printNodeStateChange("removed", event.getData().getPath());
                    break;
                default:
                    break;
            }
        };
        cache.getListenable().addListener(listener);
    }

    /**
     * Prints one log line describing a child-node change, stamped with the
     * current date and time in the default locale's format.
     *
     * @param type short description of the change ("added", "changed", "removed")
     * @param path full path of the affected child node
     */
    private void printNodeStateChange(String type, String path) {
        // Same formatter the deprecated Date.toLocaleString() delegates to.
        String timestamp = java.text.DateFormat.getDateTimeInstance().format(new Date());
        System.out.println(servername + " Monitor Node " + type + ": " + path + ". " + timestamp);
    }

    /**
     * Deletes the child node {@code pathName} under {@link #PATH}; failures are
     * printed and ignored.
     */
    private static void remove(CuratorFramework client, String pathName) {
        String path = ZKPaths.makePath(PATH, pathName);
        try {
            client.delete().forPath(path);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates an EPHEMERAL child node {@code pathName} under {@link #PATH}
     * holding {@code data} encoded as UTF-8, creating parent nodes if needed;
     * failures are printed and ignored.
     */
    private static void create(CuratorFramework client, String pathName, String data) {
        String path = ZKPaths.makePath(PATH, pathName);
        try {
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(path, data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Overwrites the data of child node {@code pathName} under {@link #PATH}
     * with {@code data} encoded as UTF-8; failures are printed and ignored.
     */
    private static void setValue(CuratorFramework client, String pathName, String data) {
        String path = ZKPaths.makePath(PATH, pathName);
        try {
            client.setData().forPath(path, data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
|