Zookeeper
目录
Zookeeper 1
第一章 Zookeeper简介... 1
1.1 Zookeeper概述和功能... 1
1.2 Zookeper安装... 1
1.3 Zookeper数据模型... 3
第二章 Zookeeper命令操作... 4
1.1 Zookeeper Client 4
1.2 Zookeeper JavaClient 6
第三章 集群角色... 28
Zookeeper
第一章 Zookeeper简介
1.1 Zookeeper概述和功能
Zookeeper 是 Apache Hadoop 项目下的一个子项目, 翻译过来就是 动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小猪)的管理员.简称zk.
Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务。其本质上就是提供一种集中式信息存储服务.就是一个将数据存放到内存中,以树形结构存储数据的服务.我们根据其存储数据的特点可以实现分布式统一配置中心,分布式锁等功能,主要用于分布式应用程序的高性能协调,集群高可靠等.
主要功能:
配置管理

分布式锁

3.服务器动态上下线

1.2 Zookeper安装
上传解压
#进入到apps文件夹 使用rz上传 也可以直接拖拽到crt上传 cd /opt/apps #上传后解压 tar -zxvf zookeeper-3.5.6.tar.gz
配置
#1.进入到解压后的zk文件夹下 创建一个文件夹zkData cd /opt/apps/zookeeper-3.5.6 mkdir zkData #2.进入到zk的conf文件夹下 将zoo_sample.cfg改为zoo.cfg cd conf mv zoo_sample.cfg zoo.cfg #3.修改zoo.cfg的配置 vi zoo.cfg #将第12行dataDir的值改为我们刚才创建的文件夹 12+shift+G 直接跳转 dataDir=/opt/apps/zookeeper-3.5.6/zkData #在文件的最后配置server的信息和端口 server.1=linux01:2888:3888 server.2=linux02:2888:3888 server.3=linux03:2888:3888 #4.进入到zkData文件夹下 添加linux01的myid为1 cd ../zkData echo 1 > myid
分发
#进入到linux01的/opt/apps文件夹下 scp -r zookeeper-3.5.6 linux02:$PWD scp -r zookeeper-3.5.6 linux03:$PWD #设置linux02上zk的myid为2 echo 2 > /opt/apps/zookeeper-3.5.6/zkData/myid cat /opt/apps/zookeeper-3.5.6/zkData/myid #设置linux03上zk的myid为3 echo 3 > /opt/apps/zookeeper-3.5.6/zkData/myid cat /opt/apps/zookeeper-3.5.6/zkData/myid
启动
#zk没有一键启动我们需要挨个启动 /opt/apps/zookeeper-3.5.6/bin/zkServer.sh start #zk查看状态 /opt/apps/zookeeper-3.5.6/bin/zkServer.sh status
一键启动脚本
#!/bin/bash for hostname in linux01 linux02 linux03 do echo "当前正在操作的主机$hostname" ssh $hostname "source /etc/profile ; /opt/apps/zookeeper-3.5.6/bin/zkServer.sh $1 ;exit" done
1.3 Zookeper数据模型
ZooKeeper是一个树形目录服务,其数据模型和linux的文件系统目录树很类似,拥有一个层次化结构。

这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息.节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
节点可以分为四大类:
- PERSISTENT持久化节点 - EPHEMERAL临时节点:-e - PERSISTENT_SEQUENTIAL持久化顺序节点:-s - EPHEMERAL_SEQUENTIAL临时顺序节点: -es
第二章 Zookeeper命令操作
1.1 Zookeeper Client
1.1.1 服务端命令
/opt/apps/zookeeper-3.5.6/bin/zkServer.sh start
/opt/apps/zookeeper-3.5.6/bin/zkServer.sh stop
/opt/apps/zookeeper-3.5.6/bin/zkServer.sh status
/opt/apps/zookeeper-3.5.6/bin/zkServer.sh restart
1.1.2 客户端命令
连接服务端 /opt/apps/zookeeper-3.5.6/bin/zkCli.sh -server linux001:2181
连接本地
/opt/apps/zookeeper-3.5.6/bin/zkCli.sh
查看命令帮助
help
查看 ls 目录
创建节点
create /节点名 描述 create -e /节点名 描述
临时节点 create -s /节点名 描述 顺序节点
create -e -s /节点 描述 临时顺序节点
查看节点
get /节点名
设置节点描述
set /节点名 描述
删除节点
delete /节点名
删除非空节点
deleteAll /节点名
监听
ls -w 路径
-- 监听根目录 所有子节点的变化 ls - w /
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/
get -w 路径
-- 监听 /app3 节点数据的变化
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/app3
1.2 Zookeeper JavaClient
1.2.1 Cuator介绍
Curator是 ApacheZooKeeper的Java客户端库.
常见的ZooKeeper Java APl :
原生JavaAPI
zkClient
Curator
Curator项目的目标是简化ZooKeeper客户端的使用。
Curator最初是Netfix研发的,后来捐献了Apache基金会,目前是Apache的顶级项目。
http://curator.apache.org
坐标
<!--curator-->
. <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.0</version>
</dependency>
1.2.2 建立连接
/*
CuratorFrameworkFactory
静态方法获取连接对象
static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
String connectString:连接的字符串 zkserver地址:端口
int sessionTimeoutMs:会话超时时间 单位ms 默认 60 * 1000
int connectionTimeoutMs 连接超时时间 单位ms 默认 15 * 1000 RetryPolicy retryPolicy 重试策略 接口
实现类ExponentialBackoffRetry
static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) CuratorFramework
方法
void start() 开始连接
*/
public class Test {
public static void main(String[] args) {
String connectString = "linux001:2181";
// //基础等待时间 20 重试次数 10
RetryPolicy r = new ExponentialBackoffRetry(20,10);
// CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectString, r);
// curatorFramework.start();;
//第二种方式
CuratorFramework client = CuratorFrameworkFactory.builder(). connectString(connectString)
.retryPolicy(r).build(); client.start();
}
}
1.2.3 创建节点
/*
CuratorFramework
创建节点
create().forPath(节点) 创建节点 不指定数据 默认为 ip地址
create().forPath(节点,byte[] data) 创建节点 指定数据 字节数组 create().withMode().forPath(节点,byte[] data) 创建节点 指定节点类型 默认类型 持久化 CreateMode.EPHEMERAL 临时节点
CreateMode.PERSISTENT 持久节点 CreateMode.PERSISTENT_SEQUENTIAL 持久顺序节点 CreateMode.EPHEMERAL_SEQUENTIAL 临时顺序节点 create().creatingParentsIfNeeded().forPath(节点,byte[] data) 创建多级节点
*/
public class Test {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
//创建节点 由于指定了命名空间 所以会在 /doit下创建
//没有指定数据 默认数据是当前客户端的ip地址
// client.create().forPath("/app1");
//创建节点 指定数据
//client.create().forPath("/app2","abc".getBytes());
//创建临时节点 指定数据
// client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","aa".getBytes()); //Thread.sleep(10000);
//创建多级节点
client.create().creatingParentsIfNeeded().forPath("/app3/p1/p", "abc".getBytes()); client.close();
}
}
1.2.4 查看节点
/*
查看节点
* 1. 查询数据:get: getData().forPath()
* 2. 查询子节点: ls: getChildren().forPath()
* 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
*/
public class Test02 {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
//查询节点数据
byte[] bytes = client.getData().forPath("/app1");
System.out.println(new String(bytes));
//查询子节点
List<String> list = client.getChildren().forPath("/");
for (String s : list) {
System.out.println(s);
}
//查询节点状态信息 ls -s
Stat s = new Stat();
client.getData().storingStatIn(s).forPath("/app1");
System.out.println(s.getCzxid());
client.close();
}
}
1.2.5 修改数据
/**
* 修改数据
* 1. 基本修改数据:setData().forPath()
* 2. 根据版本修改: setData().withVersion().forPath()
* * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
*/ public class Test03 {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
//基本修改数据
// client.setData().forPath("/app1","v1".getBytes());
// byte[] bytes = client.getData().forPath("/app1");
// System.out.println(new String(bytes));
//查询版本
Stat s = new Stat();
client.getData().storingStatIn(s).forPath("/app1"); int version = s.getVersion();
//使用版本进行设置
client.setData().withVersion(version).forPath("/app1");
byte[] bytes = client.getData().forPath("/app1");
System.out.println(new String(bytes));
client.close();
}
}
1.2.6 删除节点
/**
* 删除节点: delete deleteall
* 1. 删除单个节点:delete().forPath("/app1");
* 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
* 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
* 4. 回调:inBackground
*/
public class Test04 {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
//删除单个节点
// client.delete().forPath("/app1");
//删除带子节点的节点
// client.delete().deletingChildrenIfNeeded().forPath("/app3");
//必须成功的删除:为了防止网络抖动。本质就是重试。
// client.delete().guaranteed().forPath("/aa"); client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("我被删除了");
System.out.println(event);
}
}).forPath("/app2");
client.close();
}
}
1.2.7 注册监听
监听数据变化
public class Test05 {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
//创建 NodeCache对象
NodeCache nodeCache = new NodeCache(client,"/app1");
//2.注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了");
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
nodeCache.start(true);
// Thread.sleep(1000000);
}
}
监听子节点变化
public class Test06 {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app3",true); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override
public void childEvent(CuratorFramework curatorFramework,
PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点变化了"); System.out.println(pathChildrenCacheEvent);
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType(); if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
byte[] data = pathChildrenCacheEvent.getData().getData(); System.out.println(new String(data));
}
}
});
pathChildrenCache.start();
Thread.sleep(1000000);
}
}
/*
TreeCache 相当于 NodeCache与PathChildrenCache组合
监听当前节点 与 子节点
*/
public class Test07 {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorUtils.getClient();
client.start();
TreeCache treeCache = new TreeCache(client,"/app4"); treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework,
TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println(treeCacheEvent);
}
});
treeCache.start();
Thread.sleep(10000000);
}
}
第三章 集群角色
在ZooKeeper集群服中务中有三个角色:
Leader 领导者 :
1. 处理事务请求 2. 集群内部各服务器的调度者
Follower 跟随者 :
1.处理客户端非事务请求,转发事务请求给Leader服务器
2.参与Leader选举投票
Observer 观察者:
1.处理客户端非事务请求,转发事务请求给Leader服务器

选举机制
假设3台linux linux01 启动 投自己一票 广播 linux02 启动 收到linux01 广播 投自己一票 linux01 发现 linux02的id比自己的id大 投linux02一票 linux02 投票超过半数 选为 leader linux03 启动 由于已经有了leader follower
如果leader宕机
看谁的版本号新 谁为leader 如果版本号相同 谁id大 谁是leader