How easily can you cause OutOfMemoryError?

dam

OutOfMemoryError despite GC

This post is more of a short note than a full-featured post. In a few words I will show you (and warn you) how easily you can cause an OutOfMemoryError even though the GC is doing its job well. The example comes from my own short struggle with this issue.

Background

A few months ago I was indexing over 16 million XML documents containing geospatial data into Elasticsearch. The job involved the great elastic4s library (which I personally recommend for interacting with ES from Scala) and Monix’s Observable.

Code

The main for-comprehension consisted of three steps: creating a backup of the index, indexing the data from scratch, and cleaning up afterwards. It looked roughly like this:

val geoFetcherObs: Observable[File] =
        GeoFetcher.fetchGeoDataObs(tempDataPath, fetchingParallelism)

for {
  _       <- createBackup(client)
  _       <- processData(client, geoFetcherObs).toListL
  result  <- realiasAndCleanup(client)
} yield result

The type produced by the second step was somewhat complicated (and, as it turned out, crucial): List[Either[Throwable, RequestSuccess[BulkResponse]]].

That type originated from the processData function, whose body was as follows:

def processData(
    client: EsClient,
    fetcherObservable: Observable[File]
): Observable[Either[Throwable, RequestSuccess[BulkResponse]]] = {
  fetcherObservable
    .transform(extractZipFiles)
    .transform(parseXmls)
    .transform(toIndexable)
    .bufferIntrospective(conf.batchSize)
    .transform(indexBatch(client))
}

As you can see, the stream consisted of the following processing stages:
– getting a zip file from the stream,
– extracting it into separate XML files,
– parsing the XML files,
– converting the parsed data to an indexable form,
– dividing it into batches,
– finally, indexing each batch of data into an Elasticsearch index.
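The stages compose lazily: each element flows through them one at a time, and nothing piles up unless a terminal operation collects it. Below is a minimal, dependency-free sketch of the same shape using a plain Scala Iterator. The stage functions (extractZip, parseXml, toIndexable, indexBatch) and the Indexable type are hypothetical stand-ins, not the original code, and grouped only approximates bufferIntrospective: it always emits fixed-size batches, whereas the Monix operator adapts to downstream back-pressure.

```scala
// Hypothetical indexable document; not the original type.
final case class Indexable(id: String)

// Hypothetical stages standing in for extractZipFiles, parseXmls, toIndexable and indexBatch.
def extractZip(zipName: String): Iterator[String] =
  Iterator.tabulate(3)(i => s"$zipName-doc$i.xml") // pretend every zip holds 3 XML files
def parseXml(fileName: String): String = fileName.stripSuffix(".xml")
def toIndexable(parsed: String): Indexable = Indexable(parsed)
def indexBatch(batch: Seq[Indexable]): Int = batch.size // pretend: number of indexed documents

def pipeline(zips: Iterator[String], batchSize: Int): Iterator[Int] =
  zips
    .flatMap(extractZip)  // zip file -> XML files
    .map(parseXml)        // XML file -> parsed data
    .map(toIndexable)     // parsed data -> indexable form
    .grouped(batchSize)   // fixed-size batches (bufferIntrospective adapts to back-pressure instead)
    .map(indexBatch)      // batch -> indexing result
```

With two zip files of three documents each and a batch size of 4, the pipeline yields one full batch of 4 and a final batch of 2.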

Outcome

At first, after the stream started, everything looked great. Files with geospatial data were downloaded and extracted. Then, after XML parsing, thousands of documents were landing safely in the index. Well, until an OutOfMemoryError happened…

None of the many potential solutions I tried (I will spare you the details) helped at all. The next logical step was to look into the JVM and the state of its heap while the application was running. In such cases VisualVM is your best friend. For my application to finish successfully I needed 64GB of RAM, which was a screaming symptom that something was really wrong. Here is a screenshot from VisualVM:

VisualVM screenshot - before fix

As you can see, CPU usage was acceptable, but the garbage collector was struggling to reclaim as much heap as it could. In fact, at the end of the run there were 20GB of heap that could not be released at all. Maybe a memory leak? Maybe not in my code but in the elastic4s library? (Yeah, for sure… ( ͡° ͜ʖ ͡°))

After spending some more time on it, it suddenly hit me what I was accumulating. Do you still remember that .toListL from the first code snippet, producing List[Either[Throwable, RequestSuccess[BulkResponse]]]? Well, that list held the indexing statuses for roughly 16 million documents and occupied… 20GB of RAM.
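A quick back-of-the-envelope calculation shows how this adds up. Taking the rounded figures above at face value (20GB retained, 16 million documents), the retained statuses cost on average about 1.3KB per document: negligible for a single element, fatal for millions of them.

```scala
// Rounded figures from this post, taken at face value.
val retainedBytes: Long = 20L * 1024 * 1024 * 1024 // ~20GB the GC could not reclaim
val documents: Long = 16000000L                   // ~16 million indexed documents

// Average retained footprint per document (integer division, in bytes).
val bytesPerDocument: Long = retainedBytes / documents
```

Any per-element result that is kept alive until the stream ends multiplies in exactly this way.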

Problem solved

Once the (shameful) problem was located, I realized I didn’t need those statuses at all, because I never did anything with them. All I needed was to wait until indexing had finished. The solution couldn’t be easier: I just changed .toListL to .completedL, which returns a Task[Unit], so in the for-comprehension it yields just Unit. And that’s really all. Here is the VisualVM screenshot after applying the fix:

VisualVM screenshot - after fix

As you can see, 128MB of RAM was perfectly enough for the whole lifetime of the application. That’s 512 times less than the initial 64GB!
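The difference between the two terminal operations can be sketched without Monix, using a lazy Scala Iterator as a stand-in for the Observable. BulkStatus and the statuses generator are hypothetical; collectAll mirrors .toListL, drain mirrors .completedL, and countFailures shows a middle ground for when you do need something from the results: fold them into a constant-size summary instead of keeping every element. Monix’s Observable offers a Task-returning fold (foldLeftL) for the same purpose.

```scala
// Hypothetical stand-in for one bulk indexing status (not an elastic4s type).
final case class BulkStatus(batchId: Int, failed: Boolean)

// Lazy source of statuses, produced one at a time, standing in for the Observable.
// Every 1000th batch is marked as failed, purely for illustration.
def statuses(batches: Int): Iterator[BulkStatus] =
  Iterator.range(0, batches).map(i => BulkStatus(i, failed = i % 1000 == 999))

// Analogue of .toListL: every status stays reachable for as long as the List is referenced.
def collectAll(batches: Int): List[BulkStatus] = statuses(batches).toList

// Analogue of .completedL: run the stream to the end and keep nothing.
def drain(batches: Int): Unit = statuses(batches).foreach(_ => ())

// Middle ground: fold into a constant-size summary instead of collecting everything.
def countFailures(batches: Int): Int =
  statuses(batches).foldLeft(0)((failures, s) => if (s.failed) failures + 1 else failures)
```

Only collectAll grows with the number of elements; drain and countFailures need the same small amount of memory whether the stream has ten elements or sixteen million.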

Summary

Even though the JVM has a garbage collector doing the dirty work, you can still hurt yourself and cause an OutOfMemoryError. In my case the GC could not reclaim the heap occupied by the indexing statuses because I had explicitly asked to keep all of them, which happened when I carelessly invoked .toListL. Beware of such practices, even in the JVM world. Good luck! 🙂


Mockito’s ArgumentCaptor meets AsyncWordSpec

ArgumentCaptor

ArgumentCaptor – ask what is passed by

ArgumentCaptor is a nice Mockito feature that lets you capture and validate the arguments passed to a mock from within the code under test. With it we can see what happens internally: we can check ‘what’s inside the box.’

Background

Let’s suppose you pass several kinds of messages around your system and you want to persist a corresponding type of event for each of them in what we will call an event log. Here is our model for Event:

sealed trait Event

case object EventA        extends Event
case object EventB        extends Event
case object EventC        extends Event
case object EventUnknown  extends Event

We also have a conversion from messages to Event types:

object EventCreator {
  def fromMessage: String => Event = {
    case "messageA" => EventA
    case "messageB" => EventB
    case "messageC" => EventC
    case _          => EventUnknown
  }
}

So whenever you receive a “messageA” message, you want to store the corresponding event. As mentioned before, events will be stored in the event log:

trait EventRepository {
  def save(event: Event): Future[Unit]
}

Use case

We have a trait that describes EventPersistor behavior. An implementation should, for a given String message, persist the appropriate Event in the event log:

import scala.concurrent.Future

trait EventPersistor {
  def persistFor(message: String): Future[Unit]
}

Let us provide an example implementation that uses the conversion from EventCreator:

import [...].EventCreator.fromMessage

import scala.concurrent.Future

class EventPersistorImpl(eventRepository: EventRepository) extends EventPersistor {
  override def persistFor(message: String): Future[Unit] = {
    val event = fromMessage(message)
    eventRepository.save(event)
  }
}

As you can see, we create the event in val event = fromMessage(message) and persist it with eventRepository.save(event). These two lines, especially the one with the save invocation, are crucial: we expect the correct event to be created and then passed to save. Still, nothing complicated happens here.

Testing

Using AsyncWordSpec we can easily test whether our implementation really works. First of all, we have to mock our event log. Mockito helps us smoothly:

private val mockedEventRepo = mock[EventRepository]
when(mockedEventRepo.save(any[Event]))
  .thenReturn(Future.successful(())) // explicit () instead of relying on argument adaptation

Then, we create an instance of our implementation:

private val testedEventPersistorImpl = new EventPersistorImpl(mockedEventRepo)

The actual test would look like this:

testedEventPersistorImpl
  .persistFor("messageA")
  .map { result =>
    result shouldBe (())
  }

Is that all? Well, not really… A cautious reader will notice that if the above test passes, it only means that some kind of Event has been saved successfully. We have no clue exactly which type of event was persisted internally. In fact, the above test will pass for any message converted to any Event. That’s definitely not what we want.

Star of the evening

This is where ArgumentCaptor comes to the rescue. First, we create an instance of the captor:

private val captor: ArgumentCaptor[Event] = ArgumentCaptor.forClass(classOf[Event])

Then, with a helper function:

private def checkPassedEvent(message: String, expectedEvent: Event) = {
    testedEventPersistorImpl
      .persistFor(message)
      .map { result =>
        result shouldBe (())
        verify(mockedEventRepo, atLeastOnce)
          .save(captor.capture())
        val passedEvent = captor.getValue
        passedEvent shouldBe expectedEvent
      }
}

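we can finally test that our implementation really persists the correct events. In AsyncWordSpec only the Future returned by a test body is checked, so each message gets its own test case; calling all four checks in a single body would silently ignore the first three:

"EventPersistorImpl" should {
  "persist EventA for messageA" in checkPassedEvent(message = "messageA", expectedEvent = EventA)
  "persist EventB for messageB" in checkPassedEvent(message = "messageB", expectedEvent = EventB)
  "persist EventC for messageC" in checkPassedEvent(message = "messageC", expectedEvent = EventC)
  "persist EventUnknown for other messages" in checkPassedEvent(message = "someMessage", expectedEvent = EventUnknown)
}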
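For comparison, here is roughly what the captor buys us, sketched without Mockito: a hand-written fake repository that records every argument passed to save. Everything in it (RecordingEventRepository, BuggyPersistor, the trimmed-down Event model) is hypothetical and only for illustration. The buggy persistor ignores the message and always saves EventUnknown; a check on the returned () alone would still pass, while the recorded argument exposes the bug.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Trimmed-down copy of the model above (hypothetical, for illustration only).
sealed trait Event
case object EventA       extends Event
case object EventUnknown extends Event

trait EventRepository {
  def save(event: Event): Future[Unit]
}

// Hand-written fake that records every argument passed to save,
// roughly what an ArgumentCaptor gives us on a Mockito mock.
// Not thread-safe; fine for this single-threaded sketch.
final class RecordingEventRepository extends EventRepository {
  private var saved: Vector[Event] = Vector.empty
  def save(event: Event): Future[Unit] = {
    saved = saved :+ event
    Future.successful(())
  }
  def lastSaved: Option[Event] = saved.lastOption
}

// Deliberately buggy persistor: it ignores the message and always saves EventUnknown.
final class BuggyPersistor(repo: EventRepository) {
  def persistFor(message: String): Future[Unit] = repo.save(EventUnknown)
}

val repo = new RecordingEventRepository
val result: Unit = Await.result(new BuggyPersistor(repo).persistFor("messageA"), 1.second)
// The naive check (result is ()) would pass, but the recorded argument reveals the bug.
val passedEvent: Option[Event] = repo.lastSaved
```

A captor gives you the same visibility on a Mockito mock without writing a recording fake for every repository trait.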

Reference

I have given you a lot of code snippets, and not everything may be obvious on first reading. If something is not clear enough or still doesn’t work, you can get the complete example here: https://github.com/eltherion/mockito-argumentcaptor. Fork or clone it to play around with it and understand it.

RestartFlow – Retrying stage in Akka Streams

RestartFlow - Retrying failed stream stage in Akka Streams


If anything is certain in this world, it is that everything can fail, data processing especially. The optimistic path usually happens more often, but failing someday, somewhere is rather inevitable. Still, we should never give up, at least not on the first try. A good practice is ‘hope for the best, prepare for the worst’. With that in mind, we would like a way to retry a failed stream processing stage, preferably with an increasing time gap between consecutive retries. This approach is called an exponential backoff strategy. I will show you two different solutions, one of which uses the RestartFlow mechanism.
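To make the idea concrete, here is a minimal hand-rolled sketch of exponential backoff over plain Scala Futures, using only the standard library and a java.util.concurrent scheduler. The names retryWithBackoff and after, as well as the flaky operation, are my own and purely illustrative; they are not part of retry or Akka. Each failure schedules the next attempt after the current delay, and the delay is multiplied by factor every time.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Runs `op` once `delay` has elapsed and returns its eventual result.
def after[T](delay: FiniteDuration, scheduler: ScheduledExecutorService)(op: => Future[T]): Future[T] = {
  val promise = Promise[T]()
  scheduler.schedule(new Runnable {
    def run(): Unit =
      try promise.completeWith(op)
      catch { case NonFatal(e) => promise.failure(e) }
  }, delay.toMillis, TimeUnit.MILLISECONDS)
  promise.future
}

// Exponential backoff: after a failure wait `delay`, then retry with `delay * factor`.
// Gives up after `maxRetries` retries and propagates the last failure.
def retryWithBackoff[T](maxRetries: Int, delay: FiniteDuration, factor: Int, scheduler: ScheduledExecutorService)(
    op: () => Future[T])(implicit ec: ExecutionContext): Future[T] = {
  val attempt = try op() catch { case NonFatal(e) => Future.failed[T](e) }
  attempt.recoverWith {
    case _ if maxRetries > 0 =>
      after(delay, scheduler)(retryWithBackoff(maxRetries - 1, delay * factor.toLong, factor, scheduler)(op))
  }
}

implicit val ec: ExecutionContext = ExecutionContext.global
val scheduler = Executors.newSingleThreadScheduledExecutor()

// Hypothetical flaky operation: fails on the first two attempts, then succeeds.
val attempts = new AtomicInteger(0)
def flaky(): Future[String] =
  if (attempts.incrementAndGet() < 3) Future.failed(new RuntimeException("transient failure"))
  else Future.successful("done")

val result = Await.result(retryWithBackoff(5, 10.millis, 2, scheduler)(() => flaky()), 5.seconds)
scheduler.shutdown()
```

Here the flaky operation fails twice, waits 10 and then 20 milliseconds, and succeeds on the third attempt. The two solutions below take care of this bookkeeping for you.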

SoftwareMill’s Retry

First, we can use the retry library. You can check out the latest version at https://github.com/softwaremill/retry. Usage is very simple:

import dispatch.Defaults._
import dispatch._

import scala.concurrent.Future
import scala.concurrent.duration._

val maxRetries: Int = 10
val initialDelay: FiniteDuration = 5.seconds
val delayBumpFactor: Int = 2

def processWithBackoff = {
  retry.Backoff(maxRetries, initialDelay, delayBumpFactor) { () =>
    Future {
      // something that can "fail"
    }
  }
}

Then we can use it in an Akka Stream. Internally, this step will perform up to 10 retries, with the gaps between them starting at 5 seconds, then 10 seconds, and so on:

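import akka.stream.scaladsl.{Sink, Source}

val parallelism: Int = 2

// someProcessing and someOtherProcessing are placeholders for your own
// functions returning a Future; an implicit Materializer must be in scope.
Source
  .from[...]
  .mapAsync(parallelism)(someProcessing)
  .mapAsync(parallelism)(_ => processWithBackoff)
  .mapAsync(parallelism)(someOtherProcessing)
  .runWith(Sink.ignore)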
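Assuming the gap simply doubles after every attempt, as described above (I have not checked retry’s internal schedule in detail), the ten waits and the worst-case total can be computed directly. backoffDelays is a hypothetical helper, not part of retry:

```scala
import scala.concurrent.duration._

// Hypothetical helper: each gap is the previous one multiplied by `factor`.
def backoffDelays(maxRetries: Int, initialDelay: FiniteDuration, factor: Int): List[FiniteDuration] =
  List.iterate(initialDelay, maxRetries)(_ * factor.toLong)

val delays: List[FiniteDuration] = backoffDelays(maxRetries = 10, initialDelay = 5.seconds, factor = 2)
// 5s, 10s, 20s, ..., 2560s
val totalWait: FiniteDuration = delays.foldLeft(Duration.Zero)(_ + _)
```

So a stage that keeps failing can spend about 85 minutes (5115 seconds) waiting between retries before the stream gives up, and the last gap alone is 2560 seconds.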

Akka Stream’s RestartFlow

Can we do better? Sure we can. Since Akka 2.5.4 there is a feature that handles all of the clunky boilerplate above. It’s called RestartFlow and has similarly named counterparts for Source and Sink. To read more about these functionalities, jump into Akka’s documentation: https://doc.akka.io/docs/akka/2.5.12/stream/stream-error.html?language=scala#delayed-restarts-with-a-backoff-stage. There you will find a RestartSource example. Here we will focus on RestartFlow. Let’s say we have some basic flow that we want to wrap in RestartFlow logic without changing its input and output types. Here is how we can achieve this:

import akka.NotUsed
import akka.stream.scaladsl.{Flow, RestartFlow}

import scala.concurrent.duration._

type T = String // placeholder for your own element type

val initialDelay: FiniteDuration = 5.seconds
val maxBackoff: FiniteDuration = 2560.seconds
val randomFactor: Double = 0.2
val maxRetries: Int = 10

val basicFlow: Flow[T, Unit, NotUsed] = Flow[T].map(_ => ())

val repeateableFlow: Flow[T, Unit, NotUsed] =
  RestartFlow.onFailuresWithBackoff(initialDelay, maxBackoff, randomFactor, maxRetries)(() => basicFlow)
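How long does RestartFlow wait between restarts? As far as I understand Akka’s backoff implementation, restart number n waits roughly min(maxBackoff, minBackoff × 2^n), stretched by a random factor between 1 and 1 + randomFactor so that many restarting streams do not retry in lockstep. The function below is my own sketch of that formula, not Akka’s code; random is passed in explicitly (expected in [0, 1)) to keep it deterministic.

```scala
import scala.concurrent.duration._

// Sketch (not Akka's code) of a capped, jittered exponential backoff:
// restart n waits min(maxBackoff, minBackoff * 2^n) * (1 + random * randomFactor).
// `random` is expected in [0, 1); passing it in keeps the function deterministic.
def restartDelay(restartCount: Int, minBackoff: FiniteDuration, maxBackoff: FiniteDuration,
                 randomFactor: Double, random: Double): FiniteDuration = {
  val exponentialMillis = minBackoff.toMillis * math.pow(2, restartCount)
  val cappedMillis = math.min(maxBackoff.toMillis.toDouble, exponentialMillis)
  (cappedMillis * (1.0 + random * randomFactor)).toLong.millis
}

// Parameters from the RestartFlow example above.
val minBackoff: FiniteDuration = 5.seconds
val maxBackoff: FiniteDuration = 2560.seconds
val randomFactor: Double = 0.2

// Delays without jitter for the first restarts: 5s, 10s, 20s, 40s.
val firstDelays: List[FiniteDuration] =
  (0 until 4).toList.map(n => restartDelay(n, minBackoff, maxBackoff, randomFactor, random = 0.0))
```

With the parameters above this gives 5, 10, 20, … seconds up to the 2560-second cap, each stretched by up to 20%. In this sketch the jitter is applied after the cap, so a single wait may slightly exceed maxBackoff.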

Notice that the basic and the repeatable flows have exactly the same type. That means we can seamlessly use the restarting flow wherever the basic flow’s signature is expected:

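import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

Source
  .from[...]
  .via(repeateableFlow)
  .runWith(Sink.ignore)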

Finally, will it work? Well, actually no… 🙂 On the first failure it will produce a log line like this:

[ERROR] [03/20/2018 16:13:59.167] [default-akka.actor.default-dispatcher-2] [RestartWithBackoffFlow(akka://default)] Restarting graph due to failure

As I write this post, there is a confirmed bug that makes RestartFlow unusable: https://stow.com/a/49391285/1969391, https://github.com/issues/24726. It has already been fixed, and the fix is planned to be released with milestone 2.5.13. The progress of that Akka release can be tracked here: https://github.com/milestone/129. At the moment only two issues are left; unfortunately, no due date has been set. When the needed work is done, I will update this post accordingly.