Class FlowLogic

  • All Implemented Interfaces:

    
    public abstract class FlowLogic<T extends Object>
    
                        

    A sub-class of FlowLogic<T> implements a flow using direct, straight line blocking code. Thus you can write complex flow logic in an ordinary fashion, without having to think about callbacks, restarting after a node crash, how many instances of your flow there are running and so on.

    Invoking the network will cause the call stack to be suspended onto the heap and then serialized to a database using the Quasar fibers framework. Because of this, if you need access to data that might change over time, you should request it just-in-time via the serviceHub property which is provided. Don't try and keep data you got from a service across calls to send/receive/sendAndReceive because the world might change in arbitrary ways out from underneath you, for instance, if the node is restarted or reconfigured!

    Additionally, be aware of what data you pin either via the stack or in your FlowLogic implementation. Very large objects or datasets will hurt performance by increasing the amount of data stored in each checkpoint.

    If you'd like to use another FlowLogic class as a component of your own, construct it on the fly and then pass it to the subFlow method. It will return the result of that flow when it completes.

    If your flow (whether it's a top-level flow or a subflow) is supposed to initiate a session with the counterparty and request they start their counterpart flow, then make sure it's annotated with InitiatingFlow. This annotation also has a version property to allow you to version your flow and enables a node to restrict support for the flow to that particular version.

    Functions that suspend the flow (including all functions on FlowSession) accept a maySkipCheckpoint parameter defaulting to false, false meaning a checkpoint should always be created on suspend. This parameter may be set to true which allows the implementation to potentially optimise away the checkpoint, saving a roundtrip to the database.

    This option however comes with a big warning sign: Setting the parameter to true requires the flow's code to be replayable from the previous checkpoint (or start of flow) up until the next checkpoint (or end of flow) in order to prepare for hard failures. As suspending functions always commit the flow's database transaction regardless of this parameter the flow must be prepared for scenarios where a previous running of the flow already committed its relevant database transactions. Only set this option to true if you know what you're doing.

    • Constructor Detail

      • FlowLogic

        FlowLogic()
    • Method Detail

      • getLogger

         final <Error class: unknown class> getLogger()
      • getProgressTracker

         ProgressTracker getProgressTracker()

        Override this to provide a ProgressTracker. If one is provided and stepped, the framework will do something helpful with the progress reports e.g record to the audit service. If this flow is invoked as a subflow of another, then the tracker will be made a child of the current step in the parent. If it's null, this flow doesn't track progress.

        Note that this has to return a tracker before the flow is invoked. You can't change your mind half way through.

      • initiateFlow

         final FlowSession initiateFlow(Destination destination)

        Creates a communication session with destination. Subsequently you may send/receive using this session object. How the messaging is routed depends on the Destination type, including whether this call does any initial communication.

      • initiateFlow

         final FlowSession initiateFlow(Party party)

        Creates a communication session with party. Subsequently you may send/receive using this session object. Note that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.

      • sendAndReceive

        @Deprecated(message = "Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING) final <R extends Any> UntrustworthyData<R> sendAndReceive(Party otherParty, Object payload)

        Serializes and queues the given payload object for sending to the otherParty. Suspends until a response is received, which must be of the given R type.

        Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.

        Note that this function is not just a simple send+receive pair: it is more efficient and more correct to use this when you expect to do a message swap than do use send and then receive in turn.

      • sendAndReceive

        @Deprecated(message = "Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING) <R extends Any> UntrustworthyData<R> sendAndReceive(Class<R> receiveType, Party otherParty, Object payload)

        Serializes and queues the given payload object for sending to the otherParty. Suspends until a response is received, which must be of the given receiveType. Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.

        Note that this function is not just a simple send+receive pair: it is more efficient and more correct to use this when you expect to do a message swap than do use send and then receive in turn.

      • receive

        @Deprecated(message = "Use FlowSession.receive()", level = DeprecationLevel.WARNING) final <R extends Any> UntrustworthyData<R> receive(Party otherParty)

        Suspends until the specified otherParty sends us a message of type R.

        Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.

      • receive

        @Deprecated(message = "Use FlowSession.receive()", level = DeprecationLevel.WARNING) <R extends Any> UntrustworthyData<R> receive(Class<R> receiveType, Party otherParty)

        Suspends until the specified otherParty sends us a message of type receiveType.

        Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.

      • send

        @Deprecated(message = "Use FlowSession.send()", level = DeprecationLevel.WARNING) Unit send(Party otherParty, Object payload)

        Queues the given payload for sending to the otherParty and continues without suspending.

        Note that the other party may receive the message at some arbitrary later point or not at all: if otherParty is offline then message delivery will be retried until it comes back or until the message is older than the network's event horizon time.

      • receiveAllMap

         Map<FlowSession, UntrustworthyData<Object>> receiveAllMap(Map<FlowSession, Class<out Object>> sessions, Boolean maySkipCheckpoint)

        Suspends until a message has been received for each session in the specified sessions.

        Consider receiveAll(receiveType: Class<R>, sessions: List<FlowSession>): List<UntrustworthyData<R>> when the same type is expected from all sessions.

        Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.

      • receiveAll

         <R extends Any> List<UntrustworthyData<R>> receiveAll(Class<R> receiveType, List<FlowSession> sessions, Boolean maySkipCheckpoint)

        Suspends until a message has been received for each session in the specified sessions.

        Consider sessions: Map<FlowSession, Class<out Any>>): Map<FlowSession, UntrustworthyData<Any>> when sessions are expected to receive different types.

        Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.

      • sendAll

         final Unit sendAll(Object payload, Set<FlowSession> sessions, Boolean maySkipCheckpoint)

        Queues the given payload for sending to the provided sessions and continues without suspending.

        Note that the other parties may receive the message at some arbitrary later point or not at all: if one of the provided sessions is offline then message delivery will be retried until the corresponding node comes back or until the message is older than the network's event horizon time.

        Parameters:
        payload - the payload to send.
        sessions - the sessions to send the provided payload to.
        maySkipCheckpoint - whether checkpointing should be skipped.
      • sendAllMap

         final Unit sendAllMap(Map<FlowSession, Object> payloadsPerSession, Boolean maySkipCheckpoint)

        Queues the given payloads for sending to the provided sessions and continues without suspending.

        Note that the other parties may receive the message at some arbitrary later point or not at all: if one of the provided sessions is offline then message delivery will be retried until the corresponding node comes back or until the message is older than the network's event horizon time.

        Parameters:
        payloadsPerSession - a mapping that contains the payload to be sent to each session.
        maySkipCheckpoint - whether checkpointing should be skipped.
      • close

         final Unit close(NonEmptySet<FlowSession> sessions)

        Closes the provided sessions and performs cleanup of any resources tied to these sessions.

        Note that sessions are closed automatically when the corresponding top-level flow terminates. So, it's beneficial to eagerly close them in long-lived flows that might have many open sessions that are not needed anymore and consume resources (e.g. memory, disk etc.). A closed session cannot be used anymore, e.g. to send or receive messages. So, you have to ensure you are calling this method only when the provided sessions are not going to be used anymore. As a result, any operations on a closed session will fail with an UnexpectedFlowEndException. When a session is closed, the other side is informed and the session is closed there too eventually. To prevent misuse of the API, if there is an attempt to close an uninitialised session the invocation will fail with an IllegalStateException.

      • subFlow

         <R extends Any> R subFlow(FlowLogic<R> subLogic)

        Invokes the given subflow. This function returns once the subflow completes successfully with the result returned by that subflow's call method. If the subflow has a progress tracker, it is attached to the current step in this flow's progress tracker.

        If the subflow is not an initiating flow (i.e. not annotated with InitiatingFlow) then it will continue to use the existing sessions this flow has created with its counterparties. This allows for subflows which can act as building blocks for other flows, for example removing the boilerplate of common sequences of sends and receives.

      • checkFlowPermission

         final Unit checkFlowPermission(String permissionName, Map<String, String> extraAuditData)

        Flows can call this method to ensure that the active FlowInitiator is authorised for a particular action. This provides fine grained control over application level permissions, when RPC control over starting the flow is insufficient, or the permission is runtime dependent upon the choices made inside long lived flow code. For example some users may have restricted limits on how much cash they can transfer, or whether they can change certain fields. An audit event is always recorded whenever this method is used. If the permission is not granted for the FlowInitiator a FlowException is thrown.

        Parameters:
        permissionName - is a string representing the desired permission.
        extraAuditData - in the audit log for this permission check these extra key value pairs will be recorded.
      • recordAuditEvent

         final Unit recordAuditEvent(String eventType, String comment, Map<String, String> extraAuditData)

        Flows can call this method to record application level flow audit events

        Parameters:
        eventType - is a string representing the type of event.
        comment - a general human readable summary of the event.
        extraAuditData - in the audit log for this permission check these extra key value pairs will be recorded.
      • call

         abstract T call()

        This is where you fill out your business logic.

      • waitForLedgerCommit

         final SignedTransaction waitForLedgerCommit(SecureHash hash, Boolean maySkipCheckpoint)

        Suspends the flow until the transaction with the specified ID is received, successfully verified and sent to the vault for processing. Note that this call suspends until the transaction is considered valid by the local node, but that doesn't imply the vault will consider it relevant.

      • waitForStateConsumption

         final Unit waitForStateConsumption(Set<StateRef> stateRefs)

        Suspends the current flow until all the provided StateRefs have been consumed.

        WARNING! Remember that the flow which uses this async operation will NOT wake-up until all the supplied StateRefs have been consumed. If the node isn't aware of the supplied StateRefs or if the StateRefs are never consumed, then the calling flow will remain suspended FOREVER!!

        Parameters:
        stateRefs - the StateRefs which will be consumed in the future.
      • flowStackSnapshot

         final FlowStackSnapshot flowStackSnapshot()

        Returns a shallow copy of the Quasar stack frames at the time of call to flowStackSnapshot. Use this to inspect what objects would be serialised at the time of call to a suspending action (e.g. send/receive). Note: This logic is only available during tests and is not meant to be used during the production deployment. Therefore the default implementation does nothing.

      • persistFlowStackSnapshot

         final Unit persistFlowStackSnapshot()

        Persists a shallow copy of the Quasar stack frames at the time of call to persistFlowStackSnapshot. Use this to track the monitor evolution of the quasar stack values during the flow execution. The flow stack snapshot is stored in a file located in {baseDir}/flowStackSnapshots/YYYY-MM-DD/{flowId}/ where baseDir is the node running directory and flowId is the flow unique identifier generated by the platform.

        Note: With respect to the flowStackSnapshot, the snapshot being persisted by this method is partial, meaning that only flow relevant traces and local variables are persisted. Also, this logic is only available during tests and is not meant to be used during the production deployment. Therefore the default implementation does nothing.

      • await

         final <R extends Any> R await(FlowExternalOperation<R> operation)

        Executes the specified operation and suspends until operation completion.

        An implementation of FlowExternalOperation should be provided that returns a result which the state machine will run on a separate thread (using the node's external operation thread pool).

      • checkFlowIsNotKilled

         final Unit checkFlowIsNotKilled()

        Helper function that throws a KilledFlowException if the current FlowLogic has been killed.

        Call this function in long-running computation loops to exit a flow that has been killed:

        for (item in list) {
          checkFlowIsNotKilled()
          // do some computation
        }

        See the isKilled property for more information.

      • checkFlowIsNotKilled

         final Unit checkFlowIsNotKilled(Function0<Object> lazyMessage)

        Helper function that throws a KilledFlowException if the current FlowLogic has been killed. The provided message is added to the thrown KilledFlowException.

        Call this function in long-running computation loops to exit a flow that has been killed:

        for (item in list) {
          checkFlowIsNotKilled { "The flow $runId was killed while iterating through the list of items" }
          // do some computation
        }

        See the isKilled property for more information.