Akka Internals (Akkaの内部動作を知る) メッセージを送る ! メソッド
今回の記事のポイント
Akkaで ! メソッドの呼び出しを行うとき、
actor ! "Message from main()"
実際のAkka内部のコードから
- Actorの!メソッドが完全にNon-Blockingである
ことを確認したいと思います。これは
- メッセージのやり取りにNon-Blocking(lock-free) Queueを使う
ことで実現されています。
概要の説明
前回の記事のコードの一部を抜き出して…
この2行は、
val system = ActorSystem("ActorDebuggerSystem")
val actor = system.actorOf(Props[LoggingActor])
イメージで表すとこのようになります。
Actorの生成は実際には別スレッドでおこなわれ、メインスレッド側では別スレッドでのActor生成が完了したかどうか気にする必要なくメッセージを送れます。
Actor宛て(正確にはActorRef)には!メソッドを使ってメッセージを送ることができます。
actor ! "Message from main()"
詳細
詳細 メッセージを送るまで
Actor(Ref)の!メソッドはCell traitのメソッドを呼び出します。
//trait Cell in akka/actor/dungeon/ActorCell.scala
final def sendMessage(message: Any, sender: ActorRef): Unit
= sendMessage(Envelope(message, sender, system))
CellはActorの内部で使われているtraitで、普段は気にする必要はありません。
messageは"Message from main()"というString型の文字列でしたが、それをEnvelopeというクラスにラップしています。
Envelopeはmessage, sender, systemという3つのメンバを持っているだけの単純なクラスです。
そしてCellのsendMessage()メソッドは…
//trait Dispatch in akka/actor/Dispatch.scala
def sendMessage(msg: Envelope): Unit =
try {
...
dispatcher.dispatch(this, msg)
}
このDispatcherというのが何かを説明するのは難しいのですが…
ここではAkkaにおいて、**「メッセージを送り出すもの」**くらいの意味で捉えておきましょう。
Space Alcでdispatchを検索すると"〔書類・荷物などを〕送る、送り出す、発送する、急送する"という意味があることがわかります。
dispatcher.dispatch()メソッド
上の図の水色で囲った部分、dispatcher.dispatch()メソッドの中身を見ていきましょう。
//class Dispatcher in akka/dispatch/Dispatcher.scala
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
registerForExecution(mbox, true, false)
}
まずreceiver: ActorCellのMailboxを取得して
val mbox = receiver.mailbox
Image courtesy of digitalart at FreeDigitalPhotos.net
そのMailboxにenqueue()メソッドを使ってメッセージ(ここではinvocationという名前に変わっています)を送っています
mbox.enqueue(receiver.self, invocation)
Image courtesy of digitalart at FreeDigitalPhotos.net
enqueue()メソッドの中身はこうです。
//class Mailbox in akka/dispatch/Mailbox.scala
def enqueue(receiver: ActorRef, msg: Envelope): Unit = messageQueue.enqueue(receiver, msg)
実はこのMailboxにはMessageQueueというものが付随していて、そのMessageQueueは以下のように宣言されています。
//object UnboundedMailbox in in akka/dispatch/Mailbox.scala
class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue { ... }
ConcurrentLinkedQueueというのはJavaの標準APIから来ていて、Oracleのドキュメンテーションを見ると…
この実装では、効率のよい非ブロックのアルゴリズムが使用されます。このアルゴリズムは次の資料で記述されているものに基づきます「Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms」Maged M. Michael、Michael L. Scott
すなわち、Non-Blockingなキューであることがわかります。ここまでの時点まではAkkaの!メソッドはNon-Blockingであることが確認できました!
残りの部分もNon-Blockingであることを確認していきましょう。
registerForExecution() - メッセージを送った後、Mailboxを別スレッドの実行用にマークしておく
さきほどのDispatcherのdispatch()メソッドに戻るとmbox.enqueue()の呼び出しが終了すると次に、
registerForExecution(mbox, true, false)
が呼び出されています。この中身は…
//class Dispatcher in akka/dispatch/Dispatcher.scala
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
...
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) {
if (mbox.setAsScheduled()) {
...
executorService execute mbox
...
} ...
} ...
}
と、Mailboxがmbox.setAsScheduled()によって(別スレッドでの)実行用にマークされていることがわかります。
executorService execute mboxは実際にexecuteをその場で行うのでなく…
//java/util/concurrent/Executor.java
public interface Executor {
/**
* Executes the given command at some time in the future. ...
*/
void execute(Runnable command);
}
とあるように、将来のどこかの時点で別スレッドで実行を行うメソッドです。
Image courtesy of digitalart at FreeDigitalPhotos.net
まとめ
以上で見てきたように
actor ! "Message from main()"
は以下のようにmainスレッド内でメッセージを
-
MailBoxのMessagingQueue(Non-Blocking Queue)に入れ
-
ExecutorServiceを使って(Mailboxごと)将来のexcecuteをスケジュール(すぐには実行しない)
することにより、完全にNon-Blockingになっていることがわかります。