public class FlowLogic<T>
A sub-class of class FlowLogic
implements a flow using direct, straight line blocking code. Thus you
can write complex flow logic in an ordinary fashion, without having to think about callbacks, restarting after
a node crash, how many instances of your flow there are running and so on.
Invoking the network will cause the call stack to be suspended onto the heap and then serialized to a database using
the Quasar fibers framework. Because of this, if you need access to data that might change over time, you should
request it just-in-time via the FlowLogic.getServiceHub
property which is provided. Don't try and keep data you got from a
service across calls to send/receive/sendAndReceive because the world might change in arbitrary ways out from
underneath you, for instance, if the node is restarted or reconfigured!
Additionally, be aware of what data you pin either via the stack or in your class FlowLogic
implementation. Very large
objects or datasets will hurt performance by increasing the amount of data stored in each checkpoint.
If you'd like to use another FlowLogic class as a component of your own, construct it on the fly and then pass
it to the FlowLogic.subFlow
method. It will return the result of that flow when it completes.
If your flow (whether it's a top-level flow or a subflow) is supposed to initiate a session with the counterparty
and request they start their counterpart flow, then make sure it's annotated with interface InitiatingFlow
. This annotation
also has a version property to allow you to version your flow and enables a node to restrict support for the flow to
that particular version.
Functions that suspend the flow (including all functions on class FlowSession
) accept a maySkipCheckpoint parameter
defaulting to false, false meaning a checkpoint should always be created on suspend. This parameter may be set to
true which allows the implementation to potentially optimise away the checkpoint, saving a roundtrip to the database.
This option however comes with a big warning sign: Setting the parameter to true requires the flow's code to be replayable from the previous checkpoint (or start of flow) up until the next checkpoint (or end of flow) in order to prepare for hard failures. As suspending functions always commit the flow's database transaction regardless of this parameter the flow must be prepared for scenarios where a previous running of the flow already committed itsrelevant database transactions. Only set this option to true if you know what you're doing.
Modifier and Type | Class and Description |
---|---|
static class |
FlowLogic.Companion |
Modifier and Type | Field and Description |
---|---|
static FlowLogic.Companion |
Companion |
Constructor and Description |
---|
FlowLogic()
A sub-class of
class FlowLogic implements a flow using direct, straight line blocking code. Thus you
can write complex flow logic in an ordinary fashion, without having to think about callbacks, restarting after
a node crash, how many instances of your flow there are running and so on. |
Modifier and Type | Method and Description |
---|---|
<R> R |
await(FlowExternalAsyncOperation<R> operation)
Executes the specified operation and suspends until operation completion.
|
<R> R |
await(FlowExternalOperation<R> operation)
Executes the specified operation and suspends until operation completion.
|
T |
call()
This is where you fill out your business logic.
|
void |
checkFlowIsNotKilled()
Helper function that throws a
exception KilledFlowException if the current class FlowLogic has been killed. |
void |
checkFlowIsNotKilled(Function0<? extends java.lang.Object> lazyMessage)
Helper function that throws a
exception KilledFlowException if the current class FlowLogic has been killed. The provided message is added to the
thrown exception KilledFlowException . |
void |
checkFlowPermission(java.lang.String permissionName,
java.util.Map<java.lang.String,java.lang.String> extraAuditData)
Flows can call this method to ensure that the active FlowInitiator is authorised for a particular action.
This provides fine grained control over application level permissions, when RPC control over starting the flow is insufficient,
or the permission is runtime dependent upon the choices made inside long lived flow code.
For example some users may have restricted limits on how much cash they can transfer, or whether they can change certain fields.
An audit event is always recorded whenever this method is used.
If the permission is not granted for the FlowInitiator a FlowException is thrown.
|
void |
close(NonEmptySet<net.corda.core.flows.FlowSession> sessions)
Closes the provided sessions and performs cleanup of any resources tied to these sessions.
|
FlowStackSnapshot |
flowStackSnapshot()
Returns a shallow copy of the Quasar stack frames at the time of call to
FlowLogic.flowStackSnapshot . Use this to inspect
what objects would be serialised at the time of call to a suspending action (e.g. send/receive).
Note: This logic is only available during tests and is not meant to be used during the production deployment.
Therefore the default implementation does nothing. |
FlowInfo |
getFlowInfo(Party otherParty)
Deprecated.
|
NonExistentClass |
getLogger()
This is where you should log things to.
|
Party |
getOurIdentity()
Specifies the identity to use for this flow. This will be one of the multiple identities that belong to this node.
This is the same as calling
ourIdentityAndCert.party . |
PartyAndCertificate |
getOurIdentityAndCert()
Specifies the identity, with certificate, to use for this flow. This will be one of the multiple identities that
belong to this node.
|
ProgressTracker |
getProgressTracker()
Override this to provide a
class ProgressTracker . If one is provided and stepped, the framework will do something
helpful with the progress reports e.g record to the audit service. If this flow is invoked as a subflow of another,
then the tracker will be made a child of the current step in the parent. If it's null, this flow doesn't track
progress. |
StateMachineRunId |
getRunId()
Returns a wrapped java.util.UUID object that identifies this state machine run (i.e. subflows have the same
identifier as their parents).
|
ServiceHub |
getServiceHub()
Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts. It is
only available once the flow has started, which means it cannot be accessed in the constructor. Either
access this lazily or from inside
FlowLogic.call . |
FlowSession |
initiateFlow(Destination destination)
Creates a communication session with destination. Subsequently you may send/receive using this session object. How the messaging
is routed depends on the
interface Destination type, including whether this call does any initial communication. |
FlowSession |
initiateFlow(Party party)
Creates a communication session with party. Subsequently you may send/receive using this session object. Note
that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
|
boolean |
isKilled()
Returns
true when the current class FlowLogic has been killed (has received a command to halt its progress and terminate). |
void |
persistFlowStackSnapshot()
Persists a shallow copy of the Quasar stack frames at the time of call to
FlowLogic.persistFlowStackSnapshot .
Use this to track the monitor evolution of the quasar stack values during the flow execution.
The flow stack snapshot is stored in a file located in {baseDir}/flowStackSnapshots/YYYY-MM-DD/{flowId}/
where baseDir is the node running directory and flowId is the flow unique identifier generated by the platform. |
<R> UntrustworthyData<R> |
receive(java.lang.Class<R> receiveType,
Party otherParty)
Deprecated.
|
<R> java.util.List<net.corda.core.utilities.UntrustworthyData> |
receiveAll(java.lang.Class<R> receiveType,
java.util.List<? extends net.corda.core.flows.FlowSession> sessions,
boolean maySkipCheckpoint)
Suspends until a message has been received for each session in the specified sessions.
|
java.util.Map<net.corda.core.flows.FlowSession,net.corda.core.utilities.UntrustworthyData> |
receiveAllMap(java.util.Map<net.corda.core.flows.FlowSession,? extends java.lang.Class<? extends java.lang.Object>> sessions,
boolean maySkipCheckpoint)
Suspends until a message has been received for each session in the specified sessions.
|
void |
recordAuditEvent(java.lang.String eventType,
java.lang.String comment,
java.util.Map<java.lang.String,java.lang.String> extraAuditData)
Flows can call this method to record application level flow audit events
|
void |
send(Party otherParty,
java.lang.Object payload)
Deprecated.
|
void |
sendAll(java.lang.Object payload,
java.util.Set<? extends net.corda.core.flows.FlowSession> sessions,
boolean maySkipCheckpoint)
Queues the given payload for sending to the provided sessions and continues without suspending.
|
void |
sendAllMap(java.util.Map<net.corda.core.flows.FlowSession,? extends java.lang.Object> payloadsPerSession,
boolean maySkipCheckpoint)
Queues the given payloads for sending to the provided sessions and continues without suspending.
|
<R> UntrustworthyData<R> |
sendAndReceive(java.lang.Class<R> receiveType,
Party otherParty,
java.lang.Object payload)
Deprecated.
|
<R> R |
subFlow(FlowLogic<? extends R> subLogic)
Invokes the given subflow. This function returns once the subflow completes successfully with the result
returned by that subflow's
FlowLogic.call method. If the subflow has a progress tracker, it is attached to the
current step in this flow's progress tracker. |
DataFeed<java.lang.String,java.lang.String> |
track()
Returns a pair of the current progress step, as a string, and an observable of stringified changes to the
FlowLogic.getProgressTracker . |
DataFeed<java.util.List,java.util.List> |
trackStepsTree()
Returns a pair of the current steps tree of current
FlowLogic.getProgressTracker as pairs of zero-based depth and stringified step
label and observable of upcoming changes to the structure. |
DataFeed<java.lang.Integer,java.lang.Integer> |
trackStepsTreeIndex()
Returns a pair of the current progress step index (as integer) in steps tree of current
FlowLogic.getProgressTracker , and an observable
of its upcoming changes. |
SignedTransaction |
waitForLedgerCommit(SecureHash hash,
boolean maySkipCheckpoint)
Suspends the flow until the transaction with the specified ID is received, successfully verified and
sent to the vault for processing. Note that this call suspends until the transaction is considered
valid by the local node, but that doesn't imply the vault will consider it relevant.
|
void |
waitForStateConsumption(java.util.Set<net.corda.core.contracts.StateRef> stateRefs)
Suspends the current flow until all the provided
class StateRef s have been consumed. |
public static FlowLogic.Companion Companion
public FlowLogic()
A sub-class of class FlowLogic
implements a flow using direct, straight line blocking code. Thus you
can write complex flow logic in an ordinary fashion, without having to think about callbacks, restarting after
a node crash, how many instances of your flow there are running and so on.
Invoking the network will cause the call stack to be suspended onto the heap and then serialized to a database using
the Quasar fibers framework. Because of this, if you need access to data that might change over time, you should
request it just-in-time via the FlowLogic.getServiceHub
property which is provided. Don't try and keep data you got from a
service across calls to send/receive/sendAndReceive because the world might change in arbitrary ways out from
underneath you, for instance, if the node is restarted or reconfigured!
Additionally, be aware of what data you pin either via the stack or in your class FlowLogic
implementation. Very large
objects or datasets will hurt performance by increasing the amount of data stored in each checkpoint.
If you'd like to use another FlowLogic class as a component of your own, construct it on the fly and then pass
it to the FlowLogic.subFlow
method. It will return the result of that flow when it completes.
If your flow (whether it's a top-level flow or a subflow) is supposed to initiate a session with the counterparty
and request they start their counterpart flow, then make sure it's annotated with interface InitiatingFlow
. This annotation
also has a version property to allow you to version your flow and enables a node to restrict support for the flow to
that particular version.
Functions that suspend the flow (including all functions on class FlowSession
) accept a maySkipCheckpoint parameter
defaulting to false, false meaning a checkpoint should always be created on suspend. This parameter may be set to
true which allows the implementation to potentially optimise away the checkpoint, saving a roundtrip to the database.
This option however comes with a big warning sign: Setting the parameter to true requires the flow's code to be replayable from the previous checkpoint (or start of flow) up until the next checkpoint (or end of flow) in order to prepare for hard failures. As suspending functions always commit the flow's database transaction regardless of this parameter the flow must be prepared for scenarios where a previous running of the flow already committed itsrelevant database transactions. Only set this option to true if you know what you're doing.
public NonExistentClass getLogger()
This is where you should log things to.
public StateMachineRunId getRunId()
Returns a wrapped java.util.UUID object that identifies this state machine run (i.e. subflows have the same identifier as their parents).
public ServiceHub getServiceHub()
Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts. It is
only available once the flow has started, which means it cannot be accessed in the constructor. Either
access this lazily or from inside FlowLogic.call
.
FlowLogic.call
public boolean isKilled()
Returns true
when the current class FlowLogic
has been killed (has received a command to halt its progress and terminate).
Check this property in long-running computation loops to exit a flow that has been killed:
while (!isKilled) {
// do some computation
}
Ideal usage would include throwing a exception KilledFlowException
which will lead to the termination of the flow:
for (item in list) {
if (isKilled) {
throw KilledFlowException(runId)
}
// do some computation
}
Note, once the FlowLogic.isKilled
flag is set to true
the flow may terminate once it reaches the next API function marked with the
@Suspendable annotation. Therefore, it is possible to write a flow that does not interact with the FlowLogic.isKilled
flag while still
terminating correctly.
public FlowSession initiateFlow(Destination destination)
Creates a communication session with destination. Subsequently you may send/receive using this session object. How the messaging
is routed depends on the interface Destination
type, including whether this call does any initial communication.
interface Destination
public FlowSession initiateFlow(Party party)
Creates a communication session with party. Subsequently you may send/receive using this session object. Note that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
public PartyAndCertificate getOurIdentityAndCert()
Specifies the identity, with certificate, to use for this flow. This will be one of the multiple identities that belong to this node.
public Party getOurIdentity()
Specifies the identity to use for this flow. This will be one of the multiple identities that belong to this node.
This is the same as calling ourIdentityAndCert.party
.
NodeInfo.getLegalIdentities
public FlowInfo getFlowInfo(Party otherParty)
Returns a class FlowInfo
object describing the flow otherParty is using. With FlowInfo.flowVersion it
provides the necessary information needed for the evolution of flows and enabling backwards compatibility.
This method can be called before any send or receive has been done with otherParty. In such a case this will force them to start their flow.
class FlowInfo
public <R> UntrustworthyData<R> sendAndReceive(java.lang.Class<R> receiveType, Party otherParty, java.lang.Object payload)
Serializes and queues the given payload object for sending to the otherParty. Suspends until a response is received, which must be of the given receiveType. Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
use this when you expect to do a message swap than do use FlowLogic.send
and then receive in turn.
class UntrustworthyData
wrapper around the received object.FlowLogic.send
public <R> UntrustworthyData<R> receive(java.lang.Class<R> receiveType, Party otherParty)
Suspends until the specified otherParty sends us a message of type receiveType.
Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
class UntrustworthyData
wrapper around the received object.public void send(Party otherParty, java.lang.Object payload)
Queues the given payload for sending to the otherParty and continues without suspending.
Note that the other party may receive the message at some arbitrary later point or not at all: if otherParty is offline then message delivery will be retried until it comes back or until the message is older than the network's event horizon time.
public java.util.Map<net.corda.core.flows.FlowSession,net.corda.core.utilities.UntrustworthyData> receiveAllMap(java.util.Map<net.corda.core.flows.FlowSession,? extends java.lang.Class<? extends java.lang.Object>> sessions, boolean maySkipCheckpoint)
Suspends until a message has been received for each session in the specified sessions.
Consider receiveAllreceiveTypeClass,sessionsListListUntrustworthyData when the same type is expected from all sessions.
Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
public <R> java.util.List<net.corda.core.utilities.UntrustworthyData> receiveAll(java.lang.Class<R> receiveType, java.util.List<? extends net.corda.core.flows.FlowSession> sessions, boolean maySkipCheckpoint)
Suspends until a message has been received for each session in the specified sessions.
Consider sessionsMapFlowSession,ClassMapFlowSession,UntrustworthyData when sessions are expected to receive different types.
Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
public void sendAll(java.lang.Object payload, java.util.Set<? extends net.corda.core.flows.FlowSession> sessions, boolean maySkipCheckpoint)
Queues the given payload for sending to the provided sessions and continues without suspending.
Note that the other parties may receive the message at some arbitrary later point or not at all: if one of the provided sessions is offline then message delivery will be retried until the corresponding node comes back or until the message is older than the network's event horizon time.
payload
- the payload to send.sessions
- the sessions to send the provided payload to.maySkipCheckpoint
- whether checkpointing should be skipped.public void sendAllMap(java.util.Map<net.corda.core.flows.FlowSession,? extends java.lang.Object> payloadsPerSession, boolean maySkipCheckpoint)
Queues the given payloads for sending to the provided sessions and continues without suspending.
Note that the other parties may receive the message at some arbitrary later point or not at all: if one of the provided sessions is offline then message delivery will be retried until the corresponding node comes back or until the message is older than the network's event horizon time.
payloadsPerSession
- a mapping that contains the payload to be sent to each session.maySkipCheckpoint
- whether checkpointing should be skipped.public void close(NonEmptySet<net.corda.core.flows.FlowSession> sessions)
Closes the provided sessions and performs cleanup of any resources tied to these sessions.
Note that sessions are closed automatically when the corresponding top-level flow terminates.
So, it's beneficial to eagerly close them in long-lived flows that might have many open sessions that are not needed anymore and consume resources (e.g. memory, disk etc.).
A closed session cannot be used anymore, e.g. to send or receive messages. So, you have to ensure you are calling this method only when the provided sessions are not going to be used anymore.
As a result, any operations on a closed session will fail with an exception UnexpectedFlowEndException
.
When a session is closed, the other side is informed and the session is closed there too eventually.
To prevent misuse of the API, if there is an attempt to close an uninitialised session the invocation will fail with an IllegalStateException.
public <R> R subFlow(FlowLogic<? extends R> subLogic)
Invokes the given subflow. This function returns once the subflow completes successfully with the result
returned by that subflow's FlowLogic.call
method. If the subflow has a progress tracker, it is attached to the
current step in this flow's progress tracker.
If the subflow is not an initiating flow (i.e. not annotated with interface InitiatingFlow
) then it will continue to use
the existing sessions this flow has created with its counterparties. This allows for subflows which can act as
building blocks for other flows, for example removing the boilerplate of common sequences of sends and receives.
class FlowLogic
s it communicated with. The subflow can be retried by catching this exception.FlowLogic.call
,
interface InitiatingFlow
public void checkFlowPermission(java.lang.String permissionName, java.util.Map<java.lang.String,java.lang.String> extraAuditData)
Flows can call this method to ensure that the active FlowInitiator is authorised for a particular action. This provides fine grained control over application level permissions, when RPC control over starting the flow is insufficient, or the permission is runtime dependent upon the choices made inside long lived flow code. For example some users may have restricted limits on how much cash they can transfer, or whether they can change certain fields. An audit event is always recorded whenever this method is used. If the permission is not granted for the FlowInitiator a FlowException is thrown.
permissionName
- is a string representing the desired permission. Each flow is given a distinct namespace for these permissions.extraAuditData
- in the audit log for this permission check these extra key value pairs will be recorded.public void recordAuditEvent(java.lang.String eventType, java.lang.String comment, java.util.Map<java.lang.String,java.lang.String> extraAuditData)
Flows can call this method to record application level flow audit events
eventType
- is a string representing the type of event. Each flow is given a distinct namespace for these names.comment
- a general human readable summary of the event.extraAuditData
- in the audit log for this permission check these extra key value pairs will be recorded.public ProgressTracker getProgressTracker()
Override this to provide a class ProgressTracker
. If one is provided and stepped, the framework will do something
helpful with the progress reports e.g record to the audit service. If this flow is invoked as a subflow of another,
then the tracker will be made a child of the current step in the parent. If it's null, this flow doesn't track
progress.
Note that this has to return a tracker before the flow is invoked. You can't change your mind half way through.
class ProgressTracker
public T call()
This is where you fill out your business logic.
public DataFeed<java.lang.String,java.lang.String> track()
Returns a pair of the current progress step, as a string, and an observable of stringified changes to the
FlowLogic.getProgressTracker
.
FlowLogic.getProgressTracker
public DataFeed<java.lang.Integer,java.lang.Integer> trackStepsTreeIndex()
Returns a pair of the current progress step index (as integer) in steps tree of current FlowLogic.getProgressTracker
, and an observable
of its upcoming changes.
FlowLogic.getProgressTracker
public DataFeed<java.util.List,java.util.List> trackStepsTree()
Returns a pair of the current steps tree of current FlowLogic.getProgressTracker
as pairs of zero-based depth and stringified step
label and observable of upcoming changes to the structure.
FlowLogic.getProgressTracker
public SignedTransaction waitForLedgerCommit(SecureHash hash, boolean maySkipCheckpoint)
Suspends the flow until the transaction with the specified ID is received, successfully verified and sent to the vault for processing. Note that this call suspends until the transaction is considered valid by the local node, but that doesn't imply the vault will consider it relevant.
public void waitForStateConsumption(java.util.Set<net.corda.core.contracts.StateRef> stateRefs)
Suspends the current flow until all the provided class StateRef
s have been consumed.
WARNING! Remember that the flow which uses this async operation will NOT wake-up until all the supplied StateRefs have been consumed. If the node isn't aware of the supplied StateRefs or if the StateRefs are never consumed, then the calling flow will remain suspended FOREVER!!
stateRefs
- the StateRefs which will be consumed in the future.class StateRef
public FlowStackSnapshot flowStackSnapshot()
Returns a shallow copy of the Quasar stack frames at the time of call to FlowLogic.flowStackSnapshot
. Use this to inspect
what objects would be serialised at the time of call to a suspending action (e.g. send/receive).
Note: This logic is only available during tests and is not meant to be used during the production deployment.
Therefore the default implementation does nothing.
FlowLogic.flowStackSnapshot
public void persistFlowStackSnapshot()
Persists a shallow copy of the Quasar stack frames at the time of call to FlowLogic.persistFlowStackSnapshot
.
Use this to track the monitor evolution of the quasar stack values during the flow execution.
The flow stack snapshot is stored in a file located in {baseDir}/flowStackSnapshots/YYYY-MM-DD/{flowId}/
where baseDir is the node running directory and flowId is the flow unique identifier generated by the platform.
Note: With respect to the FlowLogic.flowStackSnapshot
, the snapshot being persisted by this method is partial,
meaning that only flow relevant traces and local variables are persisted.
Also, this logic is only available during tests and is not meant to be used during the production deployment.
Therefore the default implementation does nothing.
public <R> R await(FlowExternalAsyncOperation<R> operation)
Executes the specified operation and suspends until operation completion.
An implementation of interface FlowExternalAsyncOperation
should be provided that creates a new future that the state machine awaits
completion of.
interface FlowExternalAsyncOperation
public <R> R await(FlowExternalOperation<R> operation)
Executes the specified operation and suspends until operation completion.
An implementation of interface FlowExternalOperation
should be provided that returns a result which the state machine will run on a separate
thread (using the node's external operation thread pool).
interface FlowExternalOperation
public void checkFlowIsNotKilled()
Helper function that throws a exception KilledFlowException
if the current class FlowLogic
has been killed.
Call this function in long-running computation loops to exit a flow that has been killed:
for (item in list) {
checkFlowIsNotKilled()
// do some computation
}
See the FlowLogic.isKilled
property for more information.
public void checkFlowIsNotKilled(Function0<? extends java.lang.Object> lazyMessage)
Helper function that throws a exception KilledFlowException
if the current class FlowLogic
has been killed. The provided message is added to the
thrown exception KilledFlowException
.
Call this function in long-running computation loops to exit a flow that has been killed:
for (item in list) {
checkFlowIsNotKilled { "The flow $runId was killed while iterating through the list of items" }
// do some computation
}
See the FlowLogic.isKilled
property for more information.