原文地址:Clustering squbs Services using ZooKeeper
概览
squbs通过zkcluster模块实现服务集群。zkcluster是一个 Akka 扩展 ,利用Zookeeper来管理akka集群和分区。这与 Akka Cluster的领导&成员管理系统的功能近似。更强大的是它提供了分区支持,并消除了 entry-nodes
的需要。
配置
我们需要一个squbsconfig/zkcluster.conf
文件在运行目录下。他需要提供以下属性:
- connectionString:一个逗号分隔的zookeeper节点字符串
- namespace: 有效的znode路径名称,这将是其之后所有znode创建节点的父代。
- segments: 划分的分片数
以下是一个zkcluster.conf
文件内容的例子:
zkCluster {
connectionString = "zk-node-01.squbs.org:2181,zk-node-02.squbs.org:2181,zk-node-03.squbs.org:2181"
namespace = "clusteredservicedev"
segments = 128
}
用户指导
从简单的注册扩展开始,介绍所有常见的akka扩展。然后你访问 zkClusterActor
并使用如下:
val zkClusterActor = ZkCluster(system).zkClusterActor
// Query the members in the cluster
zkClusterActor ! ZkQueryMembership
// Matching the response
case ZkMembership(members:Set[Address]) =>
// Query leader in the cluster
zkClusterActor ! ZkQueryLeadership
// Matching the response
case ZkLeadership(leader:Address) =>
// Query partition (expectedSize = None), create or resize (expectedSize = Some[Int])
zkClusterActor ! ZkQueryPartition(partitionKey:ByteString, notification:Option[Any] = None, expectedSize:Option[Int] = None, props:Array[Byte] = Array[Byte]())
// Matching the response
case ZkPartition(partitionKey:ByteString, members: Set[Address], zkPath:String, notification:Option[Any]) =>
case ZkPartitionNotFound(partitionKey: ByteString) =>
// Monitor or stop monitoring the partition change
zkClusterActor ! ZkMonitorPartition
zkClusterActor ! ZkStopMonitorPartition
// Matching the response
case ZkPartitionDiff(partitionKey: ByteString, onBoardMembers: Set[Address], dropOffMembers: Set[Address], props: Array[Byte] = Array.empty) =>
// Removing partition
zkClusterActor ! ZkRemovePartition(partitionKey:ByteString)
// Matching the response
case ZkPartitionRemoval(partitionKey:ByteString) =>
// List the partitions hosted by a certain member
zkClusterActor ! ZkListPartitions(address: Address)
// Matching the response
case ZkPartitions(partitionKeys:Seq[ByteString]) =>
// monitor the zookeeper connection state
val eventStream = context.system.eventStream
eventStream.subscribe(self, ZkConnected.getClass)
eventStream.subscribe(self, ZkReconnected.getClass)
eventStream.subscribe(self, ZkLost.getClass)
eventStream.subscribe(self, ZkSuspended.getClass)
// quit the cluster
zkCluster(system).zkClusterActor ! PoisonPill
// add listener when quitting the cluster
zkCluster(system).addShutdownListener(listener: () => Unit)
依赖
将以下依赖加入到你的build.sbt或scala构建文件中:
"org.squbs" %% "squbs-zkcluster" % squbsVersion
设计
想要变更zkcluster的需读取这个:
- Membership 基于
zookeeper
临时节点,关闭的session将会通过ZkMembershipChanged
更改leader - Leadership基于
curator
框架的LeaderLatch
,新的选举结果将会广播ZkLeaderElected
到所有节点。 - 分区通过leader计算,并通过leader节点的
ZkPartitionsManager
写入znode。 - 分区修改只能通过leader,通过ZkPartitionsManager 来强制执行修改。
-
ZkPartitionsManager
的follower节点将watch Zookeeper中的变更节点。一旦leader在rebalance之后改变分区,ZkPartitionsManager
的follower节点将会获得通知并更新他们内存中分区信息快照。 - 想被分区变更
ZkPartitionDiff
通知到的,需发送ZkMonitorPartition
至集群actor来注册。
ZkMembershipMonitor
是处理成员和领导的actor类型
ZkPartitionsManager
是处理分区管理的actor
ZkClusterActor
是一个提供用户查询的接口actor