博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
akka cluster sharding
阅读量:7033 次
发布时间:2019-06-28

本文共 4369 字,大约阅读时间需要 14 分钟。

cluster sharding 的目的在于提供一个框架,方便实现 DDD,虽然我至今也没搞明白 DDD 到底适用于是什么场合,但是 cluster sharding 却是我目前在做的一个 project 扩展到集群上非常需要的工具。

sharding 要做这么几件事

1. 对于每一个 entity,创建一个 actor。该 entity 有 Id 作为唯一标示。该 entity 的所有消息都由此 actor 来处理

2. 该 actor 在一段时间内不工作时,会超时并 kill self

3. 当一个集群中加入新的节点时,新的 actor 会被自动创建到新 node 上,或者老的actor 会负载均衡,迁移到新 node 上。同样的,当节点挂掉时,挂掉的 actor 会迁移到新的

在 sharding 被提出之前,google group 和 stackoverflow 上有很多人希望有这么一个东西,那时候还是 2011 年,现在已经 2015 年了。

 

sharding 的原理和用法 doc 都有比较详细的说明,下面做个小测试:

 

首先是 entity actor 的定义:

object ActionWorker {  def props(): Props = Props(new ActionWorker)  //must return s: Command  val idExtractor: ShardRegion.IdExtractor = {    case s: Command => (s.id, s)  }  val shardResolver: ShardRegion.ShardResolver = msg => msg match {    case s: Command   => (math.abs(s.id.hashCode) % 100).toString  }  val shardName: String = "ActionWorker"}

  

idExtractor 会从 message 中抽取 id,作为路由的依据,而返回值的第二项是 actor 收到的消息

shardResolver 根据 id 确定 shard 所在的位置,哈希函数的设定方式我也没有自习研究,doc 说是 id 的十倍就可以。

class ActionWorker extends Actor {  val log = Logging(context.system, this)  println("action worker is created")  context.setReceiveTimeout(30 seconds)  override def receive: Receive = {    case Command(id, payload) =>      val selfDesc = self.path.parent.name + "-" + self.path.name      println("here i am, working: " + selfDesc)      log.info("here i am, working: " + selfDesc)    case ReceiveTimeout =>      log.info("nothing to do, better kill myself")      val selfDesc = self.path.parent.name + "-" + self.path.name      println("here i am, working: " + selfDesc)      println("nothing to do, better kill myself")      context.stop(self)  }

  

actor 的定义没有特别之处,需要注意的是

1. actor 收到的消息是 idExtract 的第二项,而不是 s: Command 那个东西

2. actor 没有一个正常的途径得到自己的id,一个 workaroud 的办法是通过 self.path.name 来得到自己的id,再据此完成某些初始化操作

 

worker 处理数据,生成数据的 actor 叫做 Bot

class Bot extends Actor {  val log = Logging(context.system, this)  val tickTask = context.system.scheduler.schedule(3.seconds, 10.seconds, self, Command(""))  def receive = create  val postRegion = ClusterSharding(context.system).shardRegion(ActionWorker.shardName)  val create: Receive = {    case Command(id, payload) =>      val postId = Random.nextInt(5).toString      log.info("bot create new command and received: " + postId)      println("new command postID = " + postId)      postRegion ! Command(postId, postId)  }}

  

Bot 生成的数据要传到 worker actor,注意 postRegion 的获得方式,它首先从 actorSystem 中得到 ClusterSharding 集合(一个 actorSystem 可能会有多个 shard),然后根据 shardName 定位到唯一的 shard。最后把需要发送的消息传给 postRegin,postRegin 会完成转发。

 

Main 方法的 startUp 函数

def startup(ports: Seq[String]): Unit = {    ports foreach { port =>      // Override the configuration of the port      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).        withFallback(ConfigFactory.load())      // Create an Akka system      val system = ActorSystem("ClusterSystem", config)      startupSharedJournal(system, startStore = (port == "2551"), path =        ActorPath.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/user/store"))      ClusterSharding(system).start(        typeName = ActionWorker.shardName,        entryProps = Some(ActionWorker.props()),        idExtractor = ActionWorker.idExtractor,        shardResolver = ActionWorker.shardResolver)      if (port != "2551" && port != "2552")        system.actorOf(Props[Bot], "bot")    }  }  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {    // Start the shared journal on one node (don't crash this SPOF)    // This will not be needed with a distributed journal    if (startStore)      system.actorOf(Props[SharedLeveldbStore], "store")    // register the shared journal    import system.dispatcher    implicit val timeout = Timeout(15.seconds)    val f = (system.actorSelection(path) ? Identify(None))    f.onSuccess {      case ActorIdentity(_, Some(ref)) => SharedLeveldbJournal.setStore(ref, system)      case _ =>        system.log.error("Shared journal not started at {}", path)        system.shutdown()    }    f.onFailure {      case _ =>        system.log.error("Lookup of shared journal at {} timed out", path)        system.shutdown()    }  }

  

startupSharedJournal 是必须要执行的,不然 cluster 跑不起来。(后面测试了下,发现能跑起来)

 

ClusterSharding 在本 actorSystem 启动,参数比较直观。

 

需要注意的是:

1. sharedJournal 是在测试情况下才用得到的,在 prod 环境下应该使用 journal

2. cluster sharding 借助 cluster singleton 实现

 

转载地址:http://mgjal.baihongyu.com/

你可能感兴趣的文章
resin app server安装总结
查看>>
订单信息表和订单明细表
查看>>
背包九讲
查看>>
AS莫名报错 Error:Could not download junit.jar (junit:junit:4.12): No cached version available
查看>>
右侧客服 运动案例
查看>>
T4 Editor地址
查看>>
小程序文档
查看>>
QQ分享-定制分享卡片
查看>>
DataTable的用法
查看>>
17_服务器提权
查看>>
Python文件指针与Python函数
查看>>
免费16WiFi被吐槽
查看>>
移动UI自动化-Page Objects Pattern
查看>>
-------分割线------\n FriskyPuppy的图论学习之路!
查看>>
JZ2440 裸机驱动 第13章 LCD控制器(2)
查看>>
连邦IT服务IT用户、厂商和服务商
查看>>
浪潮信息10亿投向云计算
查看>>
SVN分支与主干
查看>>
读书笔记--精通CSS高级Web标准解决方案(二)---CSS基础之CSS选择器
查看>>
NodeJS基础(一)
查看>>