Akka Internals (Akkaの内部動作を知る) Remoteでメッセージを送る場合の ! メソッドの動作
今回の記事のポイント
Remote Actorを使うときにAkkaの ! メソッドの内部動作は変わり、
Remote Actorにメッセージを送るときは、数段階の「内部」Actorを経由して送られる
ということを確認していきたいと思います。
前回の記事 Akka Internals (Akkaの内部動作を知る) メッセージを送る ! メソッド
概要の説明
まず、送信元のActor Aからあて先のActor Bには、Local Actorの時と同じように、 ! メソッドを使って送ります。
//Code within actorA
actorB ! "Some Message"
ただし、実際には以下の3つの「内部」Actorを経由します。
RemoteActorRefの実装によって、Remoteのメッセージ送信の動作がLocalの時とは変わっている
この違いは、Remote Actorに送るときは、メッセージの宛先のActorRefが、RemoteActorRefになっているためで、その! メソッドは以下のようになっています。
//class RemoteActorRef in akka/remote/RemoteActorRefProvider.scala
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
...
try remote.send(message, Option(sender), this) catch handleException
}
このメソッドは以下のsendメソッドを呼び出しますが、sendメソッドはメッセージの元々のあて先(actorB)をEndpointManager型のActorに差し替えます。
//class Remoting (extends RemoteTranspor) in akka/remote/Remoting.scala
override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = endpointManager match {
case Some(manager) ⇒ manager.tell(Send(message, senderOption, recipient), sender = senderOption getOrElse Actor.noSender)
...
}
これは、Sendというcase classに元々の宛先を保存し、manager変数(EndpointManager型のActor)のtellメソッドを呼ぶことで実現しています。
Image courtesy of digitalart at FreeDigitalPhotos.net
Sendというcase classに内包されたメッセージは、さらに(前回の記事で見たように)Envelopeというcase classに内包されて、EndPointManager actorのMailboxに届けられます。
Image courtesy of digitalart at FreeDigitalPhotos.net
そこから、ReliableDeliverySupervisorというActorを経由して、EndPointWriterというActorに届けられます。
Image courtesy of digitalart at FreeDigitalPhotos.net
このEndPointWriterが実際に元々の宛先であったRemote ActorのActorBにメッセージを送ります。
Sendというcase classに元々の宛先を保存しているので、複数のActorを経由しても最終的にActorBにメッセージを送ることができます。
EndPointWriterの動作
このEndPointWriterの実装はやや複雑なのですが、簡単に言うと
- メッセージを逐一送っていくのではなく、一旦バッファにためて後で一気に送る
このEndPointWriteの実装や「どれくらいの頻度でバッファを全部クリアに一気にメッセージを送るか」のチューニング、というのはメッセージングのパフォーマンスを左右する重要なところなので、いつか記事を書ければと思います。
さて、このEndPointWriterというActorの実装を見ていくと、メッセージをためるバッファはJavaの標準クラスであるLinkedListを使っています。
//class EndpointReader in akka.remote.Endpoint.scala
val buffer = new java.util.LinkedList[AnyRef]
このバッファの実装はAkkaのパフォーマンスを大きく左右すると考えられるので、おそらくLinkedListは十分なパフォーマンスを持っているということなのでしょう。
次に以下のメソッドですが
//class EndpointReader in akka.remote.Endpoint.scala
def sendBufferedMessages(): Unit = {
...
val ok = writePrioLoop() && writeLoop(SendBufferBatchSize)
...
}
このsendBufferedMessages()メソッドが呼ばれると、メッセージをためたバッファがクリアされ、メッセージがRemote Actor宛てに一気に送信されることになります。
EndPointWriterのなかではsendBufferedMessages()が繰り返し呼ばれて(EndPointWriter自身が呼び出しを決められた秒数毎にスケジュールしている)いることがわかります。
sendBufferedMessages()内で呼び出されるwritePrioLoop() も writeLoop(SendBufferBatchSize) も、バッファ内部の個別のメッセージを送る際には以下のwriteSend()メソッドを呼び出していて、
//class EndpointReader in akka.remote.Endpoint.scala
def writeSend(s: Send): Boolean = try {
handle match {
case Some(h) ⇒
...
val pdu = codec.constructMessage( ... )
...
val ok = h.write(pdu)
...
}
このhandleというのは
handle = Option[AkkaProtocolHandle]
となっていて、
val ok = h.write(pdu)
をよびだすと、AkkaProtocolHandle traitのインスタンス(デフォルトではtcp向けの実装)を使って、実装されたプロトコルでメッセージを送ります。
AkkaProtocolHandle についても、いつか別の記事を書こうと思います。