Akka Internals (Akkaの内部動作を知る) メッセージを送る ! メソッド

リチャード 伊真岡 blog

Akka Internals (Akkaの内部動作を知る) メッセージを送る ! メソッド

twitter:RichardImaokaJP

今回の記事のポイント

Akkaで ! メソッドの呼び出しを行うとき、

    actor ! "Message from main()"

実際のAkka内部のコードから

  • Actorの!メソッドが完全にNon-Blockingである

ことを確認したいと思います。これは

  • メッセージのやり取りにNon-Blocking(lock-free) Queueを使う

ことで実現されています。

概要の説明

前回の記事のコードの一部を抜き出して…

この2行は、

    val system = ActorSystem("ActorDebuggerSystem")
    val actor  = system.actorOf(Props[LoggingActor])

イメージで表すとこのようになります。

Actorの生成は実際には別スレッドでおこなわれ、メインスレッド側では別スレッドでのActor生成が完了したかどうか気にする必要なくメッセージを送れます

Actor宛て(正確にはActorRef)には!メソッドを使ってメッセージを送ることができます。

    actor ! "Message from main()"

詳細

詳細 メッセージを送るまで

Actor(Ref)の!メソッドはCell traitのメソッドを呼び出します。

//trait Cell in akka/actor/dungeon/ActorCell.scala
  final def sendMessage(message: Any, sender: ActorRef): Unit
    = sendMessage(Envelope(message, sender, system))

CellはActorの内部で使われているtraitで、普段は気にする必要はありません。

messageは"Message from main()"というString型の文字列でしたが、それをEnvelopeというクラスにラップしています。

Envelopeはmessage, sender, systemという3つのメンバを持っているだけの単純なクラスです。

そしてCellのsendMessage()メソッドは…

//trait Dispatch in akka/actor/Dispatch.scala
  def sendMessage(msg: Envelope): Unit =
    try {
      ...
      dispatcher.dispatch(this, msg)
    } 

このDispatcherというのが何かを説明するのは難しいのですが…

ここではAkkaにおいて、**「メッセージを送り出すもの」**くらいの意味で捉えておきましょう。

Space Alcでdispatchを検索すると"〔書類・荷物などを〕送る、送り出す、発送する、急送する"という意味があることがわかります。

dispatcher.dispatch()メソッド

上の図の水色で囲った部分、dispatcher.dispatch()メソッドの中身を見ていきましょう。

//class Dispatcher in akka/dispatch/Dispatcher.scala
  protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
    val mbox = receiver.mailbox
    mbox.enqueue(receiver.self, invocation)
    registerForExecution(mbox, true, false)
  }

まずreceiver: ActorCellのMailboxを取得して

    val mbox = receiver.mailbox

Image courtesy of digitalart at FreeDigitalPhotos.net

そのMailboxにenqueue()メソッドを使ってメッセージ(ここではinvocationという名前に変わっています)を送っています

    mbox.enqueue(receiver.self, invocation)

Image courtesy of digitalart at FreeDigitalPhotos.net

enqueue()メソッドの中身はこうです。

//class Mailbox in akka/dispatch/Mailbox.scala
  def enqueue(receiver: ActorRef, msg: Envelope): Unit = messageQueue.enqueue(receiver, msg)

実はこのMailboxにはMessageQueueというものが付随していて、そのMessageQueueは以下のように宣言されています。

//object UnboundedMailbox in in akka/dispatch/Mailbox.scala
  class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue { ... }

ConcurrentLinkedQueueというのはJavaの標準APIから来ていて、Oracleのドキュメンテーションを見ると…

ConcurrentLinkedQueue

この実装では、効率のよい非ブロックのアルゴリズムが使用されます。このアルゴリズムは次の資料で記述されているものに基づきます「Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms」Maged M. Michael、Michael L. Scott

すなわち、Non-Blockingなキューであることがわかります。ここまでの時点まではAkkaの!メソッドはNon-Blockingであることが確認できました!

残りの部分もNon-Blockingであることを確認していきましょう。

registerForExecution() - メッセージを送った後、Mailboxを別スレッドの実行用にマークしておく

さきほどのDispatcherのdispatch()メソッドに戻るとmbox.enqueue()の呼び出しが終了すると次に、

    registerForExecution(mbox, true, false)

が呼び出されています。この中身は…

//class Dispatcher in akka/dispatch/Dispatcher.scala
 protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
    ...
    if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { 
      if (mbox.setAsScheduled()) {
        ...
          executorService execute mbox
        ...
      } ...
    } ...
  }

と、Mailboxがmbox.setAsScheduled()によって(別スレッドでの)実行用にマークされていることがわかります。

executorService execute mboxは実際にexecuteをその場で行うのでなく…

//java/util/concurrent/Executor.java
public interface Executor {
    /**
     * Executes the given command at some time in the future.  ...
     */
    void execute(Runnable command);
}

とあるように、将来のどこかの時点で別スレッドで実行を行うメソッドです。

Image courtesy of digitalart at FreeDigitalPhotos.net

まとめ

以上で見てきたように

    actor ! "Message from main()"

は以下のようにmainスレッド内でメッセージを

  • MailBoxのMessagingQueue(Non-Blocking Queue)に入れ

  • ExecutorServiceを使って(Mailboxごと)将来のexcecuteをスケジュール(すぐには実行しない)

することにより、完全にNon-Blockingになっていることがわかります。