squbs-9. 使用Zookeeper搭建squbs服务集群

原文地址:Clustering squbs Services using ZooKeeper

概览

squbs通过zkcluster模块实现服务集群。zkcluster是一个 Akka 扩展 ,利用Zookeeper来管理akka集群和分区。这与 Akka Cluster的领导&成员管理系统的功能近似。更强大的是它提供了分区支持,并消除了 entry-nodes的需要。

配置

我们需要一个squbsconfig/zkcluster.conf文件在运行目录下。他需要提供以下属性:

  • connectionString:一个逗号分隔的zookeeper节点字符串
  • namespace: 有效的znode路径名称,这将是其之后所有znode创建节点的父代。
  • segments: 划分的分片数

以下是一个zkcluster.conf文件内容的例子:

zkCluster {
    connectionString = "zk-node-01.squbs.org:2181,zk-node-02.squbs.org:2181,zk-node-03.squbs.org:2181"
    namespace = "clusteredservicedev"
    segments = 128
}

用户指导

从简单的注册扩展开始,介绍所有常见的akka扩展。然后你访问 zkClusterActor 并使用如下:

val zkClusterActor = ZkCluster(system).zkClusterActor

// Query the members in the cluster
zkClusterActor ! ZkQueryMembership

// Matching the response
case ZkMembership(members:Set[Address]) =>


// Query leader in the cluster
zkClusterActor ! ZkQueryLeadership
// Matching the response
case ZkLeadership(leader:Address) =>

// Query partition (expectedSize = None), create or resize (expectedSize = Some[Int])
zkClusterActor ! ZkQueryPartition(partitionKey:ByteString, notification:Option[Any] = None, expectedSize:Option[Int] = None, props:Array[Byte] = Array[Byte]())
// Matching the response
case ZkPartition(partitionKey:ByteString, members: Set[Address], zkPath:String, notification:Option[Any]) =>
case ZkPartitionNotFound(partitionKey: ByteString) =>


// Monitor or stop monitoring the partition change
zkClusterActor ! ZkMonitorPartition
zkClusterActor ! ZkStopMonitorPartition
// Matching the response
case ZkPartitionDiff(partitionKey: ByteString, onBoardMembers: Set[Address], dropOffMembers: Set[Address], props: Array[Byte] = Array.empty) =>

// Removing partition
zkClusterActor ! ZkRemovePartition(partitionKey:ByteString)
// Matching the response
case ZkPartitionRemoval(partitionKey:ByteString) =>


// List the partitions hosted by a certain member
zkClusterActor ! ZkListPartitions(address: Address)
// Matching the response
case ZkPartitions(partitionKeys:Seq[ByteString]) =>

// monitor the zookeeper connection state
val eventStream = context.system.eventStream
eventStream.subscribe(self, ZkConnected.getClass)
eventStream.subscribe(self, ZkReconnected.getClass)
eventStream.subscribe(self, ZkLost.getClass)
eventStream.subscribe(self, ZkSuspended.getClass)

// quit the cluster
zkCluster(system).zkClusterActor ! PoisonPill

// add listener when quitting the cluster
zkCluster(system).addShutdownListener(listener: () => Unit)

依赖

将以下依赖加入到你的build.sbt或scala构建文件中:

"org.squbs" %% "squbs-zkcluster" % squbsVersion

设计

想要变更zkcluster的需读取这个:

  • Membership 基于zookeeper临时节点,关闭的session将会通过ZkMembershipChanged更改leader
  • Leadership基于 curator框架的LeaderLatch,新的选举结果将会广播 ZkLeaderElected到所有节点。
  • 分区通过leader计算,并通过leader节点的 ZkPartitionsManager 写入znode。
  • 分区修改只能通过leader,通过ZkPartitionsManager 来强制执行修改。
  • ZkPartitionsManager 的follower节点将watch Zookeeper中的变更节点。一旦leader在rebalance之后改变分区,ZkPartitionsManager 的follower节点将会获得通知并更新他们内存中分区信息快照。
  • 想被分区变更 ZkPartitionDiff 通知到的,需发送ZkMonitorPartition至集群actor来注册。

ZkMembershipMonitor是处理成员和领导的actor类型

ZkPartitionsManager 是处理分区管理的actor

ZkClusterActor 是一个提供用户查询的接口actor

    原文作者:吕亦行
    原文地址: https://www.jianshu.com/p/4787b85a4750
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞