Zookeeper(五)Java客户端节点操作

    xiaoxiao2021-03-26  34

    使用同步API创建一个节点

    package book.chapter import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; //ZooKeeper API创建节点,使用同步(sync)接口 public class ZooKeeper_Constructor_Usage_With_SID_PASSWD implements Watcher { private static CountDownLatch connectedSemphore = new CountDownLatch(1); public static void main(String[] args) { ZooKeeper zookeeper = new ZooKeeper("domain1.book.zookeeper:2181",5000,// new ZooKeeper_Create_API_Sync_Usage()); connectedSemphore.await(); String path1 = zookeeper.create("/zk-test-ephemeral-","".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("Success create znode:"+path1); String path2 = zookeeper.create("/zk-test-ephemeral-","".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("Success create znode:"+path2); } public void process(WatchedEvent event){ if(KeeperState.SyncConnected == event.getState()){ connectedSemphore.countDown(); } } }运行程序,输出结果如下:

    Receive watched event:WatchedEvent state:SyncConnected type:None path:null

    Success create znode:/zk-test-ephemeral-

    Success create znode:/zk-test-ephemeral-0001975508

    在上面这个程序片段中,使用了同步的节点创建接口:String create(final String path,byte data[],List<ACL> acl,CreateMode createMode)。在接口使用中,我们分别创建了两种类型的节点:临时节点和临时顺序节点。从返回的结果可以看出,如果创建了临时节点,那么API的返回值就是当时传入的path参数:如果创建了临时顺序节点,那么ZooKeeper会自动在节点后加上一个数字,并且在API接口的返回值中返回该数据节点的一个完整的节点路径。

    使用异步API创建一个节点

    package book.chapter import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; //ZooKeeper API创建节点,使用异步(async)接口 public class ZooKeeper_Create_API-ASync_Usage implements Watcher{ private static CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) { ZooKeeper zookeeper = new ZooKeeper("domain1.book.zookeepr:2181",5000,// new ZooKeeper_Create_API_ASync_Usage()); connectedSemaphore.await(); zookeeper.create("/zk-test-ephemeral-","".getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL, new IStringCallBack(),"I am contest."); zookeeper.create("/zk-test-ephemeral-","".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,new IStringCallback(),"I am context."); zookeeper.create("/zk-test-ephemeral-","".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,new ISstringCallback(),"I am context."); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event){ if(KeeperState.SyncConnected == event.getState()){ connectedSemaphore.countDown(); } } } class IStringCallback implements AsyncCallback.StringCallback{ public void proceResult(int rc,String path,Object ctx,String name){ System.out.println("Create path result:【"+rc+","+path+","+"," + ctx+", real path name:"+ name); } } 运行程序,输出结果如下:

    Create path result:[0,/zk-test-ephemeral-,I am context, real path name:/zk-test-ephemeral-

    Create path result:[-110,/zk-test-ephemeral-,I am context.,real path name: null

    Create path result:[0,/zk-test-ephemeral-,I am context., real path name:/zk-test-ephemeral-0001975736

    从这个程序片段中可以看出,使用异步方式创建接口也很简单。用户仅仅需要实现AsyncCallback。StringCallback()接口即可。AsyncCallback包含了StatCallback、DataCalback、ACLCallback、ChildrenCallback、Children2Callback、StringCallback和VoidCallback七种不同的回调接口,用户可以在不同的异步接口中实现不同的接口。

    和同步接口最大的区别在于,节点的创建过程(包括网络通信和服务端的节点创建过程)是异步的。并且,在同步接口调用过程中,我们需要关注接口抛出异常的可能;但是在异步接口中,接口本身是不会抛出异常的、所有的异常都会在回调函数中通过Result Code(响应吗)来实现。

    下面来重点看下回调方法:void processResult(int rc,String path,Object ctx,String name)。这个方法的几个参数主要如表5-4所示。

    表5-4

    参数名说明rcResult Code,服务端响应码。客户端可以从这个响应码中识别出API调用的结果,常见的响应码如下: 0(OK):接口调用成功-4(ConnectionLoss):客户端和服务端连接已断开。-110(NodeExists):指定节点已存在。-112(SessionExpired):会话已过期 path接口调用时传入API的数据节点路径参数值ctx接口调用时传入API的ctx参数值name实际在服务端创建的节点名。在上述代码中,第三次创建节点时,由于创建的节点类型是顺序节点,因此在服务端没有真正创建好顺序节点之前,客户端无法知道节点的完整节点路径。于是,在回调方法中,服务端会返回这个数据节点的完整节点路径。

    4.删除节点

    客户端可以通过ZooKeeper的API来删除一个节点,有如下两个接口:

    public void delete(final String path,int version)

    pulbic void delete(final String path,int version,VoidCallback cb,Object ctx)

    这里列出的两个API分别是同步和异步的删除接口,API方法的参数说明如表5-5所示。

    表5-5

    参数名说明path指定数据节点的节点路径,即API调用的目的是删除该节点version指定节点的数据版本,即表明本次删除操作时针对该数据版本进行的cb注册一个异步回调函数ctx用于传递上下文信息的对象

    删除节点的接口和更新数据的接口是及其相似的,所以这里不再对示例代码做详细讲解,读者可以到book.chapter包下查看示例文件Delete_API_Sync_Usage.java。唯一需要指出的一点是,在ZooKeeper中,只允许删除叶子节点。也就是说,如果一个节点存在至少一个子节点的话,那么该节点将无法被直接删除,必须先删除掉其所有子节点。

    import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; /*4.获取子节点 getChildren有8个重载方法 zk.getChildren(path, watch); zk.getChildren(path, watcher); zk.getChildren(path, watch, stat); zk.getChildren(path, watcher, stat); zk.getChildren(path, watch, cb, ctx); zk.getChildren(path, watcher, cb, ctx); 其中回调函数有两种 ChildrenCallback Children2Callback 参数说明: path:节点路径 watch: boolean类型 如果为true 表示使用默认的Watcher 为false表示不需要Watcher watcher: 通知处理器 在本次获取子节点以后 一旦子节点有变化机会收到服务端传来的通知 stat: 指定数据节点的节点状态信息,传入一个旧的stat对象,当执行方法后 stat会被服务器响应的新stat替换 cb:回调函数 有两种类型 上面已经说过 ctx: 上下文 5.获取节点数据 获取节点数据有4个重载方法 zk.getData(path, watch, stat); zk.getData(path, watcher, stat); zk.getData(path, watch, cb, ctx); zk.getData(path, watcher, cb, ctx); 参数说明: path: 节点路径 watch:boolean类型 如果为true 表示使用默认的Watcher 为false表示不需要Watcher stat:指定数据节点的节点状态信息,传入一个旧的stat对象,当执行方法后 stat会被服务器响应的新stat替换 cb:回调函数 有两种类型 上面已经说过 ctx: 上下文 在获取节点数据时候 如果注册watcher 在节点数据发送变化的时候会通知客户端,当客户端收到通知以后,如果想下次数据发送变化再次收到通知, 需要重新注册watcher,获取子节点机制也如此 6.更新节点数据 更新节点数据也分为同步异步两个方法 zk.setData(path, data, version); zk.setData(path, data, version, cb, ctx); 参数说明:同上 */ /** * @author liuzhe * @version v 0.1 2017/2/6 19:59 */ public class ZookeeperSampleGetDataTest implements Watcher { public static CountDownLatch connectedSemaphore = new CountDownLatch(1); public static Stat stat = new Stat(); public static ZooKeeper zk = null; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { String path = "/zk-test"; zk = new ZooKeeper("127.0.0.1:2181", 5000, new ZookeeperSampleCreateTest()); connectedSemaphore.await(); //獲取子節點 /*zk.delete(path, 0); zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); //同步方法獲取children List childs = zk.getChildren(path, true); System.out.println("childs:"+childs); //異步方法獲取children zk.getChildren(path, true, new IChildren2Callback(), null); zk.create(path + "/c2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); Thread.sleep(Integer.MAX_VALUE);*/ //獲取節點數據 zk.delete(path, 0); zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(new String(zk.getData(path, true, stat))); //czxid:创建该节点的事务ID,mzxid:更新该节点的事务ID version:数据版本 System.out.println("Czxid:" + stat.getCzxid() + "Mzxid:" + stat.getMzxid() + "Version:" + stat.getVersion()); zk.setData(path, "123".getBytes(), -1); Thread.sleep(Integer.MAX_VALUE); } @Override public void process(WatchedEvent event) { System.out.println("fdfdfd"); if (Event.KeeperState.SyncConnected == event.getState()) { if (Event.EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); } else if (event.getType() == Event.EventType.NodeChildrenChanged) { try{ System.out.println("Get Child:"+zk.getChildren(event.getPath(),true)); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } else if (event.getType() == Event.EventType.NodeDataChanged) { try { System.out.println(new String(zk.getData(event.getPath(), true, stat))); System.out.println("Czxid: " + stat.getCzxid() + "Mzxid: " + stat.getMzxid() + "Version: " + stat.getVersion()); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } } } } class IChildren2Callback implements AsyncCallback.Children2Callback { @Override public void processResult(int rc, String path, Object ctx, List<String> childrens, Stat stat) { System.out.println("Get Children znode result: [response code: " + rc + ", param path: " + path + ", ctx " + ctx + ", childrens :" + childrens + ", stat: " + stat); } } /*其中更新节点数据 版本version问题 -1表示基于数据的最新版本更新 这里可以作为分布式锁的一个思路 如果客户端参入的version不是数据最新版本则会更新失败 比如目前节点"/zk-test"的数据版本为 2 而某个客户端尝试 执行 setData("/zk-test","test".getBytes(),1) 由于传入version为1 < 服务器目前版本2 这样就会更新失败 7.检测节点是否存在 zk.exists(path, watch); zk.exists(path, watcher); zk.exists(path, watch, cb, ctx); zk.exists(path, watcher, cb, ctx); 如果判断节点是否存在是 注册watcher 会对节点是否存在进行监听--创建节点,删除节点,节点数据更新都会通知客户端 8.权限控制 zookeeper提供了ACL的权限控制机制,简单来说就是通过控制zookeeper服务器上数据节点的ACL,来控制客户端对节点的访问权限 addAuthInfo(String scheme,byte[] auth); 参数说明: scheme: 权限控制模式 分为: world ,auth,digest,ip和super auth: 具体的权限信息 类似于shiro的权限字符串 如下代码: ZooKeeper zk1 = new ZooKeeper("192.168.1.138:2181",5000,new ZookeeperSampleCreateTest()); zk1.addAuthInfo("digest", "test:true".getBytes()); zk1.create("/zk-test-auth", "123".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL); ZooKeeper zk2 = new ZooKeeper("192.168.1.138:2181",5000,new ZookeeperSampleCreateTest()); zk2.addAuthInfo("digest", "test:true".getBytes()); System.out.println(new String(zk2.getData("/zk-test-auth", false, null))); ZooKeeper zk3 = new ZooKeeper("192.168.1.138:2181",5000,new ZookeeperSampleCreateTest()); zk3.addAuthInfo("digest", "test:false".getBytes()); zk3.getData("/zk-test-auth", false, null); zk2设置了正确的权限 所以可以获取到节点数据 zk3则会抛异常 org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /zk-test-auth*/

    转载请注明原文地址: https://ju.6miu.com/read-662106.html

    最新回复(0)