欢迎光临散文网 会员登陆 & 注册

Zookeeper

2023-06-01 21:03 作者:取闹  | 我要投稿

目录

Zookeeper 1

第一章 Zookeeper简介... 1

1.1 Zookeeper概述和功能... 1

1.2 Zookeper安装... 1

1.3 Zookeper数据模型... 3

第二章 Zookeeper命令操作... 4

1.1 Zookeeper Client 4

1.2 Zookeeper JavaClient 6

第三章 集群角色... 28


                                              Zookeeper


第一章 Zookeeper简介

1.1 Zookeeper概述和功能

Zookeeper 是 Apache Hadoop 项目下的一个子项目, 翻译过来就是 动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小猪)的管理员.简称zk.

Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务。其本质上就是提供一种集中式信息存储服务.就是一个将数据存放到内存中,以树形结构存储数据的服务.我们根据其存储数据的特点可以实现分布式统一配置中心,分布式锁等功能,主要用于分布式应用程序的高性能协调,集群高可靠等.

主要功能:

配置管理

编辑



分布式锁

编辑


3.服务器动态上下线

1.2 Zookeper安装

上传解压

#进入到apps文件夹 使用rz上传 也可以直接拖拽到crt上传 cd /opt/apps   #上传后解压 tar -zxvf zookeeper-3.5.6.tar.gz

配置

#1.进入到解压后的zk文件夹下 创建一个文件夹zkData cd /opt/apps/zookeeper-3.5.6 mkdir zkData #2.进入到zk的conf文件夹下 将zoo_sample.cfg改为zoo.cfg cd conf mv zoo_sample.cfg zoo.cfg #3.修改zoo.cfg的配置 vi zoo.cfg    #将第12行dataDir的值改为我们刚才创建的文件夹  12+shift+G 直接跳转    dataDir=/opt/apps/zookeeper-3.5.6/zkData    #在文件的最后配置server的信息和端口    server.1=linux01:2888:3888    server.2=linux02:2888:3888    server.3=linux03:2888:3888 #4.进入到zkData文件夹下 添加linux01的myid为1 cd ../zkData echo 1 > myid

分发

#进入到linux01的/opt/apps文件夹下 scp -r zookeeper-3.5.6 linux02:$PWD scp -r zookeeper-3.5.6 linux03:$PWD #设置linux02上zk的myid为2 echo 2 > /opt/apps/zookeeper-3.5.6/zkData/myid cat  /opt/apps/zookeeper-3.5.6/zkData/myid #设置linux03上zk的myid为3 echo 3 > /opt/apps/zookeeper-3.5.6/zkData/myid cat /opt/apps/zookeeper-3.5.6/zkData/myid

启动

#zk没有一键启动我们需要挨个启动 /opt/apps/zookeeper-3.5.6/bin/zkServer.sh start #zk查看状态 /opt/apps/zookeeper-3.5.6/bin/zkServer.sh status

一键启动脚本

#!/bin/bash for hostname in linux01 linux02 linux03 do    echo "当前正在操作的主机$hostname"    ssh $hostname "source /etc/profile ; /opt/apps/zookeeper-3.5.6/bin/zkServer.sh $1 ;exit" done

1.3 Zookeper数据模型

ZooKeeper是一个树形目录服务,其数据模型和linux的文件系统目录树很类似,拥有一个层次化结构。

编辑


这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息.节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。

节点可以分为四大类:

- PERSISTENT持久化节点 - EPHEMERAL临时节点:-e - PERSISTENT_SEQUENTIAL持久化顺序节点:-s - EPHEMERAL_SEQUENTIAL临时顺序节点: -es

第二章 Zookeeper命令操作

1.1 Zookeeper Client

1.1.1 服务端命令

/opt/apps/zookeeper-3.5.6/bin/zkServer.sh start

/opt/apps/zookeeper-3.5.6/bin/zkServer.sh stop

/opt/apps/zookeeper-3.5.6/bin/zkServer.sh status

/opt/apps/zookeeper-3.5.6/bin/zkServer.sh restart

1.1.2 客户端命令

连接服务端 /opt/apps/zookeeper-3.5.6/bin/zkCli.sh -server  linux001:2181 

连接本地

/opt/apps/zookeeper-3.5.6/bin/zkCli.sh 

查看命令帮助

help 

 查看 ls  目录

 创建节点

 create  /节点名 描述 create -e /节点名 描述 

  临时节点 create -s /节点名 描述   顺序节点 

 create -e -s /节点 描述   临时顺序节点

查看节点 

get /节点名 

设置节点描述 

set /节点名 描述 

删除节点

delete /节点名

删除非空节点 

deleteAll /节点名

监听

ls -w 路径    

 -- 监听根目录 所有子节点的变化   ls - w /   

 WATCHER::

 WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/ 

get -w 路径 

-- 监听 /app3 节点数据的变化

 WATCHER::

 WatchedEvent state:SyncConnected type:NodeDataChanged path:/app3


1.2 Zookeeper JavaClient

1.2.1 Cuator介绍

Curator是 ApacheZooKeeper的Java客户端库.

常见的ZooKeeper Java APl :

        原生JavaAPI

        zkClient

        Curator

Curator项目的目标是简化ZooKeeper客户端的使用。

Curator最初是Netfix研发的,后来捐献了Apache基金会,目前是Apache的顶级项目。

http://curator.apache.org

坐标

           <!--curator-->       

.     <dependency>         

       <groupId>org.apache.curator</groupId>           

     <artifactId>curator-framework</artifactId>         

       <version>4.0.0</version>       

     </dependency>       

     <dependency>      

          <groupId>org.apache.curator</groupId>        

        <artifactId>curator-recipes</artifactId> 

                <version>4.0.0</version> 

            </dependency>       

         <dependency>      

          <groupId>org.apache.curator</groupId>    

        <artifactId>curator-client</artifactId>        

        <version>4.0.0</version>    

        </dependency>

1.2.2 建立连接

/*   

  CuratorFrameworkFactory        

   静态方法获取连接对象            

   static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)                    

        String connectString:连接的字符串    zkserver地址:端口           

                     int sessionTimeoutMs:会话超时时间 单位ms   默认 60 * 1000                 

               int connectionTimeoutMs 连接超时时间 单位ms  默认 15 * 1000                                                RetryPolicy retryPolicy 重试策略   接口                           

                           实现类ExponentialBackoffRetry        

      static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)                         CuratorFramework          

              方法          

                void  start() 开始连接

*/ 

public class Test {  

  public static void main(String[] args) {     

   String connectString = "linux001:2181";

//        //基础等待时间 20  重试次数 10     

   RetryPolicy r = new ExponentialBackoffRetry(20,10); 

//        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, r);

//        curatorFramework.start();;     

   //第二种方式     

   CuratorFramework client = CuratorFrameworkFactory.builder().                                                        connectString(connectString)                      

          .retryPolicy(r).build();        client.start();  

 } 

}

1.2.3 创建节点

/*   

 CuratorFramework      

 创建节点      

 create().forPath(节点) 创建节点 不指定数据 默认为 ip地址   

 create().forPath(节点,byte[] data)  创建节点 指定数据 字节数组        create().withMode().forPath(节点,byte[] data)  创建节点 指定节点类型 默认类型 持久化                    CreateMode.EPHEMERAL 临时节点            

           CreateMode.PERSISTENT 持久节点                                                          CreateMode.PERSISTENT_SEQUENTIAL 持久顺序节点                    CreateMode.EPHEMERAL_SEQUENTIAL  临时顺序节点        create().creatingParentsIfNeeded().forPath(节点,byte[] data) 创建多级节点

*/

 public class Test { 

  public static void main(String[] args) throws Exception {     

   CuratorFramework client = CuratorUtils.getClient();     

   client.start();     

   //创建节点  由于指定了命名空间 所以会在 /doit下创建     

   //没有指定数据 默认数据是当前客户端的ip地址     

   //  client.create().forPath("/app1");      

  //创建节点 指定数据      

  //client.create().forPath("/app2","abc".getBytes());      

  //创建临时节点  指定数据 

//        client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","aa".getBytes());        //Thread.sleep(10000);     

   //创建多级节点       

 client.create().creatingParentsIfNeeded().forPath("/app3/p1/p", "abc".getBytes());        client.close();  

  } 

}

1.2.4 查看节点

/*   

  查看节点    

* 1. 查询数据:get: getData().forPath()     

* 2. 查询子节点: ls: getChildren().forPath()     

* 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()

*/ 

public class Test02 {    

public static void main(String[] args) throws Exception {       

 CuratorFramework client = CuratorUtils.getClient();       

 client.start();      

  //查询节点数据    

    byte[] bytes = client.getData().forPath("/app1");      

  System.out.println(new String(bytes));    

    //查询子节点    

    List<String> list = client.getChildren().forPath("/");    

    for (String s : list) {     

       System.out.println(s);      

  }     

   //查询节点状态信息  ls -s     

   Stat s = new Stat(); 

        client.getData().storingStatIn(s).forPath("/app1");   

     System.out.println(s.getCzxid());  

      client.close(); 

    }

 }

1.2.5 修改数据

/**   

  * 修改数据     

* 1. 基本修改数据:setData().forPath()     

* 2. 根据版本修改: setData().withVersion().forPath()    

* * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。     

*/ public class Test03 {  

 public static void main(String[] args) throws Exception {       

 CuratorFramework client = CuratorUtils.getClient();      

  client.start();     

   //基本修改数据 

//        client.setData().forPath("/app1","v1".getBytes());

 //        byte[] bytes = client.getData().forPath("/app1"); 

//        System.out.println(new String(bytes));        

//查询版本      

  Stat s = new Stat();     

   client.getData().storingStatIn(s).forPath("/app1");        int version = s.getVersion();        

//使用版本进行设置       

 client.setData().withVersion(version).forPath("/app1");    

    byte[] bytes = client.getData().forPath("/app1");    

    System.out.println(new String(bytes));           

     client.close();   

    }

 }

1.2.6 删除节点

/**    

* 删除节点: delete deleteall     

* 1. 删除单个节点:delete().forPath("/app1");     

* 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");     

* 3. 必须成功的删除:为了防止网络抖动。本质就是重试。  client.delete().guaranteed().forPath("/app2");     

* 4. 回调:inBackground     

*/

public class Test04 {   

 public static void main(String[] args) throws Exception {       

 CuratorFramework client = CuratorUtils.getClient();      

  client.start();       

 //删除单个节点    

 //   client.delete().forPath("/app1");     

   //删除带子节点的节点

//        client.delete().deletingChildrenIfNeeded().forPath("/app3");       

 //必须成功的删除:为了防止网络抖动。本质就是重试。

//        client.delete().guaranteed().forPath("/aa");        client.delete().guaranteed().inBackground(new BackgroundCallback() {            

@Override          

  public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {                System.out.println("我被删除了");             

   System.out.println(event);         

   }      

  }).forPath("/app2");       

 client.close();   

 } 

}

1.2.7 注册监听

监听数据变化

public class Test05 {   

 public static void main(String[] args) throws Exception {      

  CuratorFramework client = CuratorUtils.getClient();       

 client.start();      

  //创建 NodeCache对象     

   NodeCache nodeCache = new NodeCache(client,"/app1");     

   //2.注册监听      

 nodeCache.getListenable().addListener(new NodeCacheListener() {        

    @Override       

     public void nodeChanged() throws Exception {         

       System.out.println("节点变化了");        

        byte[] data = nodeCache.getCurrentData().getData();            

    System.out.println(new String(data));       

     }     

   });    

  //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据      

  nodeCache.start(true);        

 //        Thread.sleep(1000000);    

}

 }

监听子节点变化

public class Test06 {  

  public static void main(String[] args) throws Exception {      

  CuratorFramework client = CuratorUtils.getClient();     

   client.start();   

     PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app3",true);        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {            @Override      

      public void childEvent(CuratorFramework curatorFramework, 

PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {          

      System.out.println("子节点变化了");                System.out.println(pathChildrenCacheEvent);            

    PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();                if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){           

         byte[] data = pathChildrenCacheEvent.getData().getData();                    System.out.println(new String(data));        

        }        

    }    

    });        

pathChildrenCache.start();           

     Thread.sleep(1000000);   

 }

 }

/*   

  TreeCache 相当于 NodeCache与PathChildrenCache组合    

    监听当前节点 与 子节点 

 */

public class Test07 {  

 public static void main(String[] args) throws Exception {     

   CuratorFramework client = CuratorUtils.getClient();      

  client.start();      

  TreeCache treeCache = new TreeCache(client,"/app4");        treeCache.getListenable().addListener(new TreeCacheListener() {   

         @Override     

       public void childEvent(CuratorFramework curatorFramework, 

TreeCacheEvent treeCacheEvent) throws Exception {       

         System.out.println(treeCacheEvent);       

     }    

    });   

     treeCache.start();      

  Thread.sleep(10000000); 

    }

 }

第三章 集群角色

在ZooKeeper集群服中务中有三个角色:

Leader 领导者 :

1. 处理事务请求 2. 集群内部各服务器的调度者

Follower 跟随者 :

1.处理客户端非事务请求,转发事务请求给Leader服务器

2.参与Leader选举投票

Observer 观察者:

1.处理客户端非事务请求,转发事务请求给Leader服务器


选举机制

假设3台linux linux01 启动 投自己一票 广播 linux02 启动 收到linux01 广播   投自己一票 linux01 发现 linux02的id比自己的id大  投linux02一票 linux02 投票超过半数 选为 leader   linux03 启动 由于已经有了leader  follower

如果leader宕机

看谁的版本号新 谁为leader 如果版本号相同 谁id大 谁是leader


Zookeeper的评论 (共 条)

分享到微博请遵守国家法律