Sunday, July 19, 2015

The Need for Acknowledgement in Streams

While working on op-rabbit Akka Streams integration, I had the need to know when a piece of data had completed its flight through a stream, where completion could be defined as the following:

  • The source element was processed by the sink.
  • The element was eliminated by a filter, mapConcat -> 0, or collect.
  • In the case an element was split into multiple sub-elements via mapConcat, all resulting sub-elements were completed.
  • In the case many elements were grouped together via grouped, groupedWithin, etc., the resulting group completed.
  • In the case an element is broadcast over n channels, then similarly, all resulting copies of the element were handled, and their downstream results.

Error signaling is important, too. If an element fails in any give stage of the stream, then the error should be propagated up through the acknowledgement channel.

My original implementation of this idea was to simply create a stream whose contents were always a tuple of a Promise and the element. Then, it would be the responsibility of the programmer working with the stream to handle acknowledgements. However, this approach ran counter to the philosophy of "model valid states using the type system"; it was too easy to make a mistake and not fork a promise, collect promises together, or altogether not acknowledge the promise (because it was carelessly filtered from the stream). Also, mixing the concern of acknowledgement with the concern of processing the stream produced some tremendously difficult-to-read code, even with a special helper. An example:


As you can see, it reads awful. And, worse, it's still incredibly error prone.

I wrestled with this: is acknowledgment needed? Without acknowledgement, then there is no hope for retrying messages that failed because of simple chaos. If the process crashes, or the stream is not brought down gracefully, then the messages in-flight are lost. In most cases, this is probably acceptable. But, in others, it's not. I wanted to see if I could do better.

At first, I decided it would be best to create a new class of stream components, based on Akka Streams, called AckedSource, AckedFlow, and AckedSink. The justification is as follows:

  • The requirement of automatic message acknowledgement necessarily implies a reduced set of out-of-the-box operations at your disposal. For example, it is inherently impossible, from the outside, to correlate messages emitted via scan and transform operations with the upstream elements that went into producing that output.
  • It would be too easy to accidentally hook an AckedStream to a normal Akka Streams Sink, or a normal Akka Streams Flow. This would result in leaky acknowledgement. Without a separate class of streams, the compiler would not know to yell at you for making this mistake.
  • Modeling message acknowledgement from head-to-tail provides similar watertight properties as does Future. A Scala Future, when constructed via a thunk, must complete, or must error. The only way to make a Future not complete is to complete it with another Future that doesn't complete (one that was created via a Promise), or actually have it run forever (in which case, never completing is the appropriate course of action). Because you cannot run an AckedSource or AckedFlow until it's connected into an AckedSink, and all stream operations are restricted to those that can guarantee watertight acknowledgement, it is impossible to have an element be processed and not acknowledged (or, "nacked" due to an error). Of course, this is operating on the assumption that AckedSource, AckedFlow and AckedSink do their job correctly.

Here is the above code modified to use AckedStream


All of the original functionality was retained, and the code is much simpler, easier to read. Naturally, the complexity is pushed elsewhere (to AckedStream), but that's just good separation of concerns.

FanOut / FanIn Graphs

AckedSource, AckedFlow, and AckedSink are a good start; but how do we handle the more complex 1-n or n-1 element routing scenarios provided by Akka Streams FlowGraph? There is a lot of rich functionality here, allowing composable disconnected stream fragments to be assembled into a closed graph.

Again, I considered, "do we press this pattern onward? Or, do we opt for generic types? Are we pressing it too far and creating too much complexity?" I was divided: proceeding seemed risky, and I could end up with significant lost time and, afterward, a terrible mess of complexity worse the original problem. I convinced myself it was worth it for the following reasons:

  • The provided Akka Stream Broadcast component is incompatible with message acknowledgement; combining an AckedSource with this component would be an error.
  • Combining an AckedSource to an Akka Flow or Sink is also an error.
  • In an Akka FlowGraph, when you combine a Source with a Flow, you get a FlowGraph.Implicits
    PortOps
    . PortOps implements FlowOps and provides stream manipulation operations that are incompatible with an acknowledged Flow; using them is error-prone.

Continuing to use the delegator pattern, I've created an implementation of AckedFlowGraph that supports a subset of the features. Here's some code using it:


Conclusion

As of this writing, the AckedStream is contained within the op-rabbit akka-stream integration, and can be found here, with examples found in the tests. The code is general enough that I will likely end up extracting it into its own library, although I presently have no need for it outside of the context of using RabbitMQ.

Some of this code may make some generalists frown. I consistently fought, along the way, a feeling inside that I was creating too many types. However, because of the aforementioned reasons, I feel the complexity is warranted because, fundamentally, acknowledged streams are different from non-acknowledged streams: their operations have different behavior, their supported operation lists don't overlap completely, and haphazardly intermixing acknowledged streams and non-acknowledged streams will cause issues.