public class FlowLogic<T>
A sub-class of class FlowLogic
implements a flow using direct, straight line blocking code. Thus you
can write complex flow logic in an ordinary fashion, without having to think about callbacks, restarting after
a node crash, how many instances of your flow there are running and so on.
Invoking the network will cause the call stack to be suspended onto the heap and then serialized to a database using
the Quasar fibers framework. Because of this, if you need access to data that might change over time, you should
request it just-in-time via the serviceHub
property which is provided. Don't try and keep data you got from a
service across calls to send/receive/sendAndReceive because the world might change in arbitrary ways out from
underneath you, for instance, if the node is restarted or reconfigured!
Additionally, be aware of what data you pin either via the stack or in your class FlowLogic
implementation. Very large
objects or datasets will hurt performance by increasing the amount of data stored in each checkpoint.
If you'd like to use another FlowLogic class as a component of your own, construct it on the fly and then pass
it to the subFlow
method. It will return the result of that flow when it completes.
If your flow (whether it's a top-level flow or a subflow) is supposed to initiate a session with the counterparty
and request they start their counterpart flow, then make sure it's annotated with annotationclass InitiatingFlow
. This annotation
also has a version property to allow you to version your flow and enables a node to restrict support for the flow to
that particular version.
Functions that suspend the flow (including all functions on class FlowSession
) accept a maySkipCheckpoint parameter
defaulting to false, false meaning a checkpoint should always be created on suspend. This parameter may be set to
true which allows the implementation to potentially optimise away the checkpoint, saving a roundtrip to the database.
This option however comes with a big warning sign: Setting the parameter to true requires the flow's code to be replayable from the previous checkpoint (or start of flow) up until the next checkpoint (or end of flow) in order to prepare for hard failures. As suspending functions always commit the flow's database transaction regardless of this parameter the flow must be prepared for scenarios where a previous running of the flow already committed itsrelevant database transactions. Only set this option to true if you know what you're doing.
public FlowLogic()
A sub-class of class FlowLogic
implements a flow using direct, straight line blocking code. Thus you
can write complex flow logic in an ordinary fashion, without having to think about callbacks, restarting after
a node crash, how many instances of your flow there are running and so on.
Invoking the network will cause the call stack to be suspended onto the heap and then serialized to a database using
the Quasar fibers framework. Because of this, if you need access to data that might change over time, you should
request it just-in-time via the serviceHub
property which is provided. Don't try and keep data you got from a
service across calls to send/receive/sendAndReceive because the world might change in arbitrary ways out from
underneath you, for instance, if the node is restarted or reconfigured!
Additionally, be aware of what data you pin either via the stack or in your class FlowLogic
implementation. Very large
objects or datasets will hurt performance by increasing the amount of data stored in each checkpoint.
If you'd like to use another FlowLogic class as a component of your own, construct it on the fly and then pass
it to the subFlow
method. It will return the result of that flow when it completes.
If your flow (whether it's a top-level flow or a subflow) is supposed to initiate a session with the counterparty
and request they start their counterpart flow, then make sure it's annotated with annotationclass InitiatingFlow
. This annotation
also has a version property to allow you to version your flow and enables a node to restrict support for the flow to
that particular version.
Functions that suspend the flow (including all functions on class FlowSession
) accept a maySkipCheckpoint parameter
defaulting to false, false meaning a checkpoint should always be created on suspend. This parameter may be set to
true which allows the implementation to potentially optimise away the checkpoint, saving a roundtrip to the database.
This option however comes with a big warning sign: Setting the parameter to true requires the flow's code to be replayable from the previous checkpoint (or start of flow) up until the next checkpoint (or end of flow) in order to prepare for hard failures. As suspending functions always commit the flow's database transaction regardless of this parameter the flow must be prepared for scenarios where a previous running of the flow already committed itsrelevant database transactions. Only set this option to true if you know what you're doing.
@NotNull public org.slf4j.Logger getLogger()
This is where you should log things to.
@NotNull public StateMachineRunId getRunId()
Returns a wrapped java.util.UUID object that identifies this state machine run (i.e. subflows have the same identifier as their parents).
@NotNull public ServiceHub getServiceHub()
Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts. It is
only available once the flow has started, which means it cannot be accessed in the constructor. Either
access this lazily or from inside call
.
call
public boolean isKilled()
Returns true
when the current class FlowLogic
has been killed (has received a command to halt its progress and terminate).
Check this property in long-running computation loops to exit a flow that has been killed:
while (!isKilled) {
// do some computation
}
Ideal usage would include throwing a exception KilledFlowException
which will lead to the termination of the flow:
for (item in list) {
if (isKilled) {
throw KilledFlowException(runId)
}
// do some computation
}
Note, once the isKilled
flag is set to true
the flow may terminate once it reaches the next API function marked with the
@annotationclass Suspendable
annotation. Therefore, it is possible to write a flow that does not interact with the isKilled
flag while still
terminating correctly.
class FlowLogic
,
exception KilledFlowException
,
isKilled
,
annotationclass Suspendable
,
isKilled
@Suspendable @NotNull public FlowSession initiateFlow(@NotNull Destination destination)
Creates a communication session with destination
. Subsequently you may send/receive using this session object. How the messaging
is routed depends on the interface Destination
type, including whether this call does any initial communication.
destination
,
interface Destination
@Suspendable @NotNull public FlowSession initiateFlow(@NotNull Party party)
Creates a communication session with party
. Subsequently you may send/receive using this session object. Note
that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
party
@NotNull public PartyAndCertificate getOurIdentityAndCert()
Specifies the identity, with certificate, to use for this flow. This will be one of the multiple identities that belong to this node.
@NotNull public Party getOurIdentity()
Specifies the identity to use for this flow. This will be one of the multiple identities that belong to this node.
This is the same as calling ourIdentityAndCert.party
.
NodeInfo.legalIdentities
@Deprecated @Suspendable @NotNull public FlowInfo getFlowInfo(@NotNull Party otherParty)
Returns a class FlowInfo
object describing the flow otherParty
is using. With FlowInfo.flowVersion it
provides the necessary information needed for the evolution of flows and enabling backwards compatibility.
This method can be called before any send or receive has been done with otherParty
. In such a case this will force
them to start their flow.
class FlowInfo
,
otherParty
,
otherParty
@Deprecated @Suspendable @NotNull public <R> UntrustworthyData<R> sendAndReceive(@NotNull java.lang.Class<R> receiveType, @NotNull Party otherParty, @NotNull java.lang.Object payload)
Serializes and queues the given payload
object for sending to the otherParty
. Suspends until a response
is received, which must be of the given receiveType
. Remember that when receiving data from other parties the data
should not be trusted until it's been thoroughly verified for consistency and that all expectations are
satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
use this when you expect to do a message swap than do use send
and then receive in turn.
class UntrustworthyData
wrapper around the received object.payload
,
otherParty
,
receiveType
,
send
@Deprecated @Suspendable @NotNull public <R> UntrustworthyData<R> receive(@NotNull java.lang.Class<R> receiveType, @NotNull Party otherParty)
Suspends until the specified otherParty
sends us a message of type receiveType
.
Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
class UntrustworthyData
wrapper around the received object.otherParty
,
receiveType
@Deprecated @Suspendable public void send(@NotNull Party otherParty, @NotNull java.lang.Object payload)
Queues the given payload
for sending to the otherParty
and continues without suspending.
Note that the other party may receive the message at some arbitrary later point or not at all: if otherParty
is offline then message delivery will be retried until it comes back or until the message is older than the
network's event horizon time.
payload
,
otherParty
,
otherParty
@Suspendable @JvmOverloads @NotNull public java.util.Map<net.corda.core.flows.FlowSession,net.corda.core.utilities.UntrustworthyData> receiveAllMap(@NotNull java.util.Map<net.corda.core.flows.FlowSession,? extends java.lang.Class<? extends java.lang.Object>> sessions, boolean maySkipCheckpoint)
Suspends until a message has been received for each session in the specified sessions
.
Consider receiveAllreceiveType:Class,sessions:List:ListUntrustworthyData when the same type is expected from all sessions.
Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
@Suspendable @JvmOverloads @NotNull public java.util.Map<net.corda.core.flows.FlowSession,net.corda.core.utilities.UntrustworthyData> receiveAllMap(@NotNull java.util.Map<net.corda.core.flows.FlowSession,? extends java.lang.Class<? extends java.lang.Object>> sessions)
Suspends until a message has been received for each session in the specified sessions
.
Consider receiveAllreceiveType:Class,sessions:List:ListUntrustworthyData when the same type is expected from all sessions.
Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
@Suspendable @JvmOverloads @NotNull public <R> java.util.List<net.corda.core.utilities.UntrustworthyData> receiveAll(@NotNull java.lang.Class<R> receiveType, @NotNull java.util.List<? extends net.corda.core.flows.FlowSession> sessions, boolean maySkipCheckpoint)
Suspends until a message has been received for each session in the specified sessions
.
Consider sessions:MapFlowSession,Class:MapFlowSession,UntrustworthyData when sessions are expected to receive different types.
Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
@Suspendable @JvmOverloads @NotNull public <R> java.util.List<net.corda.core.utilities.UntrustworthyData> receiveAll(@NotNull java.lang.Class<R> receiveType, @NotNull java.util.List<? extends net.corda.core.flows.FlowSession> sessions)
Suspends until a message has been received for each session in the specified sessions
.
Consider sessions:MapFlowSession,Class:MapFlowSession,UntrustworthyData when sessions are expected to receive different types.
Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
@Suspendable @JvmOverloads public void sendAll(@NotNull java.lang.Object payload, @NotNull java.util.Set<? extends net.corda.core.flows.FlowSession> sessions, boolean maySkipCheckpoint)
Queues the given payload
for sending to the provided sessions
and continues without suspending.
Note that the other parties may receive the message at some arbitrary later point or not at all: if one of the provided sessions
is offline then message delivery will be retried until the corresponding node comes back or until the message is older than the
network's event horizon time.
@Suspendable @JvmOverloads public void sendAll(@NotNull java.lang.Object payload, @NotNull java.util.Set<? extends net.corda.core.flows.FlowSession> sessions)
Queues the given payload
for sending to the provided sessions
and continues without suspending.
Note that the other parties may receive the message at some arbitrary later point or not at all: if one of the provided sessions
is offline then message delivery will be retried until the corresponding node comes back or until the message is older than the
network's event horizon time.
@Suspendable @JvmOverloads public void sendAllMap(@NotNull java.util.Map<net.corda.core.flows.FlowSession,? extends java.lang.Object> payloadsPerSession, boolean maySkipCheckpoint)
Queues the given payloads for sending to the provided sessions and continues without suspending.
Note that the other parties may receive the message at some arbitrary later point or not at all: if one of the provided sessions is offline then message delivery will be retried until the corresponding node comes back or until the message is older than the network's event horizon time.
payloadsPerSession
- a mapping that contains the payload to be sent to each session.maySkipCheckpoint
- whether checkpointing should be skipped.@Suspendable @JvmOverloads public void sendAllMap(@NotNull java.util.Map<net.corda.core.flows.FlowSession,? extends java.lang.Object> payloadsPerSession)
Queues the given payloads for sending to the provided sessions and continues without suspending.
Note that the other parties may receive the message at some arbitrary later point or not at all: if one of the provided sessions is offline then message delivery will be retried until the corresponding node comes back or until the message is older than the network's event horizon time.
payloadsPerSession
- a mapping that contains the payload to be sent to each session.@Suspendable public void close(@NotNull NonEmptySet<net.corda.core.flows.FlowSession> sessions)
Closes the provided sessions and performs cleanup of any resources tied to these sessions.
Note that sessions are closed automatically when the corresponding top-level flow terminates.
So, it's beneficial to eagerly close them in long-lived flows that might have many open sessions that are not needed anymore and consume resources (e.g. memory, disk etc.).
A closed session cannot be used anymore, e.g. to send or receive messages. So, you have to ensure you are calling this method only when the provided sessions are not going to be used anymore.
As a result, any operations on a closed session will fail with an exception UnexpectedFlowEndException
.
When a session is closed, the other side is informed and the session is closed there too eventually.
To prevent misuse of the API, if there is an attempt to close an uninitialised session the invocation will fail with an IllegalStateException.
@Suspendable public <R> R subFlow(@NotNull FlowLogic<? extends R> subLogic)
Invokes the given subflow. This function returns once the subflow completes successfully with the result
returned by that subflow's call
method. If the subflow has a progress tracker, it is attached to the
current step in this flow's progress tracker.
If the subflow is not an initiating flow (i.e. not annotated with annotationclass InitiatingFlow
) then it will continue to use
the existing sessions this flow has created with its counterparties. This allows for subflows which can act as
building blocks for other flows, for example removing the boilerplate of common sequences of sends and receives.
FlowException
- This is either thrown by subLogic
itself or propagated from any of the remote
class FlowLogic
s it communicated with. The subflow can be retried by catching this exception.call
,
annotationclass InitiatingFlow
public void checkFlowPermission(@NotNull java.lang.String permissionName, @NotNull java.util.Map<java.lang.String,java.lang.String> extraAuditData)
Flows can call this method to ensure that the active FlowInitiator is authorised for a particular action. This provides fine grained control over application level permissions, when RPC control over starting the flow is insufficient, or the permission is runtime dependent upon the choices made inside long lived flow code. For example some users may have restricted limits on how much cash they can transfer, or whether they can change certain fields. An audit event is always recorded whenever this method is used. If the permission is not granted for the FlowInitiator a FlowException is thrown.
permissionName
- is a string representing the desired permission. Each flow is given a distinct namespace for these permissions.extraAuditData
- in the audit log for this permission check these extra key value pairs will be recorded.public void recordAuditEvent(@NotNull java.lang.String eventType, @NotNull java.lang.String comment, @NotNull java.util.Map<java.lang.String,java.lang.String> extraAuditData)
Flows can call this method to record application level flow audit events
eventType
- is a string representing the type of event. Each flow is given a distinct namespace for these names.comment
- a general human readable summary of the event.extraAuditData
- in the audit log for this permission check these extra key value pairs will be recorded.@Nullable public ProgressTracker getProgressTracker()
Override this to provide a class ProgressTracker
. If one is provided and stepped, the framework will do something
helpful with the progress reports e.g record to the audit service. If this flow is invoked as a subflow of another,
then the tracker will be made a child of the current step in the parent. If it's null, this flow doesn't track
progress.
Note that this has to return a tracker before the flow is invoked. You can't change your mind half way through.
class ProgressTracker
@Suspendable public T call()
This is where you fill out your business logic.
@Nullable public DataFeed<java.lang.String,java.lang.String> track()
Returns a pair of the current progress step, as a string, and an observable of stringified changes to the
progressTracker
.
progressTracker
@Nullable public DataFeed<java.lang.Integer,java.lang.Integer> trackStepsTreeIndex()
Returns a pair of the current progress step index (as integer) in steps tree of current progressTracker
, and an observable
of its upcoming changes.
progressTracker
@Nullable public DataFeed<java.util.List,java.util.List> trackStepsTree()
Returns a pair of the current steps tree of current progressTracker
as pairs of zero-based depth and stringified step
label and observable of upcoming changes to the structure.
progressTracker
@Suspendable @JvmOverloads @NotNull public SignedTransaction waitForLedgerCommit(@NotNull SecureHash hash, boolean maySkipCheckpoint)
Suspends the flow until the transaction with the specified ID is received, successfully verified and sent to the vault for processing. Note that this call suspends until the transaction is considered valid by the local node, but that doesn't imply the vault will consider it relevant.
@Suspendable @JvmOverloads @NotNull public SignedTransaction waitForLedgerCommit(@NotNull SecureHash hash)
Suspends the flow until the transaction with the specified ID is received, successfully verified and sent to the vault for processing. Note that this call suspends until the transaction is considered valid by the local node, but that doesn't imply the vault will consider it relevant.
@Suspendable public void waitForStateConsumption(@NotNull java.util.Set<net.corda.core.contracts.StateRef> stateRefs)
Suspends the current flow until all the provided class StateRef
s have been consumed.
WARNING! Remember that the flow which uses this async operation will NOT wake-up until all the supplied StateRefs have been consumed. If the node isn't aware of the supplied StateRefs or if the StateRefs are never consumed, then the calling flow will remain suspended FOREVER!!
stateRefs
- the StateRefs which will be consumed in the future.class StateRef
@Suspendable @Nullable public FlowStackSnapshot flowStackSnapshot()
Returns a shallow copy of the Quasar stack frames at the time of call to flowStackSnapshot
. Use this to inspect
what objects would be serialised at the time of call to a suspending action (e.g. send/receive).
Note: This logic is only available during tests and is not meant to be used during the production deployment.
Therefore the default implementation does nothing.
flowStackSnapshot
@Suspendable public void persistFlowStackSnapshot()
Persists a shallow copy of the Quasar stack frames at the time of call to persistFlowStackSnapshot
.
Use this to track the monitor evolution of the quasar stack values during the flow execution.
The flow stack snapshot is stored in a file located in {baseDir}/flowStackSnapshots/YYYY-MM-DD/{flowId}/
where baseDir is the node running directory and flowId is the flow unique identifier generated by the platform.
Note: With respect to the flowStackSnapshot
, the snapshot being persisted by this method is partial,
meaning that only flow relevant traces and local variables are persisted.
Also, this logic is only available during tests and is not meant to be used during the production deployment.
Therefore the default implementation does nothing.
persistFlowStackSnapshot
,
flowStackSnapshot
@Suspendable @NotNull public <R> R await(@NotNull FlowExternalAsyncOperation<R> operation)
Executes the specified operation
and suspends until operation completion.
An implementation of interface FlowExternalAsyncOperation
should be provided that creates a new future that the state machine awaits
completion of.
operation
,
interface FlowExternalAsyncOperation
@Suspendable @NotNull public <R> R await(@NotNull FlowExternalOperation<R> operation)
Executes the specified operation
and suspends until operation completion.
An implementation of interface FlowExternalOperation
should be provided that returns a result which the state machine will run on a separate
thread (using the node's external operation thread pool).
operation
,
interface FlowExternalOperation
public void checkFlowIsNotKilled()
Helper function that throws a exception KilledFlowException
if the current class FlowLogic
has been killed.
Call this function in long-running computation loops to exit a flow that has been killed:
for (item in list) {
checkFlowIsNotKilled()
// do some computation
}
See the isKilled
property for more information.
exception KilledFlowException
,
class FlowLogic
,
isKilled
public void checkFlowIsNotKilled(@NotNull kotlin.jvm.functions.Function0<? extends java.lang.Object> lazyMessage)
Helper function that throws a exception KilledFlowException
if the current class FlowLogic
has been killed. The provided message is added to the
thrown exception KilledFlowException
.
Call this function in long-running computation loops to exit a flow that has been killed:
for (item in list) {
checkFlowIsNotKilled { "The flow $runId was killed while iterating through the list of items" }
// do some computation
}
See the isKilled
property for more information.
@Nullable public static FlowLogic<?> getCurrentTopLevel()
Return the outermost class FlowLogic
instance, or null if not in a flow.
class FlowLogic
@Suspendable @JvmStatic @JvmOverloads public static void sleep(@NotNull java.time.Duration duration, boolean maySkipCheckpoint)
If on a flow, suspends the flow and only wakes it up after at least duration
time has passed. Otherwise,
just sleep for duration
. This sleep function is not designed to aid scheduling, for which you should
consider using interface SchedulableState
. It is designed to aid with managing contention
for which you have not managed via another means.
Warning: long sleeps and in general long running flows are highly discouraged, as there is currently no support for flow migration! This method will throw an exception if you attempt to sleep for longer than 5 minutes.
duration
,
duration
,
interface SchedulableState
@Suspendable @JvmStatic @JvmOverloads public static void sleep(@NotNull java.time.Duration duration)
If on a flow, suspends the flow and only wakes it up after at least duration
time has passed. Otherwise,
just sleep for duration
. This sleep function is not designed to aid scheduling, for which you should
consider using interface SchedulableState
. It is designed to aid with managing contention
for which you have not managed via another means.
Warning: long sleeps and in general long running flows are highly discouraged, as there is currently no support for flow migration! This method will throw an exception if you attempt to sleep for longer than 5 minutes.
duration
,
duration
,
interface SchedulableState