corda / net.corda.core.flows / FlowLogic

FlowLogic

abstract class FlowLogic<out T>

A sub-class of FlowLogic implements a flow using direct, straight line blocking code. Thus you can write complex flow logic in an ordinary fashion, without having to think about callbacks, restarting after a node crash, how many instances of your flow there are running and so on.

Invoking the network will cause the call stack to be suspended onto the heap and then serialized to a database using the Quasar fibers framework. Because of this, if you need access to data that might change over time, you should request it just-in-time via the serviceHub property which is provided. Don't try and keep data you got from a service across calls to send/receive/sendAndReceive because the world might change in arbitrary ways out from underneath you, for instance, if the node is restarted or reconfigured!

Additionally, be aware of what data you pin either via the stack or in your FlowLogic implementation. Very large objects or datasets will hurt performance by increasing the amount of data stored in each checkpoint.

If you'd like to use another FlowLogic class as a component of your own, construct it on the fly and then pass it to the subFlow method. It will return the result of that flow when it completes.

If your flow (whether it's a top-level flow or a subflow) is supposed to initiate a session with the counterparty and request they start their counterpart flow, then make sure it's annotated with InitiatingFlow. This annotation also has a version property to allow you to version your flow and enables a node to restrict support for the flow to that particular version.

Functions that suspend the flow (including all functions on FlowSession) accept a maySkipCheckpoint parameter defaulting to false, false meaning a checkpoint should always be created on suspend. This parameter may be set to true which allows the implementation to potentially optimise away the checkpoint, saving a roundtrip to the database.

This option however comes with a big warning sign: Setting the parameter to true requires the flow's code to be replayable from the previous checkpoint (or start of flow) up until the next checkpoint (or end of flow) in order to prepare for hard failures. As suspending functions always commit the flow's database transaction regardless of this parameter the flow must be prepared for scenarios where a previous running of the flow already committed itsrelevant database transactions. Only set this option to true if you know what you're doing.

Constructors

<init>

A sub-class of FlowLogic implements a flow using direct, straight line blocking code. Thus you can write complex flow logic in an ordinary fashion, without having to think about callbacks, restarting after a node crash, how many instances of your flow there are running and so on.

FlowLogic()

Properties

isKilled

Returns true when the current FlowLogic has been killed (has received a command to halt its progress and terminate).

val isKilled: Boolean

logger

This is where you should log things to.

val logger: Logger

ourIdentity

Specifies the identity to use for this flow. This will be one of the multiple identities that belong to this node. This is the same as calling ourIdentityAndCert.party.

val ourIdentity: Party

ourIdentityAndCert

Specifies the identity, with certificate, to use for this flow. This will be one of the multiple identities that belong to this node.

val ourIdentityAndCert: PartyAndCertificate

progressTracker

Override this to provide a ProgressTracker. If one is provided and stepped, the framework will do something helpful with the progress reports e.g record to the audit service. If this flow is invoked as a subflow of another, then the tracker will be made a child of the current step in the parent. If it's null, this flow doesn't track progress.

open val progressTracker: ProgressTracker?

runId

Returns a wrapped java.util.UUID object that identifies this state machine run (i.e. subflows have the same identifier as their parents).

val runId: StateMachineRunId

serviceHub

Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts. It is only available once the flow has started, which means it cannot be accessed in the constructor. Either access this lazily or from inside call.

val serviceHub: ServiceHub

Functions

await

Executes the specified operation and suspends until operation completion.

fun <R : Any> await(operation: FlowExternalAsyncOperation<R>): R
fun <R : Any> await(operation: FlowExternalOperation<R>): R

call

This is where you fill out your business logic.

abstract fun call(): T

checkFlowIsNotKilled

Helper function that throws a KilledFlowException if the current FlowLogic has been killed.

fun checkFlowIsNotKilled(): Unit

Helper function that throws a KilledFlowException if the current FlowLogic has been killed. The provided message is added to the thrown KilledFlowException.

fun checkFlowIsNotKilled(lazyMessage: () -> Any): Unit

checkFlowPermission

Flows can call this method to ensure that the active FlowInitiator is authorised for a particular action. This provides fine grained control over application level permissions, when RPC control over starting the flow is insufficient, or the permission is runtime dependent upon the choices made inside long lived flow code. For example some users may have restricted limits on how much cash they can transfer, or whether they can change certain fields. An audit event is always recorded whenever this method is used. If the permission is not granted for the FlowInitiator a FlowException is thrown.

fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>): Unit

close

Closes the provided sessions and performs cleanup of any resources tied to these sessions.

fun close(sessions: NonEmptySet<FlowSession>): Unit

flowStackSnapshot

Returns a shallow copy of the Quasar stack frames at the time of call to flowStackSnapshot. Use this to inspect what objects would be serialised at the time of call to a suspending action (e.g. send/receive). Note: This logic is only available during tests and is not meant to be used during the production deployment. Therefore the default implementation does nothing.

fun flowStackSnapshot(): FlowStackSnapshot?

getFlowInfo

Returns a FlowInfo object describing the flow otherParty is using. With FlowInfo.flowVersion it provides the necessary information needed for the evolution of flows and enabling backwards compatibility.

fun getFlowInfo(otherParty: Party): FlowInfo

initiateFlow

Creates a communication session with destination. Subsequently you may send/receive using this session object. How the messaging is routed depends on the Destination type, including whether this call does any initial communication.

fun initiateFlow(destination: Destination): FlowSession

Creates a communication session with party. Subsequently you may send/receive using this session object. Note that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.

fun initiateFlow(party: Party): FlowSession

persistFlowStackSnapshot

Persists a shallow copy of the Quasar stack frames at the time of call to persistFlowStackSnapshot. Use this to track the monitor evolution of the quasar stack values during the flow execution. The flow stack snapshot is stored in a file located in {baseDir}/flowStackSnapshots/YYYY-MM-DD/{flowId}/ where baseDir is the node running directory and flowId is the flow unique identifier generated by the platform.

fun persistFlowStackSnapshot(): Unit

receive

Suspends until the specified otherParty sends us a message of type R.

fun <R : Any> receive(otherParty: Party): UntrustworthyData<R>

Suspends until the specified otherParty sends us a message of type receiveType.

open fun <R : Any> receive(receiveType: Class<R>, otherParty: Party): UntrustworthyData<R>

receiveAll

Suspends until a message has been received for each session in the specified sessions.

open fun <R : Any> receiveAll(receiveType: Class<R>, sessions: List<FlowSession>, maySkipCheckpoint: Boolean = false): List<UntrustworthyData<R>>

receiveAllMap

Suspends until a message has been received for each session in the specified sessions.

open fun receiveAllMap(sessions: Map<FlowSession, Class<out Any>>, maySkipCheckpoint: Boolean = false): Map<FlowSession, UntrustworthyData<Any>>

recordAuditEvent

Flows can call this method to record application level flow audit events

fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>): Unit

send

Queues the given payload for sending to the otherParty and continues without suspending.

open fun send(otherParty: Party, payload: Any): Unit

sendAll

Queues the given payload for sending to the provided sessions and continues without suspending.

fun sendAll(payload: Any, sessions: Set<FlowSession>, maySkipCheckpoint: Boolean = false): Unit

sendAllMap

Queues the given payloads for sending to the provided sessions and continues without suspending.

fun sendAllMap(payloadsPerSession: Map<FlowSession, Any>, maySkipCheckpoint: Boolean = false): Unit

sendAndReceive

Serializes and queues the given payload object for sending to the otherParty. Suspends until a response is received, which must be of the given R type.

fun <R : Any> sendAndReceive(otherParty: Party, payload: Any): UntrustworthyData<R>

Serializes and queues the given payload object for sending to the otherParty. Suspends until a response is received, which must be of the given receiveType. Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.

open fun <R : Any> sendAndReceive(receiveType: Class<R>, otherParty: Party, payload: Any): UntrustworthyData<R>

subFlow

Invokes the given subflow. This function returns once the subflow completes successfully with the result returned by that subflow's call method. If the subflow has a progress tracker, it is attached to the current step in this flow's progress tracker.

open fun <R> subFlow(subLogic: FlowLogic<R>): R

track

Returns a pair of the current progress step, as a string, and an observable of stringified changes to the progressTracker.

fun track(): DataFeed<String, String>?

trackStepsTree

Returns a pair of the current steps tree of current progressTracker as pairs of zero-based depth and stringified step label and observable of upcoming changes to the structure.

fun trackStepsTree(): DataFeed<List<Pair<Int, String>>, List<Pair<Int, String>>>?

trackStepsTreeIndex

Returns a pair of the current progress step index (as integer) in steps tree of current progressTracker, and an observable of its upcoming changes.

fun trackStepsTreeIndex(): DataFeed<Int, Int>?

waitForLedgerCommit

Suspends the flow until the transaction with the specified ID is received, successfully verified and sent to the vault for processing. Note that this call suspends until the transaction is considered valid by the local node, but that doesn't imply the vault will consider it relevant.

fun waitForLedgerCommit(hash: SecureHash, maySkipCheckpoint: Boolean = false): SignedTransaction

waitForStateConsumption

Suspends the current flow until all the provided StateRefs have been consumed.

fun waitForStateConsumption(stateRefs: Set<StateRef>): Unit

Companion Object Properties

currentTopLevel

Return the outermost FlowLogic instance, or null if not in a flow.

val currentTopLevel: FlowLogic<*>?

Companion Object Functions

sleep

If on a flow, suspends the flow and only wakes it up after at least duration time has passed. Otherwise, just sleep for duration. This sleep function is not designed to aid scheduling, for which you should consider using net.corda.core.contracts.SchedulableState. It is designed to aid with managing contention for which you have not managed via another means.

fun sleep(duration: Duration, maySkipCheckpoint: Boolean = false): Unit

Extension Functions

receiveAll

Suspends until a message has been received for each session in the specified sessions.

fun FlowLogic<*>.receiveAll(session: Pair<FlowSession, Class<out Any>>, vararg sessions: Pair<FlowSession, Class<out Any>>): Map<FlowSession, UntrustworthyData<Any>>
fun <R : Any> FlowLogic<*>.receiveAll(receiveType: Class<R>, session: FlowSession, vararg sessions: FlowSession): List<UntrustworthyData<R>>
fun <R : Any> FlowLogic<*>.receiveAll(session: FlowSession, vararg sessions: FlowSession): List<UntrustworthyData<R>>

Inheritors

AbstractCashFlow

Initiates a flow that produces an Issue/Move or Exit Cash transaction.

abstract class AbstractCashFlow<out T> : FlowLogic<T>

CashExitResponderFlow

class CashExitResponderFlow : FlowLogic<Unit>

CashPaymentReceiverFlow

class CashPaymentReceiverFlow : FlowLogic<Unit>

CollectSignatureFlow

Get and check the required signature.

class CollectSignatureFlow : FlowLogic<List<TransactionSignature>>

CollectSignaturesFlow

The CollectSignaturesFlow is used to automate the collection of counterparty signatures for a given transaction.

class CollectSignaturesFlow : FlowLogic<SignedTransaction>

DataVendingFlow

open class DataVendingFlow : FlowLogic<Void?>

FinalityFlow

Verifies the given transaction, then sends it to the named notary. If the notary agrees that the transaction is acceptable then it is from that point onwards committed to the ledger, and will be written through to the vault. Additionally it will be distributed to the parties reflected in the participants list of the states.

class FinalityFlow : FlowLogic<SignedTransaction>

ReceiveFinalityFlow

The receiving counterpart to FinalityFlow.

class ReceiveFinalityFlow : FlowLogic<SignedTransaction>

ReceiveStateAndRefFlow

The ReceiveStateAndRefFlow should be called in response to the SendStateAndRefFlow.

class ReceiveStateAndRefFlow<out T : ContractState> : FlowLogic<List<StateAndRef<T>>>

ReceiveTransactionFlow

The ReceiveTransactionFlow should be called in response to the SendTransactionFlow.

open class ReceiveTransactionFlow : FlowLogic<SignedTransaction>

SignTransactionFlow

The SignTransactionFlow should be called in response to the CollectSignaturesFlow. It automates the signing of a transaction providing the transaction:

abstract class SignTransactionFlow : FlowLogic<SignedTransaction>

WithReferencedStatesFlow

Given a flow which uses reference states, the WithReferencedStatesFlow will execute the flow as a subFlow. If the flow fails due to a NotaryError.Conflict for a reference state, then WithReferencedStatesFlow will be suspended until the state refs for the reference states are consumed. In this case, a consumption means that:

class WithReferencedStatesFlow<T : Any> : FlowLogic<T>