Akka Internals (Akkaの内部動作を知る) Remoteでメッセージを送る場合の ! メソッドの動作

リチャード 伊真岡 blog

Akka Internals (Akkaの内部動作を知る) Remoteでメッセージを送る場合の ! メソッドの動作

twitter:RichardImaokaJP

今回の記事のポイント

Remote Actorを使うときにAkkaの ! メソッドの内部動作は変わり、

Remote Actorにメッセージを送るときは、数段階の「内部」Actorを経由して送られる

ということを確認していきたいと思います。

前回の記事 Akka Internals (Akkaの内部動作を知る) メッセージを送る ! メソッド

概要の説明

まず、送信元のActor Aからあて先のActor Bには、Local Actorの時と同じように、 ! メソッドを使って送ります。

//Code within actorA
    actorB ! "Some Message"

ただし、実際には以下の3つの「内部」Actorを経由します。

RemoteActorRefの実装によって、Remoteのメッセージ送信の動作がLocalの時とは変わっている

この違いは、Remote Actorに送るときは、メッセージの宛先のActorRefが、RemoteActorRefになっているためで、その! メソッドは以下のようになっています。

//class RemoteActorRef in akka/remote/RemoteActorRefProvider.scala
  override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
    ...
    try remote.send(message, Option(sender), this) catch handleException
  }

このメソッドは以下のsendメソッドを呼び出しますが、sendメソッドはメッセージの元々のあて先(actorB)をEndpointManager型のActorに差し替えます

//class Remoting (extends RemoteTranspor) in akka/remote/Remoting.scala
  override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = endpointManager match {
    case Some(manager) ⇒ manager.tell(Send(message, senderOption, recipient), sender = senderOption getOrElse Actor.noSender)
    ...
  }

これは、Sendというcase classに元々の宛先を保存し、manager変数(EndpointManager型のActor)のtellメソッドを呼ぶことで実現しています。

Image courtesy of digitalart at FreeDigitalPhotos.net

Sendというcase classに内包されたメッセージは、さらに(前回の記事で見たように)Envelopeというcase classに内包されて、EndPointManager actorのMailboxに届けられます。

Image courtesy of digitalart at FreeDigitalPhotos.net

そこから、ReliableDeliverySupervisorというActorを経由して、EndPointWriterというActorに届けられます。

Image courtesy of digitalart at FreeDigitalPhotos.net

このEndPointWriterが実際に元々の宛先であったRemote ActorのActorBにメッセージを送ります。

Sendというcase classに元々の宛先を保存しているので、複数のActorを経由しても最終的にActorBにメッセージを送ることができます。

EndPointWriterの動作

このEndPointWriterの実装はやや複雑なのですが、簡単に言うと

  • メッセージを逐一送っていくのではなく、一旦バッファにためて後で一気に送る

このEndPointWriteの実装や「どれくらいの頻度でバッファを全部クリアに一気にメッセージを送るか」のチューニング、というのはメッセージングのパフォーマンスを左右する重要なところなので、いつか記事を書ければと思います。

さて、このEndPointWriterというActorの実装を見ていくと、メッセージをためるバッファはJavaの標準クラスであるLinkedListを使っています。

//class EndpointReader in akka.remote.Endpoint.scala
  val buffer = new java.util.LinkedList[AnyRef]

このバッファの実装はAkkaのパフォーマンスを大きく左右すると考えられるので、おそらくLinkedListは十分なパフォーマンスを持っているということなのでしょう。

次に以下のメソッドですが

//class EndpointReader in akka.remote.Endpoint.scala
  def sendBufferedMessages(): Unit = {
    ...
    val ok = writePrioLoop() && writeLoop(SendBufferBatchSize)
    ...
  }

このsendBufferedMessages()メソッドが呼ばれると、メッセージをためたバッファがクリアされ、メッセージがRemote Actor宛てに一気に送信されることになります。

EndPointWriterのなかではsendBufferedMessages()が繰り返し呼ばれて(EndPointWriter自身が呼び出しを決められた秒数毎にスケジュールしている)いることがわかります。

sendBufferedMessages()内で呼び出されるwritePrioLoop() も writeLoop(SendBufferBatchSize) も、バッファ内部の個別のメッセージを送る際には以下のwriteSend()メソッドを呼び出していて、

//class EndpointReader in akka.remote.Endpoint.scala
  def writeSend(s: Send): Boolean = try {
    handle match {
      case Some(h) ⇒
        ...
        val pdu = codec.constructMessage( ... )
        ...
          val ok = h.write(pdu)
        ...
  }

このhandleというのは

handle = Option[AkkaProtocolHandle]

となっていて、

          val ok = h.write(pdu)

をよびだすと、AkkaProtocolHandle traitのインスタンス(デフォルトではtcp向けの実装)を使って、実装されたプロトコルでメッセージを送ります。

AkkaProtocolHandle についても、いつか別の記事を書こうと思います。