RestartFlow – Retrying failed stream stage in Akka Streams
If anything is certain in this world, it is that everything can fail, data processing especially. The happy path is usually the more common one, but failing someday, somewhere is all but inevitable. Still, we should never give up, at least not on the first try. A good practice is ‘hope for the best, prepare for the worst’. With that in mind, we would like a way of retrying a failed stream processing stage, preferably with an increasing time gap between consecutive retries. When each gap is a fixed multiple (typically 2) of the previous one, this approach is called an exponential backoff strategy. I will show you two different solutions, one of which uses the RestartFlow mechanism.
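To make the schedule concrete, here is a minimal sketch assuming a doubling factor of 2 and a 5 second initial delay, the numbers used later in this post. BackoffSchedule and delays are hypothetical names for illustration, not part of any library discussed here.

```scala
import scala.concurrent.duration._

object BackoffSchedule {
  // Hypothetical helper: delay before retry n (0-based) is initialDelay * factor^n.
  def delays(initialDelay: FiniteDuration, factor: Int, retries: Int): Seq[FiniteDuration] =
    (0 until retries).map(n => initialDelay * math.pow(factor, n).toLong)

  def main(args: Array[String]): Unit =
    // prints 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560
    println(delays(5.seconds, 2, 10).map(_.toSeconds).mkString(", "))
}
```

Ten retries that start at 5 seconds already end with a gap of 2560 seconds, which is why a backoff strategy usually also needs an upper bound on the delay.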
SoftwareMill’s Retry
First, we can use the retry library. You can check out the latest version at https://github.com/softwaremill/retry. Usage is very simple:
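import dispatch.Defaults._ // implicit ExecutionContext and timer
import dispatch._
import scala.concurrent.Future
import scala.concurrent.duration._

val maxRetries: Int = 10
val initialDelay: FiniteDuration = 5.seconds // Backoff takes a FiniteDuration
val delayBumpFactor: Int = 2

// retry needs an implicit Success[T]; here any completed Future counts as success
implicit val success: retry.Success[Unit] = retry.Success[Unit](_ => true)

def processWithBackoff: Future[Unit] =
  // .apply is required: Backoff(...) has an implicit (timer) parameter list
  retry.Backoff(maxRetries, initialDelay, delayBumpFactor).apply { () =>
    Future {
      // something that can "fail"
    }
  }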
Then, we can use it in an Akka Streams stage. This step will perform up to 10 retries, with the time gap between them starting at 5 seconds, then 10 seconds, and so on:
val parallelism: Int = 2

Source
  .from[...]
  .mapAsync(parallelism)(/* some processing */)
  .mapAsync(parallelism)(_ => processWithBackoff)
  .mapAsync(parallelism)(/* some other processing */)
  .runWith(Sink.ignore)
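To see what a backoff policy like this does between attempts, here is a minimal stdlib-only sketch of retrying a Future with exponential backoff. It is not the retry library's implementation: SimpleBackoff and retryWithBackoff are hypothetical names, and the sketch assumes that every failed Future should be retried.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object SimpleBackoff {
  // Single scheduler thread used only to wait between attempts without blocking callers.
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  // Runs `body` once `delay` has elapsed and mirrors its outcome.
  private def after[T](delay: FiniteDuration)(body: => Future[T]): Future[T] = {
    val promise = Promise[T]()
    scheduler.schedule(new Runnable {
      def run(): Unit = promise.completeWith(body)
    }, delay.toMillis, TimeUnit.MILLISECONDS)
    promise.future
  }

  // Calls `op`; if its Future fails and retries remain, waits `delay` and tries
  // again with `delay * factor`. At most maxRetries + 1 attempts in total.
  def retryWithBackoff[T](maxRetries: Int, delay: FiniteDuration, factor: Int)(op: () => Future[T])(
      implicit ec: ExecutionContext): Future[T] = {
    // Turn a synchronous exception from `op` into a failed Future.
    val attempt = try op() catch { case NonFatal(e) => Future.failed[T](e) }
    attempt.recoverWith {
      case _ if maxRetries > 0 =>
        after(delay)(retryWithBackoff(maxRetries - 1, delay * factor.toLong, factor)(op))
    }
  }

  // Stops the scheduler thread so the JVM can exit.
  def shutdown(): Unit = scheduler.shutdown()
}
```

Called as retryWithBackoff(10, 5.seconds, 2)(op), it waits 5 s, 10 s, 20 s and so on between failed attempts, the same schedule described above.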
Akka Stream’s RestartFlow
Can we do it better? Sure we can. Since Akka 2.5.4 there is a feature that handles all of the above clunky boilerplate code for us. It is called RestartFlow and has similarly named counterparts for Source and Sink (RestartSource and RestartSink). You can read more about them in the Akka documentation: https://doc.akka.io/docs/akka/2.5.12/stream/stream-error.html?language=scala#delayed-restarts-with-a-backoff-stage, which includes a RestartSource example. For now we will focus on RestartFlow.
Let’s say we have a basic flow that we want to wrap in RestartFlow logic. What is more, we don’t want to change its input and output types. Here is how we can achieve this:
import akka.NotUsed
import akka.stream.scaladsl.{Flow, RestartFlow}
import scala.concurrent.duration._

// T is a placeholder for your stream's element type
val initialDelay: FiniteDuration = 5.seconds
val maxDelay: FiniteDuration = 2560.seconds // 5 s * 2^9
val randomFactor: Double = 0.2
val maxRetries: Int = 10

val basicFlow: Flow[T, Unit, NotUsed] = Flow[T].map(_ => ())

// RestartFlow calls the factory () => basicFlow to create a fresh flow on every (re)start
val repeateableFlow: Flow[T, Unit, NotUsed] =
  RestartFlow.onFailuresWithBackoff(initialDelay, maxDelay, randomFactor, maxRetries)(() => basicFlow)
Notice that the basic flow and the restartable flow have the same type, Flow[T, Unit, NotUsed]. That means we can seamlessly use the restarting flow wherever the basic flow is expected:
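import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

Source
  .from[...]
  .via(repeateableFlow)
  .runWith(Sink.ignore)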
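The numbers passed to onFailuresWithBackoff are easier to read once you see the delays they produce. As far as I can tell from the Akka sources, each restart delay is the minimum backoff doubled once per previous restart, capped at the maximum backoff and then multiplied by a random factor between 1.0 and 1.0 + randomFactor. The stdlib-only sketch below approximates that rule; RestartDelay.delay is a hypothetical helper, not an Akka API, so check the Akka sources for the exact behaviour. It also shows why 2560 seconds fits as maxDelay here: 5 s × 2^9 = 2560 s is the largest step of a ten-step doubling sequence starting at 5 s.

```scala
import scala.concurrent.duration._
import scala.util.Random

object RestartDelay {
  // Hypothetical helper (not an Akka API) approximating a restart-with-backoff delay:
  // double minBackoff for every previous restart, cap the result at maxBackoff,
  // then stretch it by a random jitter factor in [1.0, 1.0 + randomFactor).
  def delay(restartCount: Int, minBackoff: FiniteDuration, maxBackoff: FiniteDuration,
            randomFactor: Double, rnd: Random): FiniteDuration = {
    // Work in Double nanoseconds so large restart counts cannot overflow a FiniteDuration.
    val exponentialNanos = minBackoff.toNanos.toDouble * math.pow(2, restartCount)
    val cappedNanos = math.min(exponentialNanos, maxBackoff.toNanos.toDouble)
    val jitter = 1.0 + rnd.nextDouble() * randomFactor
    (cappedNanos * jitter).toLong.nanos
  }

  def main(args: Array[String]): Unit = {
    val rnd = new Random(42)
    // Parameters from the RestartFlow example: 5 s initial delay, 2560 s cap, 20% jitter.
    (0 until 10).foreach { n =>
      println(s"restart $n: ${delay(n, 5.seconds, 2560.seconds, 0.2, rnd).toMillis} ms")
    }
  }
}
```

With randomFactor = 0.2 the fourth delay lands somewhere between 40 and 48 seconds instead of exactly 40; the jitter spreads out restarts of many streams that failed at the same moment.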
Finally, does it work? Well, actually no… 🙂 On the first failure it produces a log like this:
[ERROR] [03/20/2018 16:13:59.167] [default-akka.actor.default-dispatcher-2] [RestartWithBackoffFlow(akka://default)] Restarting graph due to failure
At the time of writing there is a confirmed bug that makes RestartFlow unusable: https://stow.com/a/49391285/1969391, https://github.com/issues/24726. It has already been fixed, and the fix is planned to ship with milestone 2.5.13. You can track the progress of that Akka release here: https://github.com/milestone/129. At the moment only two issues are left; unfortunately, no due date has been set. Once the release is out, I will update this post accordingly.