RestartFlow – Retrying failed stream stage in Akka Streams

If anything is certain in this world, it is that everything can fail, especially data processing. The happy path is the usual case, but a failure someday, somewhere is inevitable. Still, we should never give up, at least not on the first try. A good practice is "hope for the best, prepare for the worst". With that in mind, we would like a way of retrying a failed stream processing stage, preferably with an increasing time gap between consecutive retries. This approach is called the exponential backoff strategy. I will show you two different solutions, one of which uses the RestartFlow mechanism.

SoftwareMill’s Retry

First, we can use the retry library. You can check out the latest version at https://github.com/softwaremill/retry. Usage is very simple:

import dispatch.Defaults._
import dispatch._

import scala.concurrent.Future
import scala.concurrent.duration._

val maxRetries: Int = 10
val initialDelay: FiniteDuration = 5.seconds
val delayBumpFactor: Int = 2

def processWithBackoff = {
  retry.Backoff(maxRetries, initialDelay, delayBumpFactor) { () =>
    Future {
      // something that can "fail"
    }
  }
}

Then, we can use it in an Akka Stream. Internally, this step will perform up to 10 retries, with time gaps between them starting at 5 seconds, then 10 seconds, and so on:

val parallelism: Int = 2

Source
  .from[...]
  .mapAsync(parallelism)(/* some processing */)
  .mapAsync(parallelism)(_ => processWithBackoff)
  .mapAsync(parallelism)(/* some other processing */)
  .runWith(Sink.ignore)
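To make the "5 seconds, then 10 seconds, and so on" schedule concrete, here is a minimal sketch that only does the arithmetic. It is not the retry library's implementation: `BackoffSchedule`, `delayFor` and `schedule` are hypothetical names, and the sketch assumes the first retry waits exactly the initial delay and every later one multiplies the previous gap by the bump factor.

```scala
import scala.concurrent.duration._

object BackoffSchedule {
  // Hypothetical helper (not part of the retry library):
  // delay before the n-th retry, counting from 0.
  def delayFor(attempt: Int, initialDelay: FiniteDuration, factor: Int): FiniteDuration =
    initialDelay * math.pow(factor.toDouble, attempt.toDouble).toLong

  // All delays for the given number of retries, in order.
  def schedule(maxRetries: Int, initialDelay: FiniteDuration, factor: Int): Seq[FiniteDuration] =
    (0 until maxRetries).map(delayFor(_, initialDelay, factor))

  def main(args: Array[String]): Unit = {
    val delays = schedule(maxRetries = 10, initialDelay = 5.seconds, factor = 2)
    // prints: 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560
    println(delays.map(_.toSeconds).mkString(", "))
    // prints: total wait: 5115 s
    println(s"total wait: ${delays.map(_.toSeconds).sum} s")
  }
}
```

Under these assumptions, 10 retries add up to well over an hour of waiting in the worst case, which is worth keeping in mind when picking the values.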

Akka Stream’s RestartFlow

Can we do better? Sure we can. Since Akka 2.5.4 there is a built-in feature that takes care of the clunky boilerplate above. It is named RestartFlow and has similarly named counterparts for Source and Sink. To read more about these new features, see Akka's documentation: https://doc.akka.io/docs/akka/2.5.12/stream/stream-error.html?language=scala#delayed-restarts-with-a-backoff-stage. It contains a RestartSource example. For now we will focus on RestartFlow. Let's say we have some basic flow that we want to wrap in RestartFlow logic. What is more, we don't want to change its input and output types. Here is how we can achieve this:

import akka.NotUsed
import akka.stream.scaladsl.{Flow, RestartFlow}

import scala.concurrent.duration._

// RestartFlow expects FiniteDuration values for both backoff bounds
val initialDelay: FiniteDuration = 5.seconds
val maxDelay: FiniteDuration = 2560.seconds
val randomFactor: Double = 0.2
val maxRetries: Int = 10

// T stands for whatever element type the stream carries
val basicFlow: Flow[T, Unit, NotUsed] = Flow[T].map(_ => ())

val repeateableFlow: Flow[T, Unit, NotUsed] =
  RestartFlow.onFailuresWithBackoff(initialDelay, maxDelay, randomFactor, maxRetries)(() => basicFlow)
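To give a feeling for how initialDelay, maxDelay and randomFactor interact, below is a rough sketch of a capped, jittered backoff calculation. It is an approximation of what Akka does as far as I understand it, not a copy of its code: `JitteredBackoff` and `delayFor` are hypothetical names, and the assumed formula is min(maxDelay, initialDelay * 2^restartCount) * (1 + rnd * randomFactor), with rnd drawn from [0, 1).

```scala
import scala.concurrent.duration._
import scala.util.Random

object JitteredBackoff {
  // Hypothetical helper illustrating the assumed formula; not Akka's API.
  def delayFor(
      restartCount: Int,
      initialDelay: FiniteDuration,
      maxDelay: FiniteDuration,
      randomFactor: Double,
      rnd: Random
  ): FiniteDuration = {
    // Exponential growth: the delay doubles with every restart.
    val exponential = initialDelay.toMillis * math.pow(2.0, restartCount.toDouble)
    // Cap the delay so that it never grows without bound.
    val capped = math.min(maxDelay.toMillis.toDouble, exponential)
    // Stretch the delay by a random amount of up to randomFactor (0.2 = up to 20%).
    val jitter = 1.0 + rnd.nextDouble() * randomFactor
    (capped * jitter).toLong.millis
  }

  def main(args: Array[String]): Unit = {
    val rnd = new Random()
    (0 until 12).foreach { n =>
      println(s"restart $n: ${delayFor(n, 5.seconds, 2560.seconds, 0.2, rnd)}")
    }
  }
}
```

The random part is there so that many streams failing at the same moment do not all restart at exactly the same time. Note that in this sketch the jitter is applied after the cap, so a single delay can exceed maxDelay by up to the random factor.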

Notice that basicFlow and repeateableFlow have the same type. That means we can seamlessly use the restarting flow wherever the signature of the basic flow is expected:

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}

Source
  .from[...]
  .via(repeateableFlow)
  .runWith(Sink.ignore)
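For readers who want something they can paste into a project, here is a self-contained sketch of the same wiring. It assumes akka-stream is on the classpath in a version where RestartFlow behaves as documented, and it uses the 2.5-style ActorMaterializer. The flaky stage, the short delays and the names `RestartFlowSketch`, `flakyFlow` and `restartingFlow` are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, RestartFlow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object RestartFlowSketch {
  // Hypothetical flaky stage: throws for every multiple of 3, doubles the rest.
  val flakyFlow: Flow[Int, Int, NotUsed] = Flow[Int].map { n =>
    if (n % 3 == 0) throw new IllegalStateException(s"cannot process $n")
    n * 2
  }

  // Short, illustrative delays so that the sketch finishes quickly.
  val restartingFlow: Flow[Int, Int, NotUsed] =
    RestartFlow.onFailuresWithBackoff(100.millis, 1.second, 0.2, 10)(() => flakyFlow)

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("restart-flow-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 10).via(restartingFlow).runWith(Sink.seq), 30.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

One design consequence to be aware of: RestartFlow restarts the wrapped flow, it does not replay the element that caused the failure. In this sketch the multiples of 3 never reach the sink, and the Akka documentation warns that in-flight elements may be lost on restart. If every element must eventually be processed, retrying per element (as in the first solution) is the safer fit.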

So, will it work? Well, actually no… 🙂 On the first failure it produces a log entry like this:

[ERROR] [03/20/2018 16:13:59.167] [default-akka.actor.default-dispatcher-2] [RestartWithBackoffFlow(akka://default)] Restarting graph due to failure

As I write this post, there is a confirmed bug that makes RestartFlow unusable: https://stow.com/a/49391285/1969391, https://github.com/issues/24726. It has already been fixed, and the fix is planned for release with milestone 2.5.13. Progress on releasing that Akka version can be tracked here: https://github.com/milestone/129. At the moment there are only two issues left. Unfortunately, no due date has been set. Once the remaining work is done, I will update this post accordingly.