- The source element was processed by the sink.
- The element was eliminated by a
mapConcat -> 0, or
- In the case an element was split into multiple sub-elements via
mapConcat, all resulting sub-elements were completed.
- In the case many elements were grouped together via
groupedWithin, etc., the resulting group completed.
- In the case an element is broadcast over n channels, all resulting copies of the element, and their downstream results, were handled.
Error signaling is important, too. If an element fails in any given stage of the stream, then the error should be propagated up through the acknowledgement channel.
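To make these rules concrete, here is a minimal sketch using only the standard library's Promise and Future. The names AckSemantics, fanOut and group are hypothetical helpers invented for this illustration; they are not part of Akka Streams or of the implementation described in this post.

```scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical helpers that model the acknowledgement rules listed above.
object AckSemantics {

  // One upstream element becomes n downstream elements (mapConcat, broadcast).
  // The upstream promise succeeds once every child has succeeded and fails
  // if any child fails. With n == 0 the element was eliminated, so it is
  // acknowledged right away.
  def fanOut(upstream: Promise[Unit], n: Int): Seq[Promise[Unit]] =
    if (n == 0) {
      upstream.trySuccess(())
      Seq.empty
    } else {
      val children = Seq.fill(n)(Promise[Unit]())
      upstream.completeWith(Future.sequence(children.map(_.future)).map(_ => ()))
      children
    }

  // Many upstream elements become one group (groupedWithin and friends).
  // Whatever happens to the group, success or failure, is passed on to
  // every member.
  def group(members: Seq[Promise[Unit]]): Promise[Unit] = {
    val grouped = Promise[Unit]()
    grouped.future.onComplete(result => members.foreach(_.tryComplete(result)))
    grouped
  }
}
```

Failing any one of the child promises fails the upstream promise, which is the error propagation described above. Note that Future.sequence may wait for earlier children to settle before it reports a later child's failure.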
My original implementation of this idea was to simply create a stream whose contents were always a tuple of a
Promise and the element. Then, it would be the responsibility of the programmer working with the stream to handle acknowledgements. However, this approach ran counter to the philosophy of "model valid states using the type system"; it was too easy to make a mistake and not fork a promise, collect promises together, or altogether not acknowledge the promise (because it was carelessly filtered from the stream). Also, mixing the concern of acknowledgement with the concern of processing the stream produced some tremendously difficult-to-read code, even with a special helper. An example:
As you can see, it reads awful. And, worse, it's still incredibly error prone.
I wrestled with this: is acknowledgement needed? Without acknowledgement, there is no hope of retrying messages that failed because of simple chaos. If the process crashes, or the stream is not brought down gracefully, then the messages in-flight are lost. In most cases, this is probably acceptable. But, in others, it's not. I wanted to see if I could do better.
At first, I decided it would be best to create a new class of stream components, based on Akka Streams, called
AckedSink. The justification is as follows:
- The requirement of automatic message acknowledgement necessarily implies a reduced set of out-of-the-box operations at your disposal. For example, it is inherently impossible, from the outside, to correlate messages emitted via transform operations with the upstream elements that went into producing that output.
- It would be too easy to accidentally hook an AckedStream to a normal Akka Streams Sink, or a normal Akka Streams Flow. This would result in leaky acknowledgement. Without a separate class of streams, the compiler would not know to yell at you for making this mistake.
- Modeling message acknowledgement from head-to-tail provides watertight properties similar to those of Future. A Scala Future, when constructed via a thunk, must complete, or must error. The only way to make a Future not complete is to complete it with another Future that doesn't complete (one that was created via a Promise), or actually have it run forever (in which case, never completing is the appropriate course of action). Because you cannot run an AckedFlow until it's connected into an AckedSink, and all stream operations are restricted to those that can guarantee watertight acknowledgement, it is impossible to have an element be processed and not acknowledged (or, "nacked" due to an error). Of course, this is operating on the assumption that AckedSink do their job correctly.
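The reasoning in this list can be sketched with a toy model. The code below does not use Akka Streams at all: a "stream" is just a strict list of (Promise, element) pairs, and PlainSink is a hypothetical stand-in for an ordinary, non-acknowledging sink. The class names mirror the ones in this post, but the bodies are assumptions made for illustration, not the actual implementation.

```scala
import scala.concurrent.Promise
import scala.util.control.NonFatal

// Toy wrapper: the (promise, element) pairs are private, so callers can only
// use the operations below, each of which settles every promise it touches.
final class AckedSource[A] private (private val items: List[(Promise[Unit], A)]) {

  // If f throws for an element, that element is nacked and dropped.
  def map[B](f: A => B): AckedSource[B] =
    new AckedSource[B](items.flatMap { case (p, a) =>
      try List((p, f(a)))
      catch { case NonFatal(e) => p.tryFailure(e); Nil }
    })

  // Elements that are filtered out are acknowledged immediately.
  // (Simplification: a throwing predicate is not handled here.)
  def filter(pred: A => Boolean): AckedSource[A] =
    new AckedSource[A](items.filter { case (p, a) =>
      val keep = pred(a)
      if (!keep) p.trySuccess(())
      keep
    })

  // The only way to run the stream is with an AckedSink.
  def runWith(sink: AckedSink[A]): Unit =
    items.foreach { case (p, a) =>
      try { sink.consume(a); p.trySuccess(()) }
      catch { case NonFatal(e) => p.tryFailure(e) }
    }
}

object AckedSource {
  def apply[A](items: List[(Promise[Unit], A)]): AckedSource[A] =
    new AckedSource[A](items)
}

final class AckedSink[A](val consume: A => Unit)

// Hypothetical stand-in for a normal sink. It is a different type, so
// `source.runWith(new PlainSink[Int](println))` does not compile.
final class PlainSink[A](val consume: A => Unit)
```

In this model there is no code path on which an element leaves the pipeline without its promise being completed or failed, and the type of runWith is what lets the compiler reject the leaky combinations.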
Here is the above code modified to use
All of the original functionality was retained, and the code is much simpler and easier to read. Naturally, the complexity is pushed elsewhere (to AckedStream), but that's just good separation of concerns.
FanOut / FanIn Graphs
AckedSink are a good start; but how do we handle the more complex
n-1 element routing scenarios provided by Akka Streams FlowGraph? There is a lot of rich functionality here, allowing composable disconnected stream fragments to be assembled into a closed graph.
Again, I considered, "do we press this pattern onward? Or, do we opt for generic types? Are we pressing it too far and creating too much complexity?" I was divided: proceeding seemed risky, and I could end up with significant lost time and, afterward, a terrible mess of complexity worse than the original problem. I convinced myself it was worth it for the following reasons:
- The provided Akka Stream Broadcast component is incompatible with message acknowledgement; combining an AckedSource with this component would be an error.
- Combining an AckedSource with an Akka Sink is also an error.
- In an Akka FlowGraph, when you combine a Flow, you get a FlowGraph.Implicits FlowOps, which provides stream manipulation operations that are incompatible with an acknowledged Flow; using them is error-prone.
Continuing to use the delegator pattern, I've created an implementation of
AckedFlowGraph that supports a subset of the features. Here's some code using it:
As of this writing, the
AckedStream is contained within the
akka-stream integration, and can be found here, with examples found in the tests. The code is general enough that I will likely end up extracting it into its own library, although I presently have no need for it outside of the context of using
Some of this code may make some generalists frown. I consistently fought, along the way, a feeling inside that I was creating too many types. However, because of the aforementioned reasons, I feel the complexity is warranted because, fundamentally, acknowledged streams are different from non-acknowledged streams: their operations have different behavior, their supported operation lists don't overlap completely, and haphazardly intermixing acknowledged streams and non-acknowledged streams will cause issues.