This example shows how to write a reactive reader for the AWS Simple Queue Service (SQS), using Scala and alpakka (and, by extension, akka streams).

SQS Basics

SQS is an AWS-managed message queue service that can host several queues. When a message is read from a queue, it is internally set to invisible for 30 seconds (the default visibility timeout). If you don’t delete it within these 30 seconds, it becomes visible again. This is great for resilient, distributed microservices: if one instance of a service dies in the middle of handling a message, the message will re-appear and be handled by another instance. It is important, though, that you perform the deletion step last.
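This receive/invisible/delete cycle can be illustrated with a tiny in-memory model (a sketch of the semantics only, not the real SQS API; all names here are made up):

```scala
import scala.collection.mutable

// Toy model of the SQS visibility cycle: receive() hides a message for
// `visibilityTimeout` seconds, delete() removes it for good, and a message
// whose timeout elapses without a delete becomes visible again.
class ToyQueue(visibilityTimeout: Int = 30) {
  private case class Entry(body: String, var invisibleUntil: Int = -1)
  private val entries = mutable.ListBuffer.empty[Entry]
  private var clock = 0

  def send(body: String): Unit = entries += Entry(body)

  def tick(seconds: Int): Unit = clock += seconds // advance simulated time

  // Return the first visible message and hide it until the timeout elapses.
  def receive(): Option[String] =
    entries.find(_.invisibleUntil <= clock).map { e =>
      e.invisibleUntil = clock + visibilityTimeout
      e.body
    }

  // Acknowledge a message: it is gone for good.
  def delete(body: String): Unit =
    entries.find(_.body == body).foreach(entries -= _)
}
```

After a `receive()`, a second `receive()` returns nothing; only once more than 30 simulated seconds pass without a `delete()` does the message re-appear.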

This Project

You can find the complete source code at https://github.com/JannikArndt/reactive-sqs.

Imports / build.sbt

Let’s start with adding dependencies to our build.sbt:

name := "reactive-sqs"

scalaVersion := "2.12.7"

version := "1.0"

libraryDependencies ++= Seq(
  // https://mvnrepository.com/artifact/com.lightbend.akka/akka-stream-alpakka-sqs
  "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "0.20",

  // https://mvnrepository.com/artifact/com.typesafe.scala-logging/scala-logging
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "org.slf4j" % "slf4j-simple" % "1.7.25",
  
  // https://mvnrepository.com/artifact/com.typesafe.akka/akka-testkit
  "com.typesafe.akka" %% "akka-testkit" % "2.5.16" % Test,
  // https://mvnrepository.com/artifact/org.scalatest/scalatest
  "org.scalatest" %% "scalatest" % "3.0.5" % Test,
  // https://mvnrepository.com/artifact/org.mockito/mockito-core
  "org.mockito" % "mockito-core" % "2.23.0"
)

Since the com.amazonaws.aws-java-sdk-sqs library is a transitive dependency of akka-stream-alpakka-sqs, we don’t need to declare it ourselves. I’m also using scala-logging, and for tests akka-testkit, scalatest and mockito.

SqsService.scala

Next we’ll write the SqsService.scala. Its job is to create a flow and handle the messages. As parameters I want to give it the queueUrl, a function to handle messages, and the maximum number of messages that are handled in parallel:

object SqsService {

  case class MyMessage(content: String)

  def create(queueUrl: String, maxMessagesInFlight: Int)
            (messageHandler: MyMessage => Unit) = ???
}

I can call this function from my Main.scala:

SqsService.create("http://localhost:4576/queue/myqueue", 20) { message =>
  println(s"Doing logic with ${message.content}")
}

Since alpakka is running on akka streams, I will also have to provide an ActorSystem(). The return value of my create function is dictated by akka streams: a tuple of a KillSwitch and a Future[Done]. These enable me to stop the stream and wait for its completion. So the complete Main is

import akka.Done
import akka.actor.ActorSystem
import akka.stream._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.io.StdIn
import scala.language.postfixOps

object Main extends App {

  implicit val system = ActorSystem()

  val (killSwitch, completion): (KillSwitch, Future[Done]) =
    SqsService.create("http://localhost:4576/queue/myqueue", 20) { message =>
      println(s"Doing logic with ${message.content}")
    }

  println(s"Running service. Press enter to stop.")
  StdIn.readLine()

  killSwitch.shutdown()
  Await.ready(completion, 10 seconds)
  SqsService.stop()
  system.terminate()
}

Creating an SqsClient

The first thing our SqsService needs to do is create an AmazonSQSAsyncClient. The last thing it needs to do is to shut it down — otherwise it won’t let you exit the program:

object SqsService extends StrictLogging {

  case class MyMessage(content: String)

  implicit private val sqsClient: AmazonSQSAsync =
    AmazonSQSAsyncClientBuilder
      .standard()
      .withRegion("eu-central-1")
      .build()

  def stop(): Unit = sqsClient.shutdown()

  def create(queueUrl: String, maxMessagesInFlight: Int)
            (messageHandler: MyMessage => Unit) = ???
}

Creating the Flow

Next, we’ll implement the create function. The basic stream is built like this:

source
  .via(flow)
  .toMat(sink)(Keep.both)
  .run()

As source we’ll use

SqsSource(queueUrl).viaMat(KillSwitches.single)(Keep.right)

This combines a SqsSource that emits sqs.model.Messages with a KillSwitch and makes sure the KillSwitch is returned. The return type is Source[Message, UniqueKillSwitch].
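The Keep combinators only decide which of the two materialized values survives when stages are combined; conceptually they are just two-argument functions like these (standalone sketches, not the akka definitions themselves):

```scala
// Conceptual equivalents of akka's Keep.left / Keep.right / Keep.both:
// when two stages are joined, their materialized values are merged by
// one of these functions.
def keepLeft[L, R](l: L, r: R): L = l
def keepRight[L, R](l: L, r: R): R = r
def keepBoth[L, R](l: L, r: R): (L, R) = (l, r)
```

So Keep.right here means: discard the source’s own materialized value and keep the KillSwitch.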

As sink we use SqsAckSink(queueUrl, SqsAckSinkSettings(maxMessagesInFlight)). It needs to receive the queueUrl because that’s where it sends the delete-commands to delete/acknowledge the currently invisible message.

The Flow is created in two steps: Step one creates a flow from a function and defines attributes:

val flow = Flow
  .fromFunction(handleMessage(messageHandler))
  .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))

The supervisionStrategy decides what happens if an exception is thrown inside the flow. The standard strategy is to complete the stream with failure, i.e. one bad message will crash the entire program. The resumingDecider simply ignores elements that result in exceptions. This means you need to implement error handling inside the messageHandler.
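For a plain collection, the resuming behavior corresponds to something like this sketch (the function name is mine, not an akka API):

```scala
import scala.util.Try

// Rough analogue of Supervision.Resume: elements whose handler throws
// are silently dropped instead of failing the whole run.
def resumeOnError[A, B](elems: Seq[A])(handler: A => B): Seq[B] =
  elems.flatMap(a => Try(handler(a)).toOption)
```

For example, `resumeOnError(Seq("1", "x", "3"))(_.toInt)` keeps only the parseable elements instead of aborting on `"x"`.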

The function itself reads the body from the message and hands it to the messageHandler we defined in the Main. It then returns a tuple of the original message and a delete-action. This is the input needed by the SqsAckSink:

private def handleMessage(messageHandler: MyMessage => Unit) = { message: Message =>
  messageHandler(MyMessage(message.getBody))
  (message, MessageAction.Delete)
}

Now there’s only one thing left: The run() part, where the stream is materialized, needs an ActorMaterializer:

implicit val mat: ActorMaterializer = ActorMaterializer()

The complete code is

object SqsService extends StrictLogging {

  case class MyMessage(content: String)

  implicit private val sqsClient: AmazonSQSAsync =
    AmazonSQSAsyncClientBuilder
      .standard()
      .withRegion("eu-central-1")
      .build()

  def stop(): Unit = sqsClient.shutdown()

  def create(queueUrl: String, maxMessagesInFlight: Int)
            (messageHandler: MyMessage => Unit)
            (implicit system: ActorSystem): (KillSwitch, Future[Done]) = {

    implicit val mat: ActorMaterializer = ActorMaterializer()

    val source = SqsSource(queueUrl).viaMat(KillSwitches.single)(Keep.right)
    val sink = SqsAckSink(queueUrl, SqsAckSinkSettings(maxMessagesInFlight))
    val flow = Flow
      .fromFunction(handleMessage(messageHandler))
      .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))

    source
      .via(flow)
      .toMat(sink)(Keep.both)
      .run()
  }

  private def handleMessage(messageHandler: MyMessage => Unit) = { message: Message =>
    messageHandler(MyMessage(message.getBody))
    (message, MessageAction.Delete)
  }
}

Bonus Functions

The AmazonSqsClient has one big caveat: it does not fail if the queue you’re subscribing to doesn’t exist. That’s why I wrote two extra functions:

import scala.collection.JavaConverters._

def findAvailableQueues(queueNamePrefix: String): Seq[String] =
  sqsClient.listQueues(queueNamePrefix).getQueueUrls.asScala.toVector

This is just a Scala-wrapper around the library function. The second function calls a library function that does fail if the queue doesn’t exist:

def assertQueueExists(queueUrl: String): Unit =
  try {
    sqsClient.getQueueAttributes(queueUrl, Seq("All").asJava)
    logger.info(s"Queue at $queueUrl found.")
  } catch {
    case queueDoesNotExistException: QueueDoesNotExistException =>
      logger.error(s"The queue with url $queueUrl does not exist.")
      throw queueDoesNotExistException
  }

The advantage of assertQueueExists over checking whether the url is contained in the list of all available queues is that you don’t need the permission to list all queues.
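By the way, the asScala/toVector pattern used in findAvailableQueues is not AWS-specific; it works on any Java collection (here on a plain java.util.List for illustration, with a helper name of my own):

```scala
import java.util
import scala.collection.JavaConverters._

// asScala wraps the Java list in a Scala view; toVector copies it into
// an immutable Vector, detached from the original Java collection.
def toScalaVector(xs: util.List[String]): Vector[String] =
  xs.asScala.toVector
```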

Testing

Testing our SqsService poses three challenges: it uses an AWS service, it runs asynchronously, and message queues have a life of their own.

Mocking AWS SQS

Luckily, others have had the need to test AWS services as well, and created Localstack. It provides a Docker image that runs these services locally:

$ docker run -d --env SERVICES="sqs" --env TMPDIR="/tmp" \
    --name "localstack" \
    --publish 4576:4576 \
    --rm localstack/localstack

You list the services you want to use in the SERVICES variable and expose their respective ports (--publish). The container is started in detached mode (-d) and is automatically removed when it exits (--rm).

If you want to access the Localstack version of a service via the aws cli, you can use the --endpoint-url parameter:

$ aws --endpoint-url=http://localhost:4576 sqs send-message\
    --queue-url "http://localhost:4576/queue/myqueue"\
    --message-body "Hallo"

Mocking the Function

Our test will basically be “is this function called if a message arrives in SQS?”. For this we need

  • the Localstack-version of SQS running
  • a separate SQS client
  • a queue dedicated for this test
  • a way to test if a function gets called
  • a way to wait for the round trip (test => Sqs => tested code => test)

We use the AWS client library that’s included in alpakka to create a client in our test class:

val awsSqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder
  .standard()
  .withEndpointConfiguration(new EndpointConfiguration("http://localhost:4576", "eu-central-1"))
  .build()

Note that the endpoint connects to the Localstack version of SQS running in Docker.

Next, we’ll create a queue:

val queueUrl: String = awsSqsClient.createQueue("integrationtest").getQueueUrl

Since we’re running on akka, which has its own ExecutionContext, and the SqsClient has its own ExecutionContext as well, we should terminate both when the tests are done:

override def afterAll(): Unit = {
  awsSqsClient.shutdown()
  shutdown(system)
  super.afterAll()
}

We’ll use Mockito to verify that the function we’ll give to our SqsService is actually called. Mockito needs a class to create a mock, so:

class TestClass {
  def testFunction(message: MyMessage): Unit = ()
}

val mock: TestClass = mock[TestClass]

Since we’re mocking the testFunction, we don’t need to implement it.
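If you prefer not to use Mockito, a hand-rolled recording stub achieves the same thing (a sketch of an alternative, not part of the original project):

```scala
import scala.collection.mutable

// Records every message body it receives, so the test can assert on
// the recorded calls instead of verifying a mock.
class RecordingHandler {
  val received: mutable.Buffer[String] = mutable.Buffer.empty[String]
  def handle(body: String): Unit = received += body
}
```

You would pass `handler.handle` to SqsService.create and then poll `received` in the assertion.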

It is good practice to put any value that appears in both the test input and the expected output into a variable, so you can easily spot the expectation:

val messageBody = "Example Body"

Now we’re ready to write the test!

"SqsService" should "receive a message" in {

  // Arrange
  SqsService.create(queueUrl, maxMessagesInFlight = 20)(mock.testFunction)
  
  // Act: Send message to SQS (synchronous)
  awsSqsClient.sendMessage(queueUrl, messageBody)
  
  // Assert
  verify(mock, Mockito.timeout(1000)).testFunction(MyMessage(messageBody))
}

The Mockito.timeout(1000) will wait up to a second for the result. Make sure to use timeout instead of after, because with timeout the test succeeds as soon as the function is called, while after always waits the full second.
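The idea behind timeout is roughly this polling loop (a sketch, not Mockito’s actual implementation):

```scala
// Poll a condition until it holds or the deadline passes; return as soon
// as it succeeds instead of always waiting out the full timeout.
def eventually(timeoutMs: Long, intervalMs: Long = 10)(cond: => Boolean): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (System.currentTimeMillis() < deadline) {
    if (cond) return true
    Thread.sleep(intervalMs)
  }
  cond // one last check at the deadline
}
```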

Dealing with Message Queues

Now the test will mostly work. However, since it depends on an outside component, namely SQS, it will fail from time to time.

[info] MainSpec:
[info] SqsService
[info] - should receive a message *** FAILED ***
[info]   org.mockito.exceptions.verification.WantedButNotInvoked: Wanted but not invoked:
[info] testClass.testFunction(
[info]     MyMessage(Example Body)
[info] );
[info] -> at MainSpec.$anonfun$new$1(MainSpec.scala:64)

It might also fail because the round trip takes longer on your CI server, or because you broke the code. Try setting the timeout to around 100 ms to replicate the behavior. Now what’s really bad is that your next test will fail as well:

[info] MainSpec:
[info] SqsService
[info] - should receive a message *** FAILED ***
[info]   org.mockito.exceptions.verification.TooManyActualInvocations: testClass.testFunction(
[info]     MyMessage(Example Body)
[info] );
[info] Wanted 1 time:
[info] -> at MainSpec.$anonfun$new$1(MainSpec.scala:64)
[info] But was 2 times:
[info] -> at MainSpec.$anonfun$new$2(MainSpec.scala:58)
[info] -> at MainSpec.$anonfun$new$2(MainSpec.scala:58)

That’s because the message from the previous test is still in the queue! Luckily, this can be worked around with BeforeAndAfterEach:

var queueUrl: String = ""

override def beforeEach(): Unit = {
  queueUrl = awsSqsClient.createQueue("integrationtest").getQueueUrl
  println("--- Created queue ---")
  super.beforeEach()
}

override def afterEach(): Unit = {
  awsSqsClient.deleteQueue(queueUrl)
  println("--- Deleted queue ---")
  super.afterEach()
}

This way, one failed test won’t affect the next.