Typesafe Akka Remote Sampleの図解 - 2/2 LookupApplication編

リチャード 伊真岡 blog

Typesafe Akka Remote Sampleの図解 - 2/2 LookupApplication編

twitter:RichardImaokaJP

TypeSafeのAkka Remote Samples with Scalaに含まれる2つ目のサンプルアプリケーション

前回の記事に引き続き、何かとわかりにくいTypeSafe社の@TypeSafeのAkka Remoteのサンプルについて、図解していきたいと思います。

サンプルの中には2つのアプリケーションが含まれていて、この記事はその2つ目、LookupApplicationについてです

メッセージの型としてのcase class MathOpは前回の記事でも解説した通りです

// sample/remote/calculator/MathOp.scala
trait MathOp

final case class Add(nbr1: Int, nbr2: Int) extends MathOp

final case class Subtract(nbr1: Int, nbr2: Int) extends MathOp

final case class Multiply(nbr1: Int, nbr2: Int) extends MathOp

final case class Divide(nbr1: Double, nbr2: Int) extends MathOp

Akkaでよく使われるcase classをメッセージの型として使う方法です。足し、引き、掛け、割り算に相当する以上の4つが定義されています。

MathResultも同様に前回の記事の通りです

// sample/remote/calculator/MathOp.scala
trait MathResult

final case class AddResult(nbr: Int, nbr2: Int, result: Int) extends MathResult

final case class SubtractResult(nbr1: Int, nbr2: Int, result: Int) extends MathResult

final case class MultiplicationResult(nbr1: Int, nbr2: Int, result: Int) extends MathResult

final case class DivisionResult(nbr1: Double, nbr2: Int, result: Double) extends MathResult

それぞれに対する結果型も用意されています。以下で見るようにActorはこれらの型のメッセージをやり取りして、計算の入力と結果を受け渡します。

CalculatorActorは計算入力を受け取って結果を返す、LookupActorはRemoteで生成されたCalculatorActorを探して、監視したうえで、計算を行わせます

// sample/remote/calculator/calculatorActor.scala
class CalculatorActor extends Actor {
  def receive = {
    case Add(n1, n2) =>
      println("Calculating %d + %d".format(n1, n2))
      sender() ! AddResult(n1, n2, n1 + n2)
    case Subtract(n1, n2) =>
      println("Calculating %d - %d".format(n1, n2))
      sender() ! SubtractResult(n1, n2, n1 - n2)
    case Multiply(n1, n2) =>
      println("Calculating %d * %d".format(n1, n2))
      sender() ! MultiplicationResult(n1, n2, n1 * n2)
    case Divide(n1, n2) =>
      println("Calculating %.0f / %d".format(n1, n2))
      sender() ! DivisionResult(n1, n2, n1 / n2)
  }
}

こちらも前回の記事で解説した通りです。

例えばMultiply型のメッセージを受け取ったときは、その結果であるMultiplicationResult型のメッセージを送信元"sender"に投げ返します。

LookupActorは(LookupActorから見て)RemoteにあるCalculatorActorを探しに行きます

今まで出てきたActorに比べてLookupActorはやや複雑です

//sample/remote/calculator/LookupActor.scala
class LookupActor(path: String) extends Actor { ... }

まず、上記のConstructionの部分を見ましょう。path変数には

  • path = "akka.tcp://CalculatorSystem@127.0.0.1:2552/user/calculator"

が入ってきます。

LookupActorは最初に呼び出される関数sendIdentifyRequest()はActorSelectionに対して、Identifyメッセージを送っています

Identifyについては後述しますので、まずはActorSelectionについて。

ActorSelectionは、上記の"akka.tcp://..."のようなパス(URL)に対して!メソッドでメッセージを送ることができます。

つまり、Akkaでは

  • ActorRef
  • ActorSelection

の2つに対して!メソッドでメッセージが送れることになります。

Akkaに備わっているのIdentify, ActorIdentityメッセージ型は、ActorSelection宛てにメッセージを送ったときにActorRefを得ることができます

次に、Identify, ActorIdentityは、Akkaに備わっているメッセージ型です。

AkkaのActorはIdentifyを受け取ると、ActorIdentityをsender()に返します。その際、ActorIdentityはActorRefを第2引数にもっています。

//akka.actor.ActorIdentity
 case class ActorIdentity(correlationId: Any, ref: Option[ActorRef]) 

LookupActorではその第2引数として返ってきたActorRefを使って、context.watch()しています。

context.watch()すると、別のActorを監視することができる、すなわち監視対象のActorがStopすると、Terminatedメッセージを受け取ることになります。

    case Terminated(`actor`) =>
      println("Calculator terminated")
      sendIdentifyRequest()
      context.become(identifying)

LookupActorではこのあともう一度sendIdentifyRequest()を呼んでいるので、TerminatedになったCalculatorActorの代わりのCalculatorActor(別インスタンス)が同じパス

  • "akka.tcp://CalculatorSystem@127.0.0.1:2552/user/calculator"

上にあれば、再びCalculatorActorを監視に入れることになります。

context.become(active(actor))によって、receiveメソッドの実装はactiveメソッドに切り替わります

無事CalculatorActorの監視に成功したら、次は

      context.become(active(actor))

によって、receiveメソッドの動作をactive()メソッドに入れ替えます。

  def active(actor: ActorRef): Actor.Receive = {
    case op: MathOp => actor ! op
    case result: MathResult => result match {
      case AddResult(n1, n2, r) =>
        printf("Add result: %d + %d = %d\n", n1, n2, r)
      case SubtractResult(n1, n2, r) =>
        printf("Sub result: %d - %d = %d\n", n1, n2, r)
    }
    case Terminated(`actor`) =>
      println("Calculator terminated")
      sendIdentifyRequest()
      context.become(identifying)
    case ReceiveTimeout =>
    // ignore

これは、

  • MathOpを受け取ればCalculatorActor (actor) に転送
  • MatuResultを受け取ればprintf表示
  • Terminatedを(優位つの監視対象である)CalculatorActorから受け取れば、もう一度でsendIdentifyRequest()監視

という動作をします。

LokupApplication

最後にアプリケーションの説明です。これも前回の記事同様、main関数はややこしいのですが…、とにかくstartRemoteCalculatorSystem()とstartRemoteLookupSystem()という二つの関数を走らせるだけです。

「コマンドライン引数」のCalculatorとLookupを渡すと、2つの関数をの別のプロセスで走らせることができます。

sbt "runMain sample.remote.calculator.CreationApplication Calculator"
sbt "runMain sample.remote.calculator.CreationApplication Lookup"

args.isEmpty、すなわちコマンドライン引数を渡さないと、一つのプロセスの中で2つの関数を走らせます。s

object LookupApplication {
  def main(args: Array[String]): Unit = {
    if (args.isEmpty || args.head == "Calculator")
      startRemoteCalculatorSystem()
    if (args.isEmpty || args.head == "Lookup")
      startRemoteLookupSystem()
  }

  def startRemoteCalculatorSystem(): Unit = {
    ...
  }

  def startRemoteLookupSystem(): Unit = {
    ...
  }
}

startRemoteCalculatorSystem()はActorSystemを初期化したうえで、そこからCalculatorActorも生成してしまいます。

これにより、CalculatorActorは後述のLookupActorから見てRemoteになります。

startRemoteLookupSystem()は,CalculatorActorのパス

  • "akka.tcp://CalculatorSystem@127.0.0.1:2552/user/calculator"

をLookupActorに渡して、後はLookupActor経由でどんどんAddとSubtractメッセージを投げ続けるだけです。