モーダルを閉じる工作HardwareHub ロゴ画像

工作HardwareHubは、ロボット工作や電子工作に関する情報やモノが行き交うコミュニティサイトです。さらに詳しく

利用規約プライバシーポリシー に同意したうえでログインしてください。

工作HardwareHub ロゴ画像 (Laptop端末利用時)
工作HardwareHub ロゴ画像 (Mobile端末利用時)

Akka 基本的な使い方 (Scala)

モーダルを閉じる

ステッカーを選択してください

モーダルを閉じる

お支払い内容をご確認ください

購入商品
」ステッカーの表示権
メッセージ
料金
(税込)
決済方法
GooglePayマーク
決済プラットフォーム
確認事項

利用規約をご確認のうえお支払いください

※カード情報はGoogleアカウント内に保存されます。本サイトやStripeには保存されません

※記事の執筆者は購入者のユーザー名を知ることができます

※購入後のキャンセルはできません

作成日作成日
2016/02/27
最終更新最終更新
2021/10/07
記事区分記事区分
一般公開

目次

    アカウント プロフィール画像 (サイドバー)

    自動化が好きです。

    0
    ステッカーを贈るとは?

    Akka 2.4.2 を用いたサンプルコード集です。動作には Java 8 以降が必要です。

    Akka requires that you have Java 8 or later installed on your machine.

    インストール方法は複数提供されています。その一部を記載します。

    公式ドキュメント (2.4.2)

    アクターの生成

    アクター (スレッド) は生成されると無限ループに入ります。receive したメッセージを逐次処理します。アクター内でアクターを生成することもできます。スレッドセーフにするために、やり取りするメッセージは val で宣言した String, IntMap などのイミュータブルなものにすべきです。

    import akka.actor.ActorSystem
    import akka.actor.Props
    import akka.actor.Actor
    import akka.event.Logging
    
    class MyActor extends Actor {
      val log = Logging(context.system, this) // ロガー
    
      // アクター内でアクターを生成できます。
      val child = context.actorOf(Props(classOf[MyActor2], "myArg3", "myArg4"), name = "myChild")
    
      def receive = {
        case s: String => {
          log.info("received: %s" format s)
          child ! s
        }
        case _ => {
        }
      }
    }
    
    class MyActor2(arg1: String, arg2: String) extends Actor {
      val log = Logging(context.system, this)
      def receive = {
        case s: String => {
          log.info("args: %s, %s, received: %s" format (arg1, arg2, s))
        }
        case _ => {
        }
      }
    }
    
    object Main {
      def main(args: Array[String]): Unit = {
    
        // とても重い処理。アプリケーション内に一つだけ作るようにします。
        val system = ActorSystem("mySystem") // 名前はログなどに記載されます。
    
        // アクターの設定。イミュータブルにすることでスレッドセーフにします。
        val props1 = Props[MyActor]
        val props2 = Props(classOf[MyActor2], "myArg1", "myArg2") // 引数あり
    
        // アクターの生成。ActorRef が返ります。
        val actor1 = system.actorOf(props1, name = "myActor1") // 名前はログなどで使用されます。
        val actor2 = system.actorOf(props2, name = "myActor2") // 名前の重複はアクター間で許されません。
    
        while(true) {
          actor1 ! "hi actor1"
          actor2 ! "hi actor2"
          Thread.sleep(1000)
        }
      }
    }
    

    出力ログ

    [INFO] [02/27/2016 22:32:42.277] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor2] args: myArg1, myArg2, received: hi actor2
    [INFO] [02/27/2016 22:32:42.277] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor1] received: hi actor1
    [INFO] [02/27/2016 22:32:42.278] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor1/myChild] args: myArg3, myArg4, received: hi actor1
    

    アクターの停止

    生成したアクターは stop で停止できます。システム全体を停止するためには terminate を使用します。system.shutdown は非推奨になりました。アクター内で生成したアクターの場合は watch で監視することで正常に停止した際に Terminatedreceive して独自の処理を実行できます。

    import akka.actor.ActorSystem
    import akka.actor.Props
    import akka.actor.Actor
    import akka.actor.ActorRef
    import akka.actor.Terminated
    import akka.pattern.ask // '?' を使用するため
    import akka.util.Timeout
    import scala.concurrent.duration._ // "5 seconds" を利用するため
    import scala.concurrent.Await
    import scala.concurrent.Future
    import scala.language.postfixOps
    
    class MyActor extends Actor {
    
      // 子スレッドを生成して監視。障害時の停止は検知できません。正常終了を検知します。
      val child = context.actorOf(Props[MyActor2], name = "myChild")
      context.watch(child)
    
      // 依頼元
      var lastSender: ActorRef = context.system.deadLetters // 既定は `/dev/null` 相当の deadLetters
    
      def receive = {
        case "kill" => {
          context.stop(child) // 停止処理
          lastSender = sender // 返信先
        }
        case Terminated(`child`) => {
          // 変数へのバインドを回避する目的でバッククォートで囲んでいます。
          lastSender ! "finished"
        }
        case _ => {
          context.stop(self) // 自分を停止することもできます。
        }
      }
    }
    
    class MyActor2 extends Actor {
      def receive = {
        case _ => {
        }
      }
    }
    
    object Main {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("mySystem")
    
        // アクターを生成
        val props = Props[MyActor]
        val actor = system.actorOf(props, name = "myActor")
        Thread.sleep(1000)
    
        // (子スレッドの) 停止を要請
        implicit val timeout = Timeout(5 seconds) // '?' の暗黙の引数として必要な情報です。
        val future: Future[Any] = actor ? "kill"
        // val future: Future[String] = ask(actor, "kill").mapTo[String] // としても同じです。
    
        // 同期して待つ (非同期ではない。ここで処理が停止)
        val result = Await.result(future, timeout.duration).asInstanceOf[String]
        // val result = Await.result(future, timeout.duration) // ↑の後者の場合は String に変換済み。
    
        println(result)
    
        // アクターの停止
        system.stop(actor)
        Thread.sleep(1000)
    
        // システムの停止 (`system.shutdown` は非推奨になりました。)
        system.terminate
      }
    }
    

    出力結果

    finished
    

    アクター障害発生時の復旧

    例外が発生してもアクターは自動で再起動します。再起動時に独自の処理をフックして入れ込むことができます。

    import akka.actor.ActorSystem
    import akka.actor.Props
    import akka.actor.Actor
    import akka.event.Logging
    
    class MyActor extends Actor {
      val log = Logging(context.system, this)
    
      // preStart -> preRestart -> postRestart/preStart -> postStop
      // [---- Old Instance ----] [-------- New Instance --------]
      // [---- Old Child -------] [-------- New Child -----------]
      // <---------------- Same MailBox ------------------------->
    
      // New Instance のコンストラクト時にも実行されます。
      val child = context.actorOf(Props[MyActor2], name = "myChild")
    
      override def preStart: Unit = {
        log.info("preStart")
      }
    
      override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        // これを定義しない場合は `preStart` の処理が実行されます。
        // (その際 `context.stop(child)` は実行されます)
        log.info("preRestart")
        if (message.nonEmpty)
          log.info("last message was: %s" format message)
        context.stop(child) // stop しないと old child が残存し `myChild` で名前が重複するため例外が発生します。
      }
    
      override def postRestart(reason: Throwable): Unit = {
        log.info("postRestart")
      }
    
      override def postStop: Unit = {
        log.info("postStop")
      }
    
      def receive = {
        case "crash" => {
          throw new Exception
        }
        case s: String => {
          log.info(s)
          child ! s
        }
        case _ => {
        }
      }
    }
    
    class MyActor2 extends Actor {
      val log = Logging(context.system, this)
    
      override def preStart: Unit = {
        log.info("preStart")
      }
    
      override def postStop: Unit = {
        log.info("postStop")
      }
    
      def receive = {
        case s: String => {
          log.info(s)
        }
        case _ => {
        }
      }
    }
    
    object Main {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("mySystem")
    
        // アクターの生成
        val props = Props[MyActor]
        val actor = system.actorOf(props, name = "myActor")
        Thread.sleep(1000)
    
        // メッセージを送信
        actor ! "crash"
    
        while(true) {
          Thread.sleep(1000)
          actor ! "hi"
        }
      }
    }
    

    実行結果

    [INFO] [02/28/2016 03:10:43.461] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor/myChild] preStart
    [INFO] [02/28/2016 03:10:43.461] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] preStart
    [ERROR] [02/28/2016 03:10:44.473] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] null
    java.lang.Exception
    at myapp.MyActor$$anonfun$receive$1.applyOrElse(Main.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
    at myapp.MyActor.aroundReceive(Main.scala:8)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    
    [INFO] [02/28/2016 03:10:44.474] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] preRestart
    [INFO] [02/28/2016 03:10:44.474] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] last message was: Some(crash)
    [INFO] [02/28/2016 03:10:44.478] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] postStop
    [INFO] [02/28/2016 03:10:44.483] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] postRestart
    [INFO] [02/28/2016 03:10:44.483] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor/myChild] preStart
    [INFO] [02/28/2016 03:10:45.467] [mySystem-akka.actor.default-dispatcher-6] [akka://mySystem/user/myActor] hi
    [INFO] [02/28/2016 03:10:45.468] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] hi
    [INFO] [02/28/2016 03:10:46.472] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] hi
    [INFO] [02/28/2016 03:10:46.472] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] hi
    

    アクターの検索

    actorSelection によって、アクターを検索できます。ただし、この方法は特殊な場合に有用なものであって、通常は ActorRef をコンストラクト時に渡したり、メッセージ内で ActorRef を受け渡しすることが推奨されます

    It is always preferable to communicate with other Actors using their ActorRef instead of relying upon ActorSelection. Exceptions are

    sending messages using the At-Least-Once Delivery facility
    initiating first contact with a remote system

    In all other cases ActorRefs can be provided during Actor creation or initialization, passing them from parent to child or introducing Actors by sending their ActorRefs to other Actors within messages.

    import akka.actor.{Actor, ActorIdentity, ActorRef, ActorSystem, Identify, Props}
    import akka.event.Logging
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.language.postfixOps
    
    class MyActor extends Actor {
      val log = Logging(context.system, this)
    
      val identifyId = 1 // 問い合わせ番号
      val child = context.actorOf(Props[MyActor2], name = "myChild")
    
      var lastSender = context.system.deadLetters
    
      def receive = {
        case "search" => {
          context.actorSelection("/user/myActor/myChild") ! Identify(identifyId) // 絶対パス
    
          // その他の指定方法
          // context.actorSelection("../myActor/myChild") ! Identify(identifyId) // 相対パス
          // context.actorSelection("myChild") ! Identify(identifyId) // 相対パス (`./myChild` の意味)
          // context.actorSelection("myChi*") ! Identify(identifyId) // ワイルドカード
    
          lastSender = sender
        }
        case ActorIdentity(`identifyId`, Some(ref)) => {
          log.info("found")
          lastSender ! ref // 検索結果を返す
        }
        case ActorIdentity(`identifyId`, None) => {
          log.info("not found")
        }
        case s: String => {
          log.info(s)
          child ! s
        }
        case _ => {
        }
      }
    }
    
    class MyActor2 extends Actor {
      val log = Logging(context.system, this)
      def receive = {
        case s: String => {
          log.info(s)
        }
        case _ => {
        }
      }
    }
    
    object Main {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("mySystem")
        implicit val timeout = Timeout(5 seconds)
    
        // アクターの生成
        val props = Props[MyActor]
        val actor = system.actorOf(props, name = "myActor")
        Thread.sleep(1000)
    
        // アクター内で別のアクターの検索を要請
        val future: Future[Any] = actor ? "search"
        val result = Await.result(future, timeout.duration).asInstanceOf[ActorRef]
        result ! "hi child"
    
        // アクター外で検索する場合
        val identifyId = 2
        val future2: Future[Any] = system.actorSelection("/user/myActor") ? Identify(identifyId)
        val result2 = Await.result(future2, timeout.duration)
        result2 match {
          case ActorIdentity(`identifyId`, Some(ref)) => {
            ref ! "hi"
          }
          case ActorIdentity(`identifyId`, None) => {
          }
          case _ => {
          }
        }
    
        while(true) {
          Thread.sleep(1000)
        }
      }
    }
    

    実行結果

    [INFO] [02/28/2016 16:33:46.923] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] found
    [INFO] [02/28/2016 16:33:46.924] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor/myChild] hi child
    [INFO] [02/28/2016 16:33:46.925] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] hi
    [INFO] [02/28/2016 16:33:46.925] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor/myChild] hi
    

    ケースクラスをメッセージとして渡す

    import akka.actor.ActorSystem
    import akka.actor.Props
    import akka.actor.Actor
    import akka.event.Logging
    
    case class MyCaseClass(prop1: Int, prop2: String)
    
    class MyActor extends Actor {
      val log = Logging(context.system, this)
    
      def receive = {
        case MyCaseClass(prop1, prop2) => {
          log.info("prop1: %d, prop2: %s" format (prop1, prop2))
        }
        case _ => {
        }
      }
    }
    
    object Main {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("mySystem")
    
        // アクターの生成
        val props = Props[MyActor]
        val actor = system.actorOf(props, name = "myActor")
    
        // ケースクラスを渡す
        val msg = new MyCaseClass(123, "abc")
    
        while(true) {
          Thread.sleep(1000)
          actor ! msg
        }
      }
    }
    

    実行結果

    [INFO] [02/28/2016 18:52:19.758] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor] prop1: 123, prop2: abc
    

    アクターの状態遷移

    import akka.actor.ActorSystem
    import akka.actor.Props
    import akka.actor.Actor
    import akka.event.Logging
    
    class MyActor extends Actor {
      val log = Logging(context.system, this)
    
      def angry: Receive = {
        case "be angry" => {
          log.info("already angry")
        }
        case "be happy" => {
          log.info("angry -> happy")
          context.become(happy)
        }
        case _ => {
          log.info("unbecome angry")
          context.unbecome
        }
      }
    
      def happy: Receive = {
        case "be angry" => {
          log.info("happy -> angry")
          context.become(angry)
        }
        case "be happy" => {
          log.info("already happy")
        }
        case _ => {
          log.info("unbecome happy")
          context.unbecome
        }
      }
    
      def receive = {
        case "be angry" => {
          log.info("become angry")
          context.become(angry)
        }
        case "be happy" => {
          log.info("become happy")
          context.become(happy)
        }
        case _ => {
        }
      }
    }
    
    object Main {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("mySystem")
    
        // アクターの生成
        val props = Props[MyActor]
        val actor = system.actorOf(props, name = "myActor")
    
        // 状態遷移
        actor ! "be angry"
        Thread.sleep(1000)
    
        actor ! "be happy"
        Thread.sleep(1000)
    
        actor ! "be happy"
        Thread.sleep(1000)
    
        actor ! "dummy"
        Thread.sleep(1000)
    
        // 終了
        system.terminate
      }
    }
    

    実行結果

    [INFO] [02/28/2016 18:58:28.370] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] become angry
    [INFO] [02/28/2016 18:58:29.370] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] angry -> happy
    [INFO] [02/28/2016 18:58:30.371] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] already happy
    [INFO] [02/28/2016 18:58:31.374] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] unbecome happy
    

    他のアクターへのメッセージの転送

    forward すると sender を維持しつつ他のアクターにメッセージを転送できます。以下の例で ! によっていつもどおり送信した場合は Actor[akka://mySystem/user/myActor#-1013060199] からのメッセージであることが分かります。一方、転送した場合はアクター外からのメッセージのため Actor[akka://mySystem/deadLetters] が sender となっています。

    import akka.actor.ActorSystem
    import akka.actor.Props
    import akka.actor.Actor
    import akka.event.Logging
    
    class MyActor extends Actor {
      val log = Logging(context.system, this)
    
      val child = context.actorOf(Props[MyActor2], name = "myChild")
    
      def receive = {
        case s: String => {
          log.info(s)
          child.forward(s) // 転送
          child ! s // コピー送信
        }
        case _ => {
        }
      }
    }
    
    class MyActor2 extends Actor {
      val log = Logging(context.system, this)
      def receive = {
        case s: String => {
          log.info(s)
          log.info(sender.toString)
        }
        case _ => {
        }
      }
    }
    
    object Main {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("mySystem")
    
        // アクターの生成
        val props = Props[MyActor]
        val actor = system.actorOf(props, name = "myActor")
    
        // メッセージを送信
        actor ! "hi"
        Thread.sleep(1000)
    
        // 終了
        system.terminate
      }
    }
    

    実行結果

    [INFO] [02/28/2016 19:05:33.851] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] hi
    [INFO] [02/28/2016 19:05:33.851] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] hi
    [INFO] [02/28/2016 19:05:33.851] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] Actor[akka://mySystem/deadLetters]
    [INFO] [02/28/2016 19:05:33.851] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] hi
    [INFO] [02/28/2016 19:05:33.851] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] Actor[akka://mySystem/user/myActor#-1013060199]
    

    子アクター内で発生した例外処理

    アクターを生成したアクターは Supervisor とよばれ、子アクター内で発生した例外処理を行います。supervisorStrategy を override することで、既定の例外処理の設定を上書きできます。system.actorOf() を利用して生成したトップレベルのアクターは User Guardian Actor によって Supervise されるため、例外処理をカスタマイズするためには別の方法が必要です。

    import akka.actor.ActorSystem
    import akka.actor.Props
    import akka.actor.Actor
    import akka.event.Logging
    import akka.actor.OneForOneStrategy // それぞれの子アクターが個別に例外処理される
    // import akka.actor.AllForOneStrategy // ある子アクターで例外が発生すると、他のすべての子アクターにも同じ例外処理を適用
    // ↑ひとつが壊れるとすべてダメになるケースで使用します。
    import akka.actor.SupervisorStrategy._
    import scala.concurrent.duration._
    import scala.language.postfixOps
    
    class MyActor extends Actor {
      val log = Logging(context.system, this)
    
      val child = context.actorOf(Props[MyActor2], name = "myChild")
    
      // ここで例外処理を設定。
      // ウィンドウサイズ 60 秒で 10 回を越えて Restart が発生すると、例外処理を行わずに Stop します。
      override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
        case _: ArithmeticException => Resume // ゼロ除算など。握り潰して何もなかったことにする (Resume)
        case _: NullPointerException => Restart
        case _: IllegalArgumentException => Stop
        case _: Exception => Escalate // MyActor で発生した例外として扱う
      }
    
      def receive = {
        case s: String => {
          log.info(s)
          child ! s
        }
        case _ => {
        }
      }
    }
    
    class MyActor2 extends Actor {
      val log = Logging(context.system, this)
    
      override def preStart: Unit = {
        log.info("preStart")
      }
    
      override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        log.info("preRestart")
      }
    
      override def postRestart(reason: Throwable): Unit = {
        log.info("postRestart")
      }
    
      override def postStop: Unit = {
        log.info("postStop")
      }
    
      def receive = {
        case "let's throw ArithmeticException" => {
          throw new ArithmeticException
        }
        case "let's throw NullPointerException" => {
          throw new NullPointerException
        }
        case "let's throw IllegalArgumentException" => {
          throw new IllegalArgumentException
        }
        case "let's throw Exception" => {
          throw new Exception
        }
        case _ => {
        }
      }
    }
    
    object Main {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("mySystem")
    
        // アクターの生成
        val props = Props[MyActor]
        val actor = system.actorOf(props, name = "myActor")
        Thread.sleep(1000)
    
        actor ! "let's throw ArithmeticException"
        Thread.sleep(1000)
        actor ! "let's throw NullPointerException"
        Thread.sleep(1000)
        actor ! "let's throw IllegalArgumentException"
        Thread.sleep(1000)
        actor ! "let's throw Exception"
        Thread.sleep(1000)
    
        // 終了
        system.terminate
      }
    }
    

    実行結果

    [INFO] [02/28/2016 21:36:08.122] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor/myChild] preStart
    [INFO] [02/28/2016 21:36:09.121] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] let's throw ArithmeticException ←Resume
    [WARN] [02/28/2016 21:36:09.127] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor/myChild] null
    [INFO] [02/28/2016 21:36:10.125] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] let's throw NullPointerException ←Restart
    [ERROR] [02/28/2016 21:36:10.127] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] null
    java.lang.NullPointerException
    at myapp.server.MyActor2$$anonfun$receive$2.applyOrElse(Main.scala:62)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
    at myapp.server.MyActor2.aroundReceive(Main.scala:38)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    
    [INFO] [02/28/2016 21:36:10.128] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] preRestart
    [INFO] [02/28/2016 21:36:10.130] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] postRestart
    [INFO] [02/28/2016 21:36:11.129] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] let's throw IllegalArgumentException ←Stop
    [ERROR] [02/28/2016 21:36:11.130] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor/myChild] null
    java.lang.IllegalArgumentException
    at myapp.server.MyActor2$$anonfun$receive$2.applyOrElse(Main.scala:65)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
    at myapp.server.MyActor2.aroundReceive(Main.scala:38)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    
    [INFO] [02/28/2016 21:36:11.132] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor/myChild] postStop
    [INFO] [02/28/2016 21:36:12.133] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor] let's throw Exception ←Stop された後のため `child ! s` としても dead letters 行き。
    [INFO] [02/28/2016 21:36:12.135] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor/myChild] Message [java.lang.String] from Actor[akka://mySystem/user/myActor#-1824119388] to Actor[akka://mySystem/user/myActor/myChild#-563721550] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
    

    Future の扱い方

    アクターに ? (ask) すると Future が返されます。Future はモナドのひとつです。コレクション型を扱う際に使用する formap などで処理できます。

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.language.postfixOps
    import scala.concurrent.ExecutionContext.Implicits.global
    
    class MyActor extends Actor {
      def receive = {
        case s: String => {
          sender ! s
        }
        case _ => {
        }
      }
    }
    
    object Main {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("mySystem")
        implicit val timeout = Timeout(5 seconds)
    
        // アクターの生成
        val props = Props[MyActor]
        val actor = system.actorOf(props, name = "myActor")
    
        // Future を生成 (ask)
        val future1: Future[Any] = actor ? "hi"
        val future2: Future[Any] = actor ? "hello"
    
        // 変換
        val future3: Future[Int] = for(f <- future1) yield f.asInstanceOf[String].length
        val future4: Future[Int] = future2.map { _.asInstanceOf[String].length } // としても同じ
    
        // 「待ちの処理」が完了したら実行する処理
        for(f <- future3) {
          println("future3 = %d" format f) //=> future3 = 2
        }
        future4.foreach { f => // としても同じ
          println("future4 = %d" format f) //=> future4 = 5
        }
    
        // 「すべての待ちの処理」が完了したら実行する処理
        for {
          f3 <- future3
          f4 <- future4
        } {
          println("future3 + future4 = %d" format (f3 + f4)) //=> future3 + future4 = 7
        }
    
        // filter もできる
        val future5 = future4.filter { _ == 0 }
        for(f <- future5) println("future5 = %d" format f) // 実行されない (空のリストを処理するイメージ)
    
        // 無限個の Future
        val futureList: List[Future[Any]] = List[String]("a", "aa", "aaa").map{ actor ? _ }
        val futureLengthList: List[Future[Int]] =
          for(future <- futureList) yield
            for(f <- future) yield f.asInstanceOf[String].length
        val futureSum: Future[Int] =
          for(lengthList <- Future.sequence(futureLengthList)) yield
            lengthList.reduceLeft{ (l, f) => l + f }
        for(f <- futureSum)
          println("sum = %d" format f) //=> sum = 6
    
        while(true) {
          Thread.sleep(1000)
        }
      }
    }
    

    コールバック

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future
    import scala.concurrent.duration._
    import scala.language.postfixOps
    import scala.util.{Failure, Success}
    
    class MyActor extends Actor {
      def receive = {
        case s: String => {
          sender ! s
        }
        case _ => {
        }
      }
    }
    
    object Main {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("mySystem")
        implicit val timeout = Timeout(5 seconds)
    
        // アクターの生成
        val props = Props[MyActor]
        val actor = system.actorOf(props, name = "myActor")
    
        // Future を生成 (ask)
        val future: Future[Any] = actor ? "hi"
    
        // コールバックの書き方 (その1)
        // 成功時
        future onSuccess {
          case s: String => {
            println(s) //=> hi
          }
          case _ => {
          }
        }
        // 失敗時
        future onFailure {
          case e: Exception => {
          }
        }
    
        // コールバックの書き方 (その2)
        // 完了時
        future onComplete {
          case Success(result) => {
            println(result) //=> hi
          }
          case Failure(faulure) => {
          }
        }
    
        // コールバックの書き方 (その3)
        // 連鎖
        future andThen {
          case Success(result) => {
            println("success") //=> success (これが発動してから↓)
          }
          case Failure(failure) => {
            println("failure")
          }
        } andThen {
          case _ => {
            println("finalize") //=> finalize (これが発動)
          }
        }
    
        while(true) {
          Thread.sleep(1000)
        }
      }
    }
    

    遅延実行

    処理をスケジューリングして遅延実行させることができます。

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.event.Logging
    import scala.concurrent.duration._
    import scala.language.postfixOps
    import scala.concurrent.ExecutionContext.Implicits.global
    
    class MyActor extends Actor {
      val log = Logging(context.system, this)
    
      def receive = {
        case "trigger" => {
          log.info("trigger")
          val schedule = context.system.scheduler.scheduleOnce(10 seconds, self, "start looping") // 10秒後に `!` を実施。
          // schedule.cancel // 開始まではキャンセルが可能。
        }
        case "start looping" => {
          log.info("start looping")
          context.system.scheduler.schedule(2 seconds, 1 second, self, "loop") // 2秒後に「1秒間隔で `!` を実施」を開始。
        }
        case "loop" => {
          log.info("loop")
          context.system.scheduler.scheduleOnce(1 second) { // 1秒後に処理を実施。
            log.info("(loop)")
          }
        }
        case _ => {
        }
      }
    }
    
    object Main {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("mySystem")
        val props = Props[MyActor]
        val actor = system.actorOf(props, name = "myActor")
    
        actor ! "trigger"
    
        while(true) {
          Thread.sleep(1000)
        }
      }
    }
    

    実行結果

    [INFO] [2016-02-29 01:26:22.904] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] trigger
    ↓10秒後
    [INFO] [2016-02-29 01:26:32.920] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] start looping
    ↓2秒後
    [INFO] [2016-02-29 01:26:34.942] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] loop
    [INFO] [2016-02-29 01:26:35.941] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] loop
    [INFO] [2016-02-29 01:26:35.961] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] (loop)
    [INFO] [2016-02-29 01:26:36.941] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] loop
    [INFO] [2016-02-29 01:26:36.959] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] (loop)
    [INFO] [2016-02-29 01:26:37.942] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] loop
    [INFO] [2016-02-29 01:26:37.959] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] (loop)
    

    Logback との連携

    Logback をロガーとして設定できます

    build.sbt

    libraryDependencies ++= Seq(
      "ch.qos.logback" % "logback-classic" % "1.1.3",
      "org.slf4j" % "slf4j-api" % "1.7.12",
      "com.typesafe.akka" %% "akka-slf4j" % "2.4.2"
    )
    

    src/main/resources/application.conf

    akka {
      loggers = ["akka.event.slf4j.Slf4jLogger"]
      loglevel = "INFO"
    }
    

    src/main/resources/logback.xml

    <configuration>
      <property name="LOG_DIR" value="./log" />
      <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_DIR}/myapp.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
          <!-- daily rollover  -->
          <fileNamePattern>${LOG_DIR}/myapp.%d{yyyy-MM-dd}.log</fileNamePattern>
          <!-- keep 90 days' worth of history -->
          <maxHistory>90</maxHistory>
        </rollingPolicy>
        <encoder>
          <pattern>%date %level [%thread] %logger{10} [%file:%line] %msg%n</pattern>
        </encoder>
      </appender>
      <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
          <pattern>%msg%n</pattern>
        </encoder>
      </appender>
      <root level="info">
        <appender-ref ref="FILE" />
        <appender-ref ref="STDOUT" />
      </root>
    </configuration>
    

    Main.scala

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.event.Logging
    
    class MyActor extends Actor {
      val log = Logging(context.system.eventStream, classOf[MyActor])
      def receive = {
        case s: String => {
          log.info("logging... {}, {}", s, 123)
        }
        case _ => {
        }
      }
    }
    
    object Main {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("mySystem")
        val props = Props[MyActor]
        val actor = system.actorOf(props, name = "myActor")
    
        while(true) {
          actor ! "hi"
          Thread.sleep(1000)
        }
      }
    }
    

    実行結果 (log/myapp.log)

    2016-02-29 00:52:22,236 INFO [mySystem-akka.actor.default-dispatcher-3] a.e.s.Slf4jLogger [Slf4jLogger.scala:85] Slf4jLogger started
    2016-02-29 00:52:22,271 INFO [mySystem-akka.actor.default-dispatcher-5] q.c.s.MyActor [Slf4jLogger.scala:79] logging... hi, 123
    2016-02-29 00:52:23,269 INFO [mySystem-akka.actor.default-dispatcher-5] q.c.s.MyActor [Slf4jLogger.scala:79] logging... hi, 123
    
    0
    詳細設定を開く/閉じる
    アカウント プロフィール画像 (本文下)

    自動化が好きです。

    記事の執筆者にステッカーを贈る

    有益な情報に対するお礼として、またはコメント欄における質問への返答に対するお礼として、 記事の読者は、執筆者に有料のステッカーを贈ることができます。

    さらに詳しく →
    ステッカーを贈る コンセプト画像

    Feedbacks

    Feedbacks コンセプト画像

      ログインするとコメントを投稿できます。

      関連記事

      • Scala 文字列の処理
        書式指定 object Main { def main(args: Array[String]): Unit = { println("%d + %d = %d".format(1, 1, 2)) //=> 1 + 1 = 2 } } 文字列の比較 ヒアドキュメント 他の言語でいう「ヒアドキュメント」のようなものは """ で囲うことで実現できます。 object Main ...
        したくんしたくん6/18/2018に更新
        いいねアイコン画像0
      • Scala 日付に関する処理
        Date クラスを文字列にフォーマット import java.util.Date object Main { def main(args: Array[String]): Unit = { // format は Date に限らない文字列用の機能です。 println("%d-%d-%d" format (1, 1, 1)) //=> 1-1-1 printl...
        したくんしたくん6/5/2018に更新
        いいねアイコン画像0
      • 酢豚の基本的な使い方 (sbt)
        sbt は Scala および Java を主な対象としたビルドツールです。Scala Build Tool の略ではありませんが、Simple Build Tool という明示的な記述も公式ドキュメントなどには見当りません。以下 sbt の基本的な使用例をまとめます。使用した sbt のバージョンは 0.13 です。 公式ドキュメント [sbt 0.13](http://www.scala-sb...
        ねこねこ6/30/2018に更新
        いいねアイコン画像0
      • Scala 関数のサンプルコード
        「デフォルト引数」および「Unit 型を返す関数」 object HelloWorld { def main(args: Array[String]): Unit = { def myPrint(myArg: String = "default_value") = println(myArg + "!") val result = myPrint() //=> defau...
        したくんしたくん6/26/2018に更新
        いいねアイコン画像0
      • Scala 組み込みの制御構造
        if-else 条件分岐で知られる if-else は三項演算子のようにも使用されます。 object HelloWorld { def main(args: Array[String]): Unit = { val myVal = if (!args.isEmpty) args(0) else "default" println(myVal) } ...
        したくんしたくん10/7/2021に更新
        いいねアイコン画像0