モーダルを閉じる工作HardwareHub ロゴ画像

工作HardwareHubは、ロボット工作や電子工作に関する情報やモノが行き交うコミュニティサイトです。さらに詳しく

利用規約プライバシーポリシー に同意したうえでログインしてください。

Akka 基本的な使い方 (Scala)

モーダルを閉じる

ステッカーを選択してください

お支払い手続きへ
モーダルを閉じる

お支払い内容をご確認ください

購入商品
」ステッカーの表示権
メッセージ
料金
(税込)
決済方法
GooglePayマーク
決済プラットフォーム
確認事項

利用規約をご確認のうえお支払いください

※カード情報はGoogleアカウント内に保存されます。本サイトやStripeには保存されません

※記事の執筆者は購入者のユーザー名を知ることができます

※購入後のキャンセルはできません

作成日作成日
2016/02/27
最終更新最終更新
2021/09/07
記事区分記事区分
一般公開

Akka 2.4.2 を用いたサンプルコード集です。動作には Java 8 以降が必要です。

Akka requires that you have Java 8 or later installed on your machine.

インストール方法は複数提供されています。その一部を記載します。

公式ドキュメント (2.4.2)

アクターの生成

アクター (スレッド) は生成されると無限ループに入ります。receive したメッセージを逐次処理します。アクター内でアクターを生成することもできます。スレッドセーフにするために、やり取りするメッセージは val で宣言した String, IntMap などのイミュータブルなものにすべきです。

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor
import akka.event.Logging

class MyActor extends Actor {
  val log = Logging(context.system, this) // ロガー

  // アクター内でアクターを生成できます。
  val child = context.actorOf(Props(classOf[MyActor2], "myArg3", "myArg4"), name = "myChild")

  def receive = {
    case s: String => {
      log.info("received: %s" format s)
      child ! s
    }
    case _ => {
    }
  }
}

class MyActor2(arg1: String, arg2: String) extends Actor {
  val log = Logging(context.system, this)
  def receive = {
    case s: String => {
      log.info("args: %s, %s, received: %s" format (arg1, arg2, s))
    }
    case _ => {
    }
  }
}

object Main {
  def main(args: Array[String]): Unit = {

    // とても重い処理。アプリケーション内に一つだけ作るようにします。
    val system = ActorSystem("mySystem") // 名前はログなどに記載されます。

    // アクターの設定。イミュータブルにすることでスレッドセーフにします。
    val props1 = Props[MyActor]
    val props2 = Props(classOf[MyActor2], "myArg1", "myArg2") // 引数あり

    // アクターの生成。ActorRef が返ります。
    val actor1 = system.actorOf(props1, name = "myActor1") // 名前はログなどで使用されます。
    val actor2 = system.actorOf(props2, name = "myActor2") // 名前の重複はアクター間で許されません。

    while(true) {
      actor1 ! "hi actor1"
      actor2 ! "hi actor2"
      Thread.sleep(1000)
    }
  }
}

出力ログ

[INFO] [02/27/2016 22:32:42.277] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor2] args: myArg1, myArg2, received: hi actor2
[INFO] [02/27/2016 22:32:42.277] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor1] received: hi actor1
[INFO] [02/27/2016 22:32:42.278] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor1/myChild] args: myArg3, myArg4, received: hi actor1

アクターの停止

生成したアクターは stop で停止できます。システム全体を停止するためには terminate を使用します。system.shutdown は非推奨になりました。アクター内で生成したアクターの場合は watch で監視することで正常に停止した際に Terminatedreceive して独自の処理を実行できます。

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Terminated
import akka.pattern.ask // '?' を使用するため
import akka.util.Timeout
import scala.concurrent.duration._ // "5 seconds" を利用するため
import scala.concurrent.Await
import scala.concurrent.Future
import scala.language.postfixOps

class MyActor extends Actor {

  // 子スレッドを生成して監視。障害時の停止は検知できません。正常終了を検知します。
  val child = context.actorOf(Props[MyActor2], name = "myChild")
  context.watch(child)

  // 依頼元
  var lastSender: ActorRef = context.system.deadLetters // 既定は `/dev/null` 相当の deadLetters

  def receive = {
    case "kill" => {
      context.stop(child) // 停止処理
      lastSender = sender // 返信先
    }
    case Terminated(`child`) => {
      // 変数へのバインドを回避する目的でバッククォートで囲んでいます。
      lastSender ! "finished"
    }
    case _ => {
      context.stop(self) // 自分を停止することもできます。
    }
  }
}

class MyActor2 extends Actor {
  def receive = {
    case _ => {
    }
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mySystem")

    // アクターを生成
    val props = Props[MyActor]
    val actor = system.actorOf(props, name = "myActor")
    Thread.sleep(1000)

    // (子スレッドの) 停止を要請
    implicit val timeout = Timeout(5 seconds) // '?' の暗黙の引数として必要な情報です。
    val future: Future[Any] = actor ? "kill"
    // val future: Future[String] = ask(actor, "kill").mapTo[String] // としても同じです。

    // 同期して待つ (非同期ではない。ここで処理が停止)
    val result = Await.result(future, timeout.duration).asInstanceOf[String]
    // val result = Await.result(future, timeout.duration) // ↑の後者の場合は String に変換済み。

    println(result)

    // アクターの停止
    system.stop(actor)
    Thread.sleep(1000)

    // システムの停止 (`system.shutdown` は非推奨になりました。)
    system.terminate
  }
}

出力結果

finished

アクター障害発生時の復旧

例外が発生してもアクターは自動で再起動します。再起動時に独自の処理をフックして入れ込むことができます。

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor
import akka.event.Logging

class MyActor extends Actor {
  val log = Logging(context.system, this)

  // preStart -> preRestart -> postRestart/preStart -> postStop
  // [---- Old Instance ----] [-------- New Instance --------]
  // [---- Old Child -------] [-------- New Child -----------]
  // <---------------- Same MailBox ------------------------->

  // New Instance のコンストラクト時にも実行されます。
  val child = context.actorOf(Props[MyActor2], name = "myChild")

  override def preStart: Unit = {
    log.info("preStart")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    // これを定義しない場合は `preStart` の処理が実行されます。
    // (その際 `context.stop(child)` は実行されます)
    log.info("preRestart")
    if (message.nonEmpty)
      log.info("last message was: %s" format message)
    context.stop(child) // stop しないと old child が残存し `myChild` で名前が重複するため例外が発生します。
  }

  override def postRestart(reason: Throwable): Unit = {
    log.info("postRestart")
  }

  override def postStop: Unit = {
    log.info("postStop")
  }

  def receive = {
    case "crash" => {
      throw new Exception
    }
    case s: String => {
      log.info(s)
      child ! s
    }
    case _ => {
    }
  }
}

class MyActor2 extends Actor {
  val log = Logging(context.system, this)

  override def preStart: Unit = {
    log.info("preStart")
  }

  override def postStop: Unit = {
    log.info("postStop")
  }

  def receive = {
    case s: String => {
      log.info(s)
    }
    case _ => {
    }
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mySystem")

    // アクターの生成
    val props = Props[MyActor]
    val actor = system.actorOf(props, name = "myActor")
    Thread.sleep(1000)

    // メッセージを送信
    actor ! "crash"

    while(true) {
      Thread.sleep(1000)
      actor ! "hi"
    }
  }
}

実行結果

[INFO] [02/28/2016 03:10:43.461] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor/myChild] preStart
[INFO] [02/28/2016 03:10:43.461] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] preStart
[ERROR] [02/28/2016 03:10:44.473] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] null
java.lang.Exception
at myapp.MyActor$$anonfun$receive$1.applyOrElse(Main.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
at myapp.MyActor.aroundReceive(Main.scala:8)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[INFO] [02/28/2016 03:10:44.474] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] preRestart
[INFO] [02/28/2016 03:10:44.474] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] last message was: Some(crash)
[INFO] [02/28/2016 03:10:44.478] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] postStop
[INFO] [02/28/2016 03:10:44.483] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] postRestart
[INFO] [02/28/2016 03:10:44.483] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor/myChild] preStart
[INFO] [02/28/2016 03:10:45.467] [mySystem-akka.actor.default-dispatcher-6] [akka://mySystem/user/myActor] hi
[INFO] [02/28/2016 03:10:45.468] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] hi
[INFO] [02/28/2016 03:10:46.472] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] hi
[INFO] [02/28/2016 03:10:46.472] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] hi

アクターの検索

actorSelection によって、アクターを検索できます。ただし、この方法は特殊な場合に有用なものであって、通常は ActorRef をコンストラクト時に渡したり、メッセージ内で ActorRef を受け渡しすることが推奨されます

It is always preferable to communicate with other Actors using their ActorRef instead of relying upon ActorSelection. Exceptions are

sending messages using the At-Least-Once Delivery facility
initiating first contact with a remote system

In all other cases ActorRefs can be provided during Actor creation or initialization, passing them from parent to child or introducing Actors by sending their ActorRefs to other Actors within messages.

import akka.actor.{Actor, ActorIdentity, ActorRef, ActorSystem, Identify, Props}
import akka.event.Logging
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps

class MyActor extends Actor {
  val log = Logging(context.system, this)

  val identifyId = 1 // 問い合わせ番号
  val child = context.actorOf(Props[MyActor2], name = "myChild")

  var lastSender = context.system.deadLetters

  def receive = {
    case "search" => {
      context.actorSelection("/user/myActor/myChild") ! Identify(identifyId) // 絶対パス

      // その他の指定方法
      // context.actorSelection("../myActor/myChild") ! Identify(identifyId) // 相対パス
      // context.actorSelection("myChild") ! Identify(identifyId) // 相対パス (`./myChild` の意味)
      // context.actorSelection("myChi*") ! Identify(identifyId) // ワイルドカード

      lastSender = sender
    }
    case ActorIdentity(`identifyId`, Some(ref)) => {
      log.info("found")
      lastSender ! ref // 検索結果を返す
    }
    case ActorIdentity(`identifyId`, None) => {
      log.info("not found")
    }
    case s: String => {
      log.info(s)
      child ! s
    }
    case _ => {
    }
  }
}

class MyActor2 extends Actor {
  val log = Logging(context.system, this)
  def receive = {
    case s: String => {
      log.info(s)
    }
    case _ => {
    }
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mySystem")
    implicit val timeout = Timeout(5 seconds)

    // アクターの生成
    val props = Props[MyActor]
    val actor = system.actorOf(props, name = "myActor")
    Thread.sleep(1000)

    // アクター内で別のアクターの検索を要請
    val future: Future[Any] = actor ? "search"
    val result = Await.result(future, timeout.duration).asInstanceOf[ActorRef]
    result ! "hi child"

    // アクター外で検索する場合
    val identifyId = 2
    val future2: Future[Any] = system.actorSelection("/user/myActor") ? Identify(identifyId)
    val result2 = Await.result(future2, timeout.duration)
    result2 match {
      case ActorIdentity(`identifyId`, Some(ref)) => {
        ref ! "hi"
      }
      case ActorIdentity(`identifyId`, None) => {
      }
      case _ => {
      }
    }

    while(true) {
      Thread.sleep(1000)
    }
  }
}

実行結果

[INFO] [02/28/2016 16:33:46.923] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] found
[INFO] [02/28/2016 16:33:46.924] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor/myChild] hi child
[INFO] [02/28/2016 16:33:46.925] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] hi
[INFO] [02/28/2016 16:33:46.925] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor/myChild] hi

ケースクラスをメッセージとして渡す

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor
import akka.event.Logging

case class MyCaseClass(prop1: Int, prop2: String)

class MyActor extends Actor {
  val log = Logging(context.system, this)

  def receive = {
    case MyCaseClass(prop1, prop2) => {
      log.info("prop1: %d, prop2: %s" format (prop1, prop2))
    }
    case _ => {
    }
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mySystem")

    // アクターの生成
    val props = Props[MyActor]
    val actor = system.actorOf(props, name = "myActor")

    // ケースクラスを渡す
    val msg = new MyCaseClass(123, "abc")

    while(true) {
      Thread.sleep(1000)
      actor ! msg
    }
  }
}

実行結果

[INFO] [02/28/2016 18:52:19.758] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor] prop1: 123, prop2: abc

アクターの状態遷移

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor
import akka.event.Logging

class MyActor extends Actor {
  val log = Logging(context.system, this)

  def angry: Receive = {
    case "be angry" => {
      log.info("already angry")
    }
    case "be happy" => {
      log.info("angry -> happy")
      context.become(happy)
    }
    case _ => {
      log.info("unbecome angry")
      context.unbecome
    }
  }

  def happy: Receive = {
    case "be angry" => {
      log.info("happy -> angry")
      context.become(angry)
    }
    case "be happy" => {
      log.info("already happy")
    }
    case _ => {
      log.info("unbecome happy")
      context.unbecome
    }
  }

  def receive = {
    case "be angry" => {
      log.info("become angry")
      context.become(angry)
    }
    case "be happy" => {
      log.info("become happy")
      context.become(happy)
    }
    case _ => {
    }
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mySystem")

    // アクターの生成
    val props = Props[MyActor]
    val actor = system.actorOf(props, name = "myActor")

    // 状態遷移
    actor ! "be angry"
    Thread.sleep(1000)

    actor ! "be happy"
    Thread.sleep(1000)

    actor ! "be happy"
    Thread.sleep(1000)

    actor ! "dummy"
    Thread.sleep(1000)

    // 終了
    system.terminate
  }
}

実行結果

[INFO] [02/28/2016 18:58:28.370] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] become angry
[INFO] [02/28/2016 18:58:29.370] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] angry -> happy
[INFO] [02/28/2016 18:58:30.371] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] already happy
[INFO] [02/28/2016 18:58:31.374] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] unbecome happy

他のアクターへのメッセージの転送

forward すると sender を維持しつつ他のアクターにメッセージを転送できます。以下の例で ! によっていつもどおり送信した場合は Actor[akka://mySystem/user/myActor#-1013060199] からのメッセージであることが分かります。一方、転送した場合はアクター外からのメッセージのため Actor[akka://mySystem/deadLetters] が sender となっています。

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor
import akka.event.Logging

class MyActor extends Actor {
  val log = Logging(context.system, this)

  val child = context.actorOf(Props[MyActor2], name = "myChild")

  def receive = {
    case s: String => {
      log.info(s)
      child.forward(s) // 転送
      child ! s // コピー送信
    }
    case _ => {
    }
  }
}

class MyActor2 extends Actor {
  val log = Logging(context.system, this)
  def receive = {
    case s: String => {
      log.info(s)
      log.info(sender.toString)
    }
    case _ => {
    }
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mySystem")

    // アクターの生成
    val props = Props[MyActor]
    val actor = system.actorOf(props, name = "myActor")

    // メッセージを送信
    actor ! "hi"
    Thread.sleep(1000)

    // 終了
    system.terminate
  }
}

実行結果

[INFO] [02/28/2016 19:05:33.851] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] hi
[INFO] [02/28/2016 19:05:33.851] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] hi
[INFO] [02/28/2016 19:05:33.851] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] Actor[akka://mySystem/deadLetters]
[INFO] [02/28/2016 19:05:33.851] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] hi
[INFO] [02/28/2016 19:05:33.851] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] Actor[akka://mySystem/user/myActor#-1013060199]

子アクター内で発生した例外処理

アクターを生成したアクターは Supervisor とよばれ、子アクター内で発生した例外処理を行います。supervisorStrategy を override することで、既定の例外処理の設定を上書きできます。system.actorOf() を利用して生成したトップレベルのアクターは User Guardian Actor によって Supervise されるため、例外処理をカスタマイズするためには別の方法が必要です。

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor
import akka.event.Logging
import akka.actor.OneForOneStrategy // それぞれの子アクターが個別に例外処理される
// import akka.actor.AllForOneStrategy // ある子アクターで例外が発生すると、他のすべての子アクターにも同じ例外処理を適用
// ↑ひとつが壊れるとすべてダメになるケースで使用します。
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
import scala.language.postfixOps

class MyActor extends Actor {
  val log = Logging(context.system, this)

  val child = context.actorOf(Props[MyActor2], name = "myChild")

  // ここで例外処理を設定。
  // ウィンドウサイズ 60 秒で 10 回を越えて Restart が発生すると、例外処理を行わずに Stop します。
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException => Resume // ゼロ除算など。握り潰して何もなかったことにする (Resume)
    case _: NullPointerException => Restart
    case _: IllegalArgumentException => Stop
    case _: Exception => Escalate // MyActor で発生した例外として扱う
  }

  def receive = {
    case s: String => {
      log.info(s)
      child ! s
    }
    case _ => {
    }
  }
}

class MyActor2 extends Actor {
  val log = Logging(context.system, this)

  override def preStart: Unit = {
    log.info("preStart")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info("preRestart")
  }

  override def postRestart(reason: Throwable): Unit = {
    log.info("postRestart")
  }

  override def postStop: Unit = {
    log.info("postStop")
  }

  def receive = {
    case "let's throw ArithmeticException" => {
      throw new ArithmeticException
    }
    case "let's throw NullPointerException" => {
      throw new NullPointerException
    }
    case "let's throw IllegalArgumentException" => {
      throw new IllegalArgumentException
    }
    case "let's throw Exception" => {
      throw new Exception
    }
    case _ => {
    }
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mySystem")

    // アクターの生成
    val props = Props[MyActor]
    val actor = system.actorOf(props, name = "myActor")
    Thread.sleep(1000)

    actor ! "let's throw ArithmeticException"
    Thread.sleep(1000)
    actor ! "let's throw NullPointerException"
    Thread.sleep(1000)
    actor ! "let's throw IllegalArgumentException"
    Thread.sleep(1000)
    actor ! "let's throw Exception"
    Thread.sleep(1000)

    // 終了
    system.terminate
  }
}

実行結果

[INFO] [02/28/2016 21:36:08.122] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor/myChild] preStart
[INFO] [02/28/2016 21:36:09.121] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] let's throw ArithmeticException ←Resume
[WARN] [02/28/2016 21:36:09.127] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor/myChild] null
[INFO] [02/28/2016 21:36:10.125] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] let's throw NullPointerException ←Restart
[ERROR] [02/28/2016 21:36:10.127] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] null
java.lang.NullPointerException
at myapp.server.MyActor2$$anonfun$receive$2.applyOrElse(Main.scala:62)
at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
at myapp.server.MyActor2.aroundReceive(Main.scala:38)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[INFO] [02/28/2016 21:36:10.128] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] preRestart
[INFO] [02/28/2016 21:36:10.130] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor/myChild] postRestart
[INFO] [02/28/2016 21:36:11.129] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor] let's throw IllegalArgumentException ←Stop
[ERROR] [02/28/2016 21:36:11.130] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor/myChild] null
java.lang.IllegalArgumentException
at myapp.server.MyActor2$$anonfun$receive$2.applyOrElse(Main.scala:65)
at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
at myapp.server.MyActor2.aroundReceive(Main.scala:38)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[INFO] [02/28/2016 21:36:11.132] [mySystem-akka.actor.default-dispatcher-5] [akka://mySystem/user/myActor/myChild] postStop
[INFO] [02/28/2016 21:36:12.133] [mySystem-akka.actor.default-dispatcher-2] [akka://mySystem/user/myActor] let's throw Exception ←Stop された後のため `child ! s` としても dead letters 行き。
[INFO] [02/28/2016 21:36:12.135] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor/myChild] Message [java.lang.String] from Actor[akka://mySystem/user/myActor#-1824119388] to Actor[akka://mySystem/user/myActor/myChild#-563721550] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Future の扱い方

アクターに ? (ask) すると Future が返されます。Future はモナドのひとつです。コレクション型を扱う際に使用する formap などで処理できます。

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.concurrent.ExecutionContext.Implicits.global

class MyActor extends Actor {
  def receive = {
    case s: String => {
      sender ! s
    }
    case _ => {
    }
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mySystem")
    implicit val timeout = Timeout(5 seconds)

    // アクターの生成
    val props = Props[MyActor]
    val actor = system.actorOf(props, name = "myActor")

    // Future を生成 (ask)
    val future1: Future[Any] = actor ? "hi"
    val future2: Future[Any] = actor ? "hello"

    // 変換
    val future3: Future[Int] = for(f <- future1) yield f.asInstanceOf[String].length
    val future4: Future[Int] = future2.map { _.asInstanceOf[String].length } // としても同じ

    // 「待ちの処理」が完了したら実行する処理
    for(f <- future3) {
      println("future3 = %d" format f) //=> future3 = 2
    }
    future4.foreach { f => // としても同じ
      println("future4 = %d" format f) //=> future4 = 5
    }

    // 「すべての待ちの処理」が完了したら実行する処理
    for {
      f3 <- future3
      f4 <- future4
    } {
      println("future3 + future4 = %d" format (f3 + f4)) //=> future3 + future4 = 7
    }

    // filter もできる
    val future5 = future4.filter { _ == 0 }
    for(f <- future5) println("future5 = %d" format f) // 実行されない (空のリストを処理するイメージ)

    // 無限個の Future
    val futureList: List[Future[Any]] = List[String]("a", "aa", "aaa").map{ actor ? _ }
    val futureLengthList: List[Future[Int]] =
      for(future <- futureList) yield
        for(f <- future) yield f.asInstanceOf[String].length
    val futureSum: Future[Int] =
      for(lengthList <- Future.sequence(futureLengthList)) yield
        lengthList.reduceLeft{ (l, f) => l + f }
    for(f <- futureSum)
      println("sum = %d" format f) //=> sum = 6

    while(true) {
      Thread.sleep(1000)
    }
  }
}

コールバック

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Failure, Success}

class MyActor extends Actor {
  def receive = {
    case s: String => {
      sender ! s
    }
    case _ => {
    }
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mySystem")
    implicit val timeout = Timeout(5 seconds)

    // アクターの生成
    val props = Props[MyActor]
    val actor = system.actorOf(props, name = "myActor")

    // Future を生成 (ask)
    val future: Future[Any] = actor ? "hi"

    // コールバックの書き方 (その1)
    // 成功時
    future onSuccess {
      case s: String => {
        println(s) //=> hi
      }
      case _ => {
      }
    }
    // 失敗時
    future onFailure {
      case e: Exception => {
      }
    }

    // コールバックの書き方 (その2)
    // 完了時
    future onComplete {
      case Success(result) => {
        println(result) //=> hi
      }
      case Failure(faulure) => {
      }
    }

    // コールバックの書き方 (その3)
    // 連鎖
    future andThen {
      case Success(result) => {
        println("success") //=> success (これが発動してから↓)
      }
      case Failure(failure) => {
        println("failure")
      }
    } andThen {
      case _ => {
        println("finalize") //=> finalize (これが発動)
      }
    }

    while(true) {
      Thread.sleep(1000)
    }
  }
}

遅延実行

処理をスケジューリングして遅延実行させることができます。

import akka.actor.{Actor, ActorSystem, Props}
import akka.event.Logging
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.concurrent.ExecutionContext.Implicits.global

class MyActor extends Actor {
  val log = Logging(context.system, this)

  def receive = {
    case "trigger" => {
      log.info("trigger")
      val schedule = context.system.scheduler.scheduleOnce(10 seconds, self, "start looping") // 10秒後に `!` を実施。
      // schedule.cancel // 開始まではキャンセルが可能。
    }
    case "start looping" => {
      log.info("start looping")
      context.system.scheduler.schedule(2 seconds, 1 second, self, "loop") // 2秒後に「1秒間隔で `!` を実施」を開始。
    }
    case "loop" => {
      log.info("loop")
      context.system.scheduler.scheduleOnce(1 second) { // 1秒後に処理を実施。
        log.info("(loop)")
      }
    }
    case _ => {
    }
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mySystem")
    val props = Props[MyActor]
    val actor = system.actorOf(props, name = "myActor")

    actor ! "trigger"

    while(true) {
      Thread.sleep(1000)
    }
  }
}

実行結果

[INFO] [2016-02-29 01:26:22.904] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] trigger
↓10秒後
[INFO] [2016-02-29 01:26:32.920] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] start looping
↓2秒後
[INFO] [2016-02-29 01:26:34.942] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] loop
[INFO] [2016-02-29 01:26:35.941] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] loop
[INFO] [2016-02-29 01:26:35.961] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] (loop)
[INFO] [2016-02-29 01:26:36.941] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] loop
[INFO] [2016-02-29 01:26:36.959] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] (loop)
[INFO] [2016-02-29 01:26:37.942] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] loop
[INFO] [2016-02-29 01:26:37.959] [mySystem-akka.actor.default-dispatcher-3] [akka://mySystem/user/myActor] (loop)

Logback との連携

Logback をロガーとして設定できます

build.sbt

libraryDependencies ++= Seq(
  "ch.qos.logback" % "logback-classic" % "1.1.3",
  "org.slf4j" % "slf4j-api" % "1.7.12",
  "com.typesafe.akka" %% "akka-slf4j" % "2.4.2"
)

src/main/resources/application.conf

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
}

src/main/resources/logback.xml

<configuration>
  <property name="LOG_DIR" value="./log" />
  <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>${LOG_DIR}/myapp.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <!-- daily rollover  -->
      <fileNamePattern>${LOG_DIR}/myapp.%d{yyyy-MM-dd}.log</fileNamePattern>
      <!-- keep 90 days' worth of history -->
      <maxHistory>90</maxHistory>
    </rollingPolicy>
    <encoder>
      <pattern>%date %level [%thread] %logger{10} [%file:%line] %msg%n</pattern>
    </encoder>
  </appender>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%msg%n</pattern>
    </encoder>
  </appender>
  <root level="info">
    <appender-ref ref="FILE" />
    <appender-ref ref="STDOUT" />
  </root>
</configuration>

Main.scala

import akka.actor.{Actor, ActorSystem, Props}
import akka.event.Logging

class MyActor extends Actor {
  val log = Logging(context.system.eventStream, classOf[MyActor])
  def receive = {
    case s: String => {
      log.info("logging... {}, {}", s, 123)
    }
    case _ => {
    }
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("mySystem")
    val props = Props[MyActor]
    val actor = system.actorOf(props, name = "myActor")

    while(true) {
      actor ! "hi"
      Thread.sleep(1000)
    }
  }
}

実行結果 (log/myapp.log)

2016-02-29 00:52:22,236 INFO [mySystem-akka.actor.default-dispatcher-3] a.e.s.Slf4jLogger [Slf4jLogger.scala:85] Slf4jLogger started
2016-02-29 00:52:22,271 INFO [mySystem-akka.actor.default-dispatcher-5] q.c.s.MyActor [Slf4jLogger.scala:79] logging... hi, 123
2016-02-29 00:52:23,269 INFO [mySystem-akka.actor.default-dispatcher-5] q.c.s.MyActor [Slf4jLogger.scala:79] logging... hi, 123
Likeボタン(off)0
詳細設定を開く/閉じる
アカウント プロフィール画像

自動化が好きです。

記事の執筆者にステッカーを贈る

有益な情報に対するお礼として、またはコメント欄における質問への返答に対するお礼として、 記事の読者は、執筆者に有料のステッカーを贈ることができます。

>>さらに詳しくステッカーを贈る
ステッカーを贈る コンセプト画像

Feedbacks

Feedbacks コンセプト画像

    ログインするとコメントを投稿できます。

    ログインする

    関連記事

    • Scala 文字列の処理
      書式指定 object Main { def main(args: Array[String]): Unit = { println("%d + %d = %d".format(1, 1, 2)) //=> 1 + 1 = 2 } } 文字列の比較 ヒアドキュメント 他の言語でいう「ヒアドキュメント」のようなものは """ で囲うことで実現できます。 object Main ...
      したくんしたくん5/18/2018に更新
      いいねアイコン画像0
    • Scala 日付に関する処理
      Date クラスを文字列にフォーマット import java.util.Date object Main { def main(args: Array[String]): Unit = { // format は Date に限らない文字列用の機能です。 println("%d-%d-%d" format (1, 1, 1)) //=> 1-1-1 printl...
      したくんしたくん5/5/2018に更新
      いいねアイコン画像0
    • 酢豚の基本的な使い方 (sbt)
      sbt は Scala および Java を主な対象としたビルドツールです。Scala Build Tool の略ではありませんが、Simple Build Tool という明示的な記述も公式ドキュメントなどには見当りません。以下 sbt の基本的な使用例をまとめます。使用した sbt のバージョンは 0.13 です。 公式ドキュメント [sbt 0.13](http://www.scala-sb...
      ねこねこ5/30/2018に更新
      いいねアイコン画像0
    • Scala 関数のサンプルコード
      「デフォルト引数」および「Unit 型を返す関数」 object HelloWorld { def main(args: Array[String]): Unit = { def myPrint(myArg: String = "default_value") = println(myArg + "!") val result = myPrint() //=> defau...
      したくんしたくん5/26/2018に更新
      いいねアイコン画像0
    • Scala 組み込みの制御構造
      if-else 条件分岐で知られる if-else は三項演算子のようにも使用されます。 object HelloWorld { def main(args: Array[String]): Unit = { val myVal = if (!args.isEmpty) args(0) else "default" println(myVal) } ...
      したくんしたくん9/7/2021に更新
      いいねアイコン画像0