My Profile Photo

Daniel Perez


software engineer interested in programming languages and functional programming


Getting started with Scala's akka actors

I’ve been looking for a sample code to get started with Akka actors, but after looking around a little, the only non trivial example I found was the akka 1.3 Getting started page.

I tried to adapt this to the latest release of akka (2.4 while I’m writing this), so here are the main changes. By the way, I’m using Scala 2.10.1 version.

Disclaimer

I’m only starting with akka library, and I’m sure there are some more elegant and efficient ways to do that. That’s pretty much try to at least get things working.

Imports

The imports themselves have more or less completely changed, so it’s better not to start trying to go with the original version. All we’ll be needing is

import akka.actor._
import akka.routing._

and if you don’t want to import stuff you won’t be using, you can use

import akka.actor.{Actor, PoisonPill, Props, ActorSystem}
import akka.routing.{RoundRobinRouter, Broadcast}

The code

Worker class

The main change here is that reply is no longer a method of the Actor trait, and therefore to reply we have to use the implicit sender object.

  class Worker extends Actor {
    def receive = {
      case Work(start, nbOfElements) =>
      sender ! Result(calculatePiFor(start, start + nbOfElements))
    }

    def calculatePiFor(start: Int, stop: Int): Double = {
      (start until stop) map { i =>
        4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
      } sum
    }
  }

The router

The router has been simplified (at least for basic usage), and there is no need to create the actors, it can be automatically handled. The code for the router becomes simply

val router = context.actorOf(
  Props[Worker].withRouter(RoundRobinRouter(nbOfWorkers))
)

where nbOfWorkers is the number of Worker actors which will be supervised by the router. I chose RoundRobinRouter here because it was what seemed to be the closest to the CyclicIterator in the original code.

Message handler

The message handling part in the Master is pretty much the same, but the stop method is not part of the Actor trait anymore. In 2.x versions, the trait ActorContext seems to be responsible for that job, so to stop the actor, the code becomes

if(nbOfResults == nbOfMessages) {
  context.stop()
}

however, as we don’t really need any thing more once the master has done his job, I chose to stop the system the master belongs to itself by using

if(nbOfResults == nbOfMessages) {
  context.system.shutdown()
}

though I’m not really sure if that’s the right way to do it.

Full code

I finally ended up with a code like this:

import akka.actor.{Actor, PoisonPill, Props, ActorSystem}
import akka.routing.{RoundRobinRouter, Broadcast}

object Main {

  def main(args: Array[String]): Unit = {
    calculate(nbOfWorkers = 4, nbOfElements = 10000, nbOfMessages = 10000)
  }

  sealed trait PiMessage

  case object Calculate extends PiMessage

  case class Work(start: Int, nbOfElements: Int) extends PiMessage

  case class Result(value: Double) extends PiMessage


  class Worker extends Actor {
    def receive = {
      case Work(start, nbOfElements) =>
      sender ! Result(calculatePiFor(start, start + nbOfElements))
    }

    def calculatePiFor(start: Int, stop: Int): Double = {
      (start until stop) map { i =>
        4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
      } sum
    }
  }

  class Master (
    nbOfWorkers: Int,
    nbOfMessages: Int,
    nbOfElements: Int
  ) extends Actor {

    var pi: Double = 0.0
    var nbOfResults: Int = 0
    var start: Long = 0

    val router = context.actorOf(
      Props[Worker].withRouter(RoundRobinRouter(nbOfWorkers))
    )

    def receive = {
      case Calculate => {
        for (i <- 0 until nbOfMessages) {
          router ! Work(i * nbOfElements, nbOfElements)
        }
      }

      case Result(value) => {
        pi += value
        nbOfResults += 1
        if(nbOfResults == nbOfMessages) {
          context.system.shutdown()
        }
      }
    }

    override def preStart() {
      start = System.currentTimeMillis
    }

    override def postStop() {
      println(
        "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis"
        .format(pi, System.currentTimeMillis - start))
    }
  }


  def calculate(nbOfWorkers: Int, nbOfElements: Int, nbOfMessages: Int) {
    val system = ActorSystem("PiCalculator")

    val master = system.actorOf(Props(
      new Master(nbOfWorkers, nbOfMessages, nbOfElements)))

    master ! Calculate
  }
}

I used sbt to download the akka dependencies for me with this build.sbt

name := "Pi"

version := "1.0"

scalaVersion := "2.10.3"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.1.4"

scalacOptions ++= Seq("-feature", "-language:postfixOps")

and I compiled and ran the program.

-> % sbt
[info] Set current project to Pi (in build file:/home/daniel/tmp/tutorial/)
run
[info] Updating {file:/home/daniel/tmp/abc/}abc...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Compiling 1 Scala source to /home/daniel/tmp/abc/target/scala-2.10/classes...
[info] Running Main

	Pi estimate: 		3.1415926435897887
	Calculation time: 	1002 millis
[success] Total time: 6 s, completed Sep 29, 2014 9:28:22 PM

It’s not the best approximation of π I’ve seen, but well at least everything seems to be working fine.

comments powered by Disqus