Flow framework internals

Quasar rewrites bytecode to achieve a couple of things:

  • Collect the contents of the execution stack. It uses thread-local datastructures for this.
  • Create a way to jump from suspension points. It uses an un-catchable exception throw for this.
  • Create a way to jump into suspension points. It uses a sequence of switch statements for this.

To this end Quasar transforms the JVM bytecode of @Suspendable functions. Take the following as an example:

@Suspendable
fun s0(a): Int {
  val b = ..
  n1()
  s1()

  for (..) {
    val c = ..
    n2()
    s2(b)
  }

  n3()
  s3()

  return 1;
}

fun n1() { .. }
fun n2() { .. }
fun n3() { .. }
fun n4() { .. }
@Suspendable fun s1() { .. }
@Suspendable fun s2(b) { .. }
@Suspendable fun s3() { .. }

Quasar’s javaagent, when loading bytecode, will look for functions with the @Suspendable annotation. Furthermore within these functions it will look for callsites of other @Suspendable functions (which is why it’s important to annotate interface/abstract class methods as well as implementations). Note how n1-n4 are thus not instrumented, and their callsites aren’t relevant in the instrumentation of s0.

Disregarding any potential optimizations, quasar will then do the following transformation of s0:

// Quasar uses this annotation to store some metadata about the function, and to check whether the function has been instrumented already
@Instrumented
fun s0(a): Int {
  // A variable to temporarily store s0's return value later on
  var __return = null
  // A variable to indicate whether we are being resumed (we are jumping into a suspension's continuation), or this is a "regular" call.
  lateinit var __resumed: Boolean
  // Retrieve the Quasar-internal execution stack, which is stored in a thread-local.
  var __stack = co.paralleluniverse.fibers.Stack.getStack()
  if (__stack == null) {
    // There's no stack, execute as a regular function
    goto lMethodStart
  }

  // We are being resumed, we are jumping into the suspension point
  __resumed = true
  // Retrieve the integer that indicates which part of this function we should be jumping into, stored in a thread-local.
  val __entry = co.paralleluniverse.fibers.Stack.nextMethodEntry()
  when (__entry) {
    0 -> {
      TODO
    }
    1 -> {
      TODO
    }
    2 -> {
      TODO
    }
    else -> {
      // The entry value is not recognized, the function may be called in a non-suspending capacity.
      val __isFunctionCalledAsSuspendable = co.paralleluniverse.fibers.Stack.isFirstInStackOrPushed()
      if (_isFunctionCalledAsSuspendable) {
        goto lMethodStart
      }
      __stack = null
    }
  }

  // The first code block, starting from the original non-transformed function start.
  lMethodStart:
  // This try-catch handles the Quasar-specific SuspendExecution exception. Quasar prevents the catching of this exception in user code.
  try {
    __resumed = false
    val b = ..
    TODO
  } catch (e: SuspendExecution {
    TODO
  }
}

The above instrumentation allows the implementation of co-operative scheduling. That is, @Suspendable code can yield its execution by throwing a SuspendExecution exception. This exception throw takes care of handing the control flow to a top-level try-catch, which then has access to the thread-locally constructed execution stack, as well as a way to return to the suspension point using the “method entry” list.

A Fiber thus is nothing more than a data structure holding the execution stack, the method entry list, as well as various bookkeeping data related to the management of the Fiber, e.g. its state enum or identifier.

The main try-catch that handles the yielding may be found here.

The main idea behind checkpoints is to utilize the Fiber data structure and treat it as a serializable object capturing the state of a running computation. Whenever a Corda-suspendable API is hit, we capture the execution stack and corresponding entry list, and serialize it using Kryo, a reflection-based serialization library capable of serializing unstructured data. We thus get a handle to an arbitrary suspended computation.

In the flow state machine there is a strict separation of the user-code’s state, and the flow framework’s internal state. The former is the serialized Fiber, and the latter consists of structured objects.

The definition of a Checkpoint can be found here.

The “user state” can be found in FlowState. It is either

  • Unstarted: in this case there’s no Fiber to serialize yet, we serialize the FlowLogic instead.
  • Started: in this case the flow has been started already, and has been suspended on some IO. We store the FlowIORequest and the serialized Fiber.

The rest of the Checkpoint deals with internal bookkeeping. Sessions, the subflow-stack, errors. Note how all data structures are read-only. This is deliberate, to enable easier reasoning. Any “modification” of the checkpoint therefore implies making a shallow copy.

The internals of the flow framework were designed as a state machine. A flow is a strange event loop that has a state, and goes through state transitions triggered by events. The transitions may be either

  • User transitions, when we hand control to user-defined code in the cordapp. This may transition to a suspension point, the end of the flow, or may abort exceptionally.
  • Internal transitions, where we keep strict track of side-effects and failure conditions.

The core data structures of the state machine are:

  • StateMachineState: this is the full state of the state machine. It includes the Checkpoint (the persisted part of the state), and other non-persisted state, most importantly the list of pending DeduplicationHandler s, to be described later.
  • Event: Every state transition is triggered by one of these. These may be external events, notifying the state machine of something, or internal events, for example suspensions.
  • Action: These are created by internal state transitions. These transitions do not inherently execute any side-effects, instead, they create a list of Action s, which are later executed.
  • FlowContinuation: indicates how the state machine should proceed after a transition. It can resume to user code, throw an exception, keep processing events or abort the flow completely.

The state machine is a pure function that when given an Event and an initial StateMachineState returns the next state, a list of Action s to execute, and a FlowContinuation to indicate how to proceed:

// https://github.com/corda/corda/blob/c04a448bf391fb73f9b60cc41e8b5f0c23f81470/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TransitionResult.kt#L15
data class TransitionResult(
        val newState: StateMachineState,
        val actions: List<Action> = emptyList(),
        val continuation: FlowContinuation = FlowContinuation.ProcessEvents
)

// https://github.com/corda/corda/blob/c04a448bf391fb73f9b60cc41e8b5f0c23f81470/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StateMachine.kt#L12
fun transition(event: Event, state: StateMachineState): TransitionResult

The top-level entry point for the state machine transitions is in TopLevelTransition.

As an example let’s examine message delivery. This transition will be triggered by a DeliverSessionMessage event, defined like this:

data class DeliverSessionMessage(
        val sessionMessage: ExistingSessionMessage,
        override val deduplicationHandler: DeduplicationHandler,
        val sender: Party
) : Event(), GeneratedByExternalEvent

The event then goes through TopLevelTransition, which then passes it to DeliverSessionMessageTransition. This transition inspects the event, then does the relevant bookkeeping, updating sessions, buffering messages etc. Note that we don’t do any checkpoint persistence, and we don’t return control to the user code afterwards, we simply schedule a DoRemainingWork and return a ProcessEvents continuation. This means that it’s going to be the next transition that decides whether the received message is “relevant” to the current suspension, and whether control should thus be returned to user code with the message.

The state machine is a pure function, so what is the “driver” of it, that actually executes the transitions and side-effects? This is what FlowStateMachineImpl is doing, which is a Fiber. This class requires great care when it’s modified, as the programmer must be aware of what’s on the stack, what fields get persisted as part of the Checkpoint, and how the control flow is wired.

The usual way to implement state machines is to create a simple event loop that keeps popping events from a queue, and executes the resulting transitions. With flows however this isn’t so simple, because control must be returned to suspending operations. Therefore the eventloop is split up into several smaller eventloops, executed when “we get the chance”, i.e. when users call API functions. Whenever the flow calls a Flow API function, control is handed to the flow framework, that’s when we can process events, until a FlowContinuation indicates that control should be returned to user code.

There are two functions that aid the above:

  • FlowStateMachineImpl.processEventsUntilFlowIsResumed: as the name suggests this is a loop that keeps popping and processing events from the flow’s event queue, until a FlowContinuation.Resume or some continuation other than ProcessEvents is returned.
  • FlowStateMachineImpl.processEventImmediately: this function skips the event queue and processes an event immediately. There are certain transitions (e.g. subflow enter/exit) that must be done this way, otherwise the event ordering can cause problems.

The two main functions that call the above are the top-level run, which is the entry point of the flow, and suspend, which every blocking API call eventually calls.

Let’s take a look at suspend, which is the most delicate/brittle function in this class, and most probably the whole flow framework. Examining it will reveal a lot about how flows and fibers work.

@Suspendable
override fun <R : Any> suspend(ioRequest: FlowIORequest<R>, maySkipCheckpoint: Boolean): R {

First off, the type signature. We pass in a FlowIORequest<R>, which is an encapsulation of the IO action we’re about to suspend on. It is a sealed class with members like Send/Receive/ExecuteAsyncOperation. It is serializable, and will be part of the Checkpoint. In fact, it is doubly-serialized, as it is in FlowState in a typed form, but is also present in the Fiber’s stack, as a part of suspend’s stack frame.

We also pass a maySkipCheckpoint boolean which if true will prevent the checkpoint from being persisted.

The function returns R, but the runtime control flow achieving this “return” is quite tricky. When the Fiber suspends a SuspendExecution exception will be thrown, and when the fiber is resumed this suspend function will be re-entered, however this time in a “different capacity”, indicated by Quasar’s implicitly stored method entry, which will jump to the end of the suspension. This is repeated several times as this function has two suspension points, one of them possibly executing multiple times, as we will see later.

val serializationContext = TransientReference(getTransientField(TransientValues::checkpointSerializationContext))
val transaction = extractThreadLocalTransaction()

These lines extract some data required for the suspension. Note that both local variables are TransientReference s, which means the referred-to object will not be serialized as part of the stack frame. During resumption from a deserialized checkpoint these local variables will thus be null, however at that point these objects will not be required anymore.

The first line gets the serialization context from a TransientValues datastructure, which is where all objects live that are required for the flow’s functioning but which we don’t want to persist. This means all of these values must be re-initialized each time we are restoring a flow from a persisted checkpoint.

parkAndSerialize { _, _ ->

This is the Quasar API that does the actual suspension. The passed in lambda will not be executed in the current suspend frame, but rather is stored temporarily in the internal Fiber structure, and will be run in the outer Quasar try-catch as a “post park” action after the catch of the SuspendExecution exception. See Fiber.java for details.

This means that within this lambda the Fiber will have already technically parked, but it hasn’t yet properly yielded to the enclosing scheduler.

setLoggingContext()

Thread-locals are treated in a special way when Quasar suspends/resumes. Through use of reflection and JDK-internal unsafe operations it accesses all ThreadLocals in the current thread and swaps them with ones stored in the Fiber data structure. In essence for each thread that executes as a Fiber we have two sets of thread locals, one set belongs to the original “non-Quasar” thread, and the other belongs to the Fiber. During Fiber execution the latter is active, this is swapped with the former during suspension, and swapped back during resume. Note that during resume these thread-locals may actually be restored to a different thread than the original.

In the parkAndSerialize closure the Fiber is partially parked, and at this point the thread locals are already swapped out. This means that data stored in ThreadLocal s that we still need must be re-initialized somehow. In the above case this is the logging MDC.

// Will skip checkpoint if there are any idempotent flows in the subflow stack.
val skipPersistingCheckpoint = containsIdempotentFlows() || maySkipCheckpoint

contextTransactionOrNull = transaction.value
val event = try {
    Event.Suspend(
            ioRequest = ioRequest,
            maySkipCheckpoint = skipPersistingCheckpoint,
            fiber = this.checkpointSerialize(context = serializationContext.value)
    )
} catch (exception: Exception) {
    Event.Error(exception)
}

A couple of things happen here. First we determine whether this suspension’s subflow stack contains an IdempotentFlow, to determine whether to skip checkpoints. An idempotent flow is a subflow that’s safe to replay from the beginning. This means that no checkpoint will be persisted during its execution, as replaying from the previous checkpoint should yield the same results semantically. As an example the notary client flow is an IdempotentFlow, as notarisation is idempotent, and may be safely replayed.

We then set another thread-local, the database transaction, which was also swapped out during the park, and we made it available to the closure temporarily using a TransientReference earlier. The database transaction is used during serialization of the fiber and persistence of the checkpoint.

We then create the Suspend event, which includes the IO request and the serialized Fiber. If there’s an exception during serialization we create an Error event instead. Note how every condition, including error conditions are treated as “normal control flow” in the state machine, we must be extra careful as these conditions are also exposed to the user and are part of our API guarantees.

// We must commit the database transaction before returning from this closure otherwise Quasar may schedule
// other fibers, so we process the event immediately
val continuation = processEventImmediately(
        event,
        isDbTransactionOpenOnEntry = true,
        isDbTransactionOpenOnExit = false
)
require(continuation == FlowContinuation.ProcessEvents){"Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation "}

We first process the suspension event ASAP, as we must commit the underlying database transaction before the closure ends.

    unpark(SERIALIZER_BLOCKER)
}
return uncheckedCast(processEventsUntilFlowIsResumed(
        isDbTransactionOpenOnEntry = false,
        isDbTransactionOpenOnExit = true
))

As the last step in the park closure we unpark the Fiber we are currently parking. This effectively causes an “immediate” re-enter of the fiber, and therefore the suspend function, but this time jumping over the park and executing the next statement. Of course this re-enter may happen much later, perhaps even on a different thread.

We then enter a mini event-loop, which also does Quasar yields, processing the flow’s event queue until a transition continuation indicates that control can be returned to user code . Practically this means that when a flow is waiting on an IO action it won’t actually be blocked in the parkAndSerialize call, but rather in this event loop, popping from the event queue.

The processing of an event consists of two steps:

  • Calculating a transition. This is the pure StateMachineState + Event -> TransitionResult function.
  • Executing the transition. This is done by a TransitionExecutor, which in turn uses an ActionExecutor for individual Actions.

This structuring allows the introspection and interception of state machine transitions through the registering of TransitionExecutor interceptors. These interceptors are TransitionExecutor s that have access to a delegate. When they receive a new transition they can inspect it, pass it to the delegate, and do something specific to the interceptor.

For example checkpoint deserializability is checked by such an interceptor. It inspects a transition, and if it contains a Fiber checkpoint then it checks whether it’s deserializable in a separate thread.

The transition calculation is done in the net.corda.node.services.statemachine.transitions package, the top-level entry point being TopLevelTransition. There is a TransitionBuilder helper that makes the transition definitions a bit more readable. It contains a currentState field that may be updated with new StateMachineState instances as the event is being processed, and has some helper functions for common functionality, for example for erroring the state machine with some error condition.

Here are a couple of highlighted transitions:

Handling of Event.Suspend is quite straightforward and is done here. We take the serialized Fiber and the IO request and create a new checkpoint, then depending on whether we should persist or not we either simply commit the database transaction and schedule a DoRemainingWork (to be explained later), or we persist the checkpoint, run the DeduplicationHandler inside-tx hooks, commit, then run the after-tx hooks, and schedule a DoRemainingWork.

Every checkpoint persistence implies the above steps, in this specific order.

This is a generic event that simply tells the state machine: inspect your current state, and decide what to do, if anything. Using this event we can break down transitions into a and transition, which compose well with other transitions, as we don’t need to add special cases everywhere in the state machine.

As an example take error propagation. When a flow errors it’s put into an “errored” state, and it’s waiting for further instructions. One possibility is the triggering of error propagation through the scheduling of Event.StartErrorPropagation. Note how the handling of this event simply does the following:

currentState = currentState.copy(
        checkpoint = currentState.checkpoint.copy(
                errorState = errorState.copy(propagating = true)
        )
)
actions.add(Action.ScheduleEvent(Event.DoRemainingWork))

It marks the error state as propagating = true and schedules a DoRemainingWork. The processing of that event in turn will detect that we are errored and propagating, and there are some errors that haven’t been propagated yet. It then propagates those errors and updates the “propagated index” to indicate all errors have been dealt with. Subsequent DoRemainingWorks will thus do nothing. However, in case some other error condition or external event adds another error to the flow, we would automatically propagate that too, we don’t need to write a special case for it.

Most of the state machine logic is therefore about the handling DoRemainingWork. Another example is resumptions due to an IO request completing in some way. DoRemainingWork checks whether we are currently waiting for something to complete e.g. a FlowIORequest.Receive. It then checks whether the state contains enough data to complete the action, in the receive case this means checking the relevant sessions for buffered messages, and seeing whether those messages are sufficient to resume the flow with.

Once the transition has been calculated the transition is passed to the flow’s TransitionExecutor. The main executor is TransitionExecutorImpl, which executes the transition’s Action s, and handles errors by manually erroring the flow’s state. This is also when transition interceptors are triggered.

An error can manifest as either the whole flow erroring, or a specific session erroring. The former means that the whole flow is blocked from resumption, and it will end up in the flow hospital. A session erroring blocks only that specific session. Any interaction with this session will in turn error the flow. Session errors are created by a remote party propagating an error to our flow.

Let’s say we wanted to change the session messaging protocol. How would we go about changing the state machine?

The session logic is defined by

  • Session message definitions, see the SessionMessage sealed class.
  • Session state definitions, see the SessionState sealed class. This is the state we store per established/to-be-established session with a Party.
  • Session state transitions, see DeliverSessionMessageTransition.

Let’s say we wanted to add more handshake steps. To do this we need to add new types of SessionMessage s as required, new SessionState s, and cases to handle state transitions in DeliverSessionMessageTransition. This handles the receive path, to handle send paths StartedFlowTransition.sendTransition needs modifying, this is the transition triggered when the flow suspends on a send.

The flow framework guarantees atomicity of processing incoming events. This means that a flow or the node may be stopped at any time, even during processing of an event and on restart the node will reconstruct the correct state of the flows and will proceed as if nothing happened.

To do this each external event is given two hooks, one inside the database transaction committing the next checkpoint, and one after the commit, to enable implementation of exactly-once delivery on top of at-least-once. These hooks can be found on the DeduplicationHandler interface:

/**
 * This handler is used to implement exactly-once delivery of an external event on top of an at-least-once delivery. This is done
 * using two hooks that are called from the event processor, one called from the database transaction committing the
 * side-effect caused by the external event, and another one called after the transaction has committed successfully.
 *
 * For example for messaging we can use [insideDatabaseTransaction] to store the message's unique ID for later
 * deduplication, and [afterDatabaseTransaction] to acknowledge the message and stop retries.
 *
 * We also use this for exactly-once start of a scheduled flow, [insideDatabaseTransaction] is used to remove the
 * to-be-scheduled state of the flow, [afterDatabaseTransaction] is used for cleanup of in-memory bookkeeping.
 *
 * It holds a reference back to the causing external event.
 */
interface DeduplicationHandler {
    /**
     * This will be run inside a database transaction that commits the side-effect of the event, allowing the
     * implementor to persist the event delivery fact atomically with the side-effect.
     */
    fun insideDatabaseTransaction()

    /**
     * This will be run strictly after the side-effect has been committed successfully and may be used for
     * cleanup/acknowledgement/stopping of retries.
     */
    fun afterDatabaseTransaction()

    /**
     * The external event for which we are trying to reduce from at-least-once delivery to exactly-once.
     */
    val externalCause: ExternalEvent
}

Let’s take message delivery as an example. From the flow framework’s perspective we are assuming at least once delivery, and in order delivery. When a message is received a corresponding DeduplicationHandler is created. The hook inside the database transaction persists the message ID, and the hook after acknowledges it, stopping potential retries. If the node crashes before the transaction commits then the message will be redelivered, and if it crashes after it will be deduplicated based on the ID table.

We also use this for deduplicating scheduled flow starts, the inside hook removes the scheduled StateRef, and the after hook cleans up in-memory bookkeeping.

We could also use this for deduplicating RPC flow starts. A deduplication ID would be generated (and potentially stored) on the client, persisted on the node in the inside-tx hook, and the start would be acked afterwards, removing the ID from the client (and stopping retries).

Internally a list of pending DeduplicationHandler s is accumulated in the state machine in StateMachineState. When the next checkpoint is persisted the corresponding insideDatabaseTranscation hooks are run, and once the checkpoint is committed the afterDatabaseTransaction hooks are run.

Tracking of these handlers also allows us to do in-memory retries of flows. To do this we need to re-create the flow from the last checkpoint and retry external events internally. For every flow we have two lists of such “events”, one is the yet-unprocessed event queue of the flow, and one is the already processed but still pending list of DeduplicationHandler s. The concatenation of these events gives us a handle on the list of events relevant to the flow since the last persisted checkpoint, so we just need to re-process these events. All of these events go through the StateMachineManager, which is where the retry is handled too.

Full message deduplication is more complex, what we’ve discussed so far only dealt with the state machine bits.

When we receive a message from Artemis it is eventually handled by P2PMessagingClient.deliver, which consults the P2PDeduplicator class to determine whether the message is a duplicate. P2PDeduplicator holds two data structures:

  • processedMessages: the persisted message ID table. Any message ID in this table must have been committed together with a checkpoint that includes the side-effects caused by the message.
  • beingProcessedMessages: an in-memory map holding the message IDs until they are being processed and committed.

These two data structures correspond to the two DeduplicationHandler hooks of each message. insideDatabaseTransaction adds to the processedMessages map, afterDatabaseTransaction removes from beingProcessedMessages

The indirection through the in-memory map is needed because Artemis may redeliver unacked messages in certain situations, and at that point the message may still be “in-flight”, i.e. the ID may not be committed yet.

If the message isn’t a duplicate then it’s put into beingProcessedMessages and forwarded to the state machine manager, which then forwards it to the right flow or constructs one if this is an initiating message. When the next checkpoint of the relevant flow is persisted the message is “finalized” as discussed, using its DeduplicationHandler.

The flow hospital is a place where errored flows end up. This is done using an interceptor that detects error transitions and notifies the hospital.

The hospital can decide what to do with the flow: restart it from the last persisted checkpoint using an in-memory retry, keep the flow around pending either manual intervention or a restart of the node (in which case it will be retried from the last persisted checkpoint on start), or trigger error propagation, which makes the error permanent and notifies other parties the flow has sessions with of the failure.

This is where we can do special logic to handle certain error conditions like notary failures in a specific way e.g. by retrying.

Was this page helpful?

Thanks for your feedback!

Chat with us

Chat with us on our #docs channel on slack. You can also join a lot of other slack channels there and have access to 1-on-1 communication with members of the R3 team and the online community.

Propose documentation improvements directly

Help us to improve the docs by contributing directly. It's simple - just fork this repository and raise a PR of your own - R3's Technical Writers will review it and apply the relevant suggestions.

We're sorry this page wasn't helpful. Let us know how we can make it better!

Chat with us

Chat with us on our #docs channel on slack. You can also join a lot of other slack channels there and have access to 1-on-1 communication with members of the R3 team and the online community.

Create an issue

Create a new GitHub issue in this repository - submit technical feedback, draw attention to a potential documentation bug, or share ideas for improvement and general feedback.

Propose documentation improvements directly

Help us to improve the docs by contributing directly. It's simple - just fork this repository and raise a PR of your own - R3's Technical Writers will review it and apply the relevant suggestions.