State Persistence

Corda offers developers the option to expose all or some parts of a contract state to an Object Relational Mapping (ORM) tool to be persisted in a Relational Database Management System (RDBMS).

This supports vault development by persisting state data to a custom database table. Persisted states held in the vault are indexed so that they can be queried, which also allows relational joins between Corda tables and the organization’s existing data.

The Object Relational Mapping is specified using Java Persistence API (JPA) annotations. This mapping is persisted to the database as a table row (a single, implicitly structured data item) by the node automatically every time a state is recorded in the node’s local vault as part of a transaction.

Any ContractState can implement the QueryableState interface. When it does, the node inserts the state into a custom table in its database, which is accessible using SQL.

/**
 * A contract state that may be mapped to database schemas configured for this node to support querying for,
 * or filtering of, states.
 */
@KeepForDJVM
interface QueryableState : ContractState {
    /**
     * Enumerate the schemas this state can export representations of itself as.
     */
    fun supportedSchemas(): Iterable<MappedSchema>

    /**
     * Export a representation for the given schema.
     */
    fun generateMappedObject(schema: MappedSchema): PersistentState
}

The QueryableState interface requires the state to enumerate the different relational schemas it supports, for instance in situations where the schema has evolved. Each relational schema is represented as a MappedSchema object returned by the state’s supportedSchemas method.

Nodes have an internal SchemaService which decides what data to persist by selecting the MappedSchema to use. Once a MappedSchema is selected, the SchemaService will delegate to the QueryableState to generate a corresponding representation (mapped object) via the generateMappedObject method, the output of which is then passed to the ORM.

/**
 * A configuration and customisation point for Object Relational Mapping of contract state objects.
 */
interface SchemaService {
    /**
     * All available schemas in this node
     */
    val schemas: Set<MappedSchema>

    /**
     * Given a state, select schemas to map it to that are supported by [generateMappedObject] and that are configured
     * for this node.
     */
    fun selectSchemas(state: ContractState): Iterable<MappedSchema>

    /**
     * Map a state to a [PersistentState] for the given schema, either via direct support from the state
     * or via custom logic in this service.
     */
    fun generateMappedObject(state: ContractState, schema: MappedSchema): PersistentState
}

/**
 * A database schema that might be configured for this node.  As well as a name and version for identifying the schema,
 * also list the classes that may be used in the generated object graph in order to configure the ORM tool.
 *
 * @param schemaFamily A class to fully qualify the name of a schema family (i.e. excludes version)
 * @param version The version number of this instance within the family.
 * @param mappedTypes The JPA entity classes that the ORM layer needs to be configured with for this schema.
 */
@KeepForDJVM
open class MappedSchema(schemaFamily: Class<*>,
                        val version: Int,
                        val mappedTypes: Iterable<Class<*>>) {
    val name: String = schemaFamily.name

    /**
     * Optional classpath resource containing the database changes for the [mappedTypes]
     */
    open val migrationResource: String? = null

    override fun toString(): String = "${this.javaClass.simpleName}(name=$name, version=$version)"

    override fun equals(other: Any?): Boolean {
        if (this === other) return true
        if (javaClass != other?.javaClass) return false

        other as MappedSchema

        if (version != other.version) return false
        if (mappedTypes != other.mappedTypes) return false
        if (name != other.name) return false

        return true
    }

    override fun hashCode(): Int {
        var result = version
        result = 31 * result + mappedTypes.hashCode()
        result = 31 * result + name.hashCode()
        return result
    }
}

With this framework, the relational view of ledger states can evolve in a controlled fashion in lock-step with internal systems or other integration points, and is not dependent on changes to the contract code.

Multiple contract state implementations might provide mappings within a single schema. For example, an Interest Rate Swap contract and an Equity OTC Option contract might both provide a mapping to a derivative contract within the same schema. The schemas should typically not be part of the contract itself and should exist independently to encourage reuse of a common set within a particular business area or CorDapp.

MappedSchema offers a family name, which is disambiguated using Java-package-style name-spacing derived from the class name of a schema family class that is constant across versions. This allows the SchemaService to select a preferred version of a schema.
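The family-and-version naming can be sketched as follows. This is a minimal illustration, not code from the Corda codebase: TradeSchema, TradeSchemaV1, TradeSchemaV2, and their entity classes and table names are hypothetical. Both versions pass the same family class, so they share a name and differ only in version.

```kotlin
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Table

// Hypothetical family marker: its class name is shared by every version.
object TradeSchema

// Version 1 of the hypothetical schema: a single column.
object TradeSchemaV1 : MappedSchema(TradeSchema.javaClass, 1, listOf(PersistentTradeV1::class.java)) {
    @Entity
    @Table(name = "trade_states_v1")
    class PersistentTradeV1(
            @Column(name = "notional", nullable = false)
            var notional: Long = 0L
    ) : PersistentState()
}

// Version 2 adds a column; the family class passed to MappedSchema is unchanged.
object TradeSchemaV2 : MappedSchema(TradeSchema.javaClass, 2, listOf(PersistentTradeV2::class.java)) {
    @Entity
    @Table(name = "trade_states_v2")
    class PersistentTradeV2(
            @Column(name = "notional", nullable = false)
            var notional: Long = 0L,
            @Column(name = "ccy_code", length = 3, nullable = false)
            var currency: String = ""
    ) : PersistentState()
}

fun main() {
    // Compare the family name and the version of the two schema objects.
    println(TradeSchemaV1.name == TradeSchemaV2.name)
    println("${TradeSchemaV1.version} -> ${TradeSchemaV2.version}")
}
```

Because the name is derived from the family class rather than from the versioned object, a state can list several versions in supportedSchemas while they remain recognisable as one family.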

The SchemaService is also responsible for the SchemaOptions you can configure for a particular MappedSchema. These allow the configuration of database schemas or table name prefixes to avoid clashes with other MappedSchema instances.

Custom contract schemas register automatically when CorDapps start up. The node bootstrap process scans for states that implement the QueryableState interface. The MappedSchema specifies which tables to create, as identified by each state’s supportedSchemas method.

For testing purposes you must manually register the packages containing custom schemas:

  • Tests using MockNetwork and MockNode must explicitly register packages using the cordappPackages parameter of MockNetwork.
  • Tests using MockServices must explicitly register packages using the cordappPackages parameter of the MockServices makeTestDatabaseAndMockServices() helper method.
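For the MockNetwork case, registration might look like the sketch below. It assumes the corda-node-driver test library and corda-finance are on the test classpath, and it uses the finance schema package from this page purely as an example; substitute the package that holds your own schemas.

```kotlin
import net.corda.testing.node.MockNetwork

fun main() {
    // Register the package that contains the custom schemas so that the mock
    // nodes create the corresponding tables. The package name is an example.
    val network = MockNetwork(cordappPackages = listOf("net.corda.finance.schemas"))
    try {
        network.createNode()
        network.runNetwork()
        // ... start flows on the node and query the vault here ...
    } finally {
        // Always shut the mock nodes down, even if the test body fails.
        network.stopNodes()
    }
}
```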

To facilitate the object relational mapping (ORM), the persisted representation of a QueryableState should be an instance of a PersistentState subclass, constructed either by the state itself or by a plugin to the SchemaService. This allows the ORM layer to always associate a StateRef with a persisted representation of a ContractState and allows joining with the set of unconsumed states in the vault.

The PersistentState subclass should be marked up as a JPA 2.1 Entity with a defined table name and with properties (in Kotlin; getters/setters in Java) annotated to map to the appropriate columns and SQL types. Additional entities can be included to model these properties where they are more complex, for example for collections, so the mapping does not need to be flat. The MappedSchema constructor accepts a list of all JPA entity classes for that schema in the mappedTypes parameter. You must provide this list in order to initialise the ORM layer.

Several examples of entities and mappings are provided in the codebase, including Cash.State and CommercialPaper.State. For example, the first version of the cash schema:

package net.corda.finance.schemas

import net.corda.core.identity.AbstractParty
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.core.contracts.MAX_ISSUER_REF_SIZE
import org.hibernate.annotations.Type
import javax.persistence.*

/**
 * An object used to fully qualify the [CashSchema] family name (i.e. independent of version).
 */
object CashSchema

/**
 * First version of a cash contract ORM schema that maps all fields of the [Cash] contract state as it stood
 * at the time of writing.
 */
@Suppress("MagicNumber") // SQL column length
@CordaSerializable
object CashSchemaV1 : MappedSchema(schemaFamily = CashSchema.javaClass, version = 1, mappedTypes = listOf(PersistentCashState::class.java)) {

    override val migrationResource = "cash.changelog-master"

    @Entity
    @Table(name = "contract_cash_states", indexes = [Index(name = "ccy_code_idx", columnList = "ccy_code"), Index(name = "pennies_idx", columnList = "pennies")])
    class PersistentCashState(
            /** X500Name of owner party **/
            @Column(name = "owner_name", nullable = true)
            var owner: AbstractParty?,

            @Column(name = "pennies", nullable = false)
            var pennies: Long,

            @Column(name = "ccy_code", length = 3, nullable = false)
            var currency: String,

            @Column(name = "issuer_key_hash", length = MAX_HASH_HEX_SIZE, nullable = false)
            var issuerPartyHash: String,

            @Column(name = "issuer_ref", length = MAX_ISSUER_REF_SIZE, nullable = false)
            @Type(type = "corda-wrapper-binary")
            var issuerRef: ByteArray
    ) : PersistentState()
}
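The state side of the mapping is not shown above. The following is a minimal sketch of how a state might implement the two QueryableState methods against its own schema. ExampleSchema, ExampleSchemaV1, PersistentExample, ExampleState, and the table and column names are all hypothetical; a real state would also declare its contract.

```kotlin
import net.corda.core.contracts.ContractState
import net.corda.core.identity.AbstractParty
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Table

// Hypothetical schema family marker.
object ExampleSchema

// Hypothetical first version of the schema, with a single entity.
object ExampleSchemaV1 : MappedSchema(
        schemaFamily = ExampleSchema.javaClass,
        version = 1,
        mappedTypes = listOf(PersistentExample::class.java)
) {
    @Entity
    @Table(name = "example_states")
    class PersistentExample(
            @Column(name = "owner_name", nullable = true)
            var owner: AbstractParty?,

            @Column(name = "amount", nullable = false)
            var amount: Long
    ) : PersistentState() {
        // JPA requires a no-argument constructor.
        constructor() : this(null, 0L)
    }
}

// Hypothetical state; only the QueryableState methods matter for this sketch.
data class ExampleState(val owner: AbstractParty, val amount: Long) : QueryableState {
    override val participants: List<AbstractParty> get() = listOf(owner)

    // Enumerate every schema this state can be exported as.
    override fun supportedSchemas(): Iterable<MappedSchema> = listOf(ExampleSchemaV1)

    // Build the entity for the schema the SchemaService selected.
    override fun generateMappedObject(schema: MappedSchema): PersistentState {
        return when (schema) {
            is ExampleSchemaV1 -> ExampleSchemaV1.PersistentExample(owner = owner, amount = amount)
            else -> throw IllegalArgumentException("Unrecognised schema $schema")
        }
    }
}
```

Rejecting unknown schemas explicitly keeps a mismatch between supportedSchemas and generateMappedObject from silently producing an empty mapping.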

You may want to persist hierarchical relationships within state data using multiple database tables. To do this, you must implement all queries that make use of hierarchical relations as native SQL.

Example schemas implementing hierarchical relationships are shown below.

For one-to-one scenarios, use the following:

// Parent schema implemented as usual
class PersistentParent(
  @Column(name = "linear_id")
  var linearId: UUID
) : PersistentState() {
  constructor() : this( UUID.randomUUID() )
}

// Child has a reference to its parent
class PersistentChild(
  @Column(name = "linear_id")
  var linearId: UUID,
  @Column(name = "parent_linear_id")
  var parentLinearId: UUID
) : PersistentState() {
  constructor() : this( UUID.randomUUID(), UUID.randomUUID() )
}

For one-to-many scenarios, use the following:

// Parent schema implemented as usual
class PersistentParent(
  @Column(name = "linear_id")
  var linearId: UUID
) : PersistentState() {
  constructor() : this( UUID.randomUUID() )
}

// Child schema implemented as usual
class PersistentChild(
  @Column(name = "linear_id")
  var linearId: UUID
) : PersistentState() {
  constructor() : this( UUID.randomUUID())
}

// ParentChildThrough table schema
class PersistentThroughTable(
  @Column(name = "parent_linear_id")
  var parentLinearId: UUID,
  @Column(name = "child_linear_id")
  var childLinearId: UUID
) : PersistentState() {
  constructor() : this( UUID.randomUUID(), UUID.randomUUID() )
}
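A native SQL query over such a through table might look like the following sketch. The table names parent_states, child_states, and parent_child_through are assumptions, since the entity classes above do not declare table names, and countParentChildLinks is a hypothetical helper. The query is run through the ServiceHub jdbcSession function.

```kotlin
import net.corda.core.node.ServiceHub

// Hypothetical helper: counts parent/child pairs linked through the through table.
fun countParentChildLinks(services: ServiceHub): Long {
    // Table names are assumed for illustration; use the names from your @Table annotations.
    val sql = """
        SELECT COUNT(*)
        FROM parent_states p
        JOIN parent_child_through t ON t.parent_linear_id = p.linear_id
        JOIN child_states c ON c.linear_id = t.child_linear_id
    """.trimIndent()

    // Close the statement and the result set, but not the connection itself.
    return services.jdbcSession().prepareStatement(sql).use { statement ->
        statement.executeQuery().use { rows ->
            if (rows.next()) rows.getLong(1) else 0L
        }
    }
}
```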

Schema entity attributes defined by identity types (AbstractParty, Party, AnonymousParty) are automatically processed to ensure only the X500Name of the identity is persisted where an identity is well known. Otherwise a null value is stored in the associated column. To preserve privacy, identity keys are never persisted. Developers should use the IdentityService to resolve keys from well-known X500 identity names.
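Resolving a persisted name back to a party can be sketched as follows. The helper name resolveOwner and the idea of reading the name from an owner_name column are illustrative assumptions; the lookup itself uses the IdentityService exposed by ServiceHub.

```kotlin
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub

// Hypothetical helper: turns a persisted X500 name string back into a well-known Party.
// Returns null if the node does not know an identity with that name.
fun resolveOwner(services: ServiceHub, ownerName: String): Party? {
    val x500Name = CordaX500Name.parse(ownerName)
    return services.identityService.wellKnownPartyFromX500Name(x500Name)
}
```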

Apps can interact directly with the underlying node’s database using a standard JDBC connection (session) as described by the Java SQL Connection API.

Use the ServiceHub jdbcSession function to get a JDBC connection:

val nativeQuery = "SELECT v.transaction_id, v.output_index FROM vault_states v WHERE v.state_status = 0"

database.transaction {
    val jdbcSession = services.jdbcSession()
    val prepStatement = jdbcSession.prepareStatement(nativeQuery)
    val rs = prepStatement.executeQuery()
}

You can use JDBC sessions in flows and services. See Writing flows.

The following example illustrates the creation of a custom Corda service using a jdbcSession:

object CustomVaultQuery {

    @CordaService
    class Service(val services: AppServiceHub) : SingletonSerializeAsToken() {
        private companion object {
            private val log = contextLogger()
        }

        fun rebalanceCurrencyReserves(): List<Amount<Currency>> {
            val nativeQuery = """
                select
                    cashschema.ccy_code,
                    sum(cashschema.pennies)
                from
                    vault_states vaultschema
                join
                    contract_cash_states cashschema
                where
                    vaultschema.output_index=cashschema.output_index
                    and vaultschema.transaction_id=cashschema.transaction_id
                    and vaultschema.state_status=0
                group by
                    cashschema.ccy_code
                order by
                    sum(cashschema.pennies) desc
            """
            log.info("SQL to execute: $nativeQuery")
            val session = services.jdbcSession()
            return session.prepareStatement(nativeQuery).use { prepStatement ->
                prepStatement.executeQuery().use { rs ->
                    val topUpLimits: MutableList<Amount<Currency>> = mutableListOf()
                    while (rs.next()) {
                        val currencyStr = rs.getString(1)
                        val amount = rs.getLong(2)
                        log.info("$currencyStr : $amount")
                        topUpLimits.add(Amount(amount, Currency.getInstance(currencyStr)))
                    }
                    topUpLimits
                }
            }
        }
    }
}

This service is then referenced within a custom flow:

@Suspendable
@Throws(CashException::class)
override fun call(): List<SignedTransaction> {
    progressTracker.currentStep = AWAITING_REQUEST
    val topupRequest = otherPartySession.receive<TopupRequest>().unwrap {
        it
    }

    val customVaultQueryService = serviceHub.cordaService(CustomVaultQuery.Service::class.java)
    val reserveLimits = customVaultQueryService.rebalanceCurrencyReserves()

    val txns: List<SignedTransaction> = reserveLimits.map { amount ->
        // request asset issue
        logger.info("Requesting currency issue $amount")
        val txn = issueCashTo(amount, topupRequest.issueToParty, topupRequest.issuerPartyRef, topupRequest.notaryParty)
        progressTracker.currentStep = SENDING_TOP_UP_ISSUE_REQUEST
        return@map txn.stx
    }

    otherPartySession.send(txns)
    return txns
}

For examples of @CordaService implementation testing, see the Oracle example.

Corda restricts the functions available on the Connection returned by jdbcSession. This prevents a flow’s underlying database transaction from being tampered with, which could cause errors within the flow.

Calling jdbcSession returns a RestrictedConnection which prevents calls to the following functions:

abort(executor: Executor?)
clearWarnings()
close()
commit()
setSavepoint() methods
releaseSavepoint(savepoint: Savepoint?)
rollback() methods
setCatalog(catalog : String?)
setTransactionIsolation(level: Int)
setTypeMap(map: MutableMap<String, Class<*>>?)
setHoldability(holdability: Int)
setSchema(schema: String?)
setNetworkTimeout(executor: Executor?, milliseconds: Int)
setAutoCommit(autoCommit: Boolean)
setReadOnly(readOnly: Boolean)

In addition to jdbcSession, ServiceHub also exposes the Java Persistence API to flows via the withEntityManager method. This method can be used to persist and query entities that are registered as mapped types of a MappedSchema. This is particularly useful if you need to maintain off-ledger data in conjunction with on-ledger state data.

The code snippet below defines a PersistentFoo type inside FooSchemaV1. Note that PersistentFoo is added to a list of mapped types which is passed to MappedSchema. This is exactly how state schemas are defined, except that the entity in this case should not subclass PersistentState as it is not a state object:

public class FooSchema {}

public class FooSchemaV1 extends MappedSchema {
    FooSchemaV1() {
        super(FooSchema.class, 1, ImmutableList.of(PersistentFoo.class));
    }

    @Entity
    @Table(name = "foos")
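    public static class PersistentFoo implements Serializable {
        @Id
        @Column(name = "foo_id")
        String fooId;

        @Column(name = "foo_data")
        String fooData;

        // JPA requires a no-argument constructor
        public PersistentFoo() {}

        public PersistentFoo(String fooId, String fooData) {
            this.fooId = fooId;
            this.fooData = fooData;
        }
    }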
}

object FooSchemaV1 : MappedSchema(
    schemaFamily = FooSchema.javaClass,
    version = 1,
    mappedTypes = listOf(PersistentFoo::class.java)
) {
    @Entity
    @Table(name = "foos")
    class PersistentFoo(
        @Id
        @Column(name = "foo_id")
        var fooId: String,
        @Column(name = "foo_data")
        var fooData: String
    ) : Serializable
}

Instances of PersistentFoo can be manually persisted inside a flow as follows:

PersistentFoo foo = new PersistentFoo(new UniqueIdentifier().getId().toString(), "Bar");
serviceHub.withEntityManager(entityManager -> {
    entityManager.persist(foo);
    return null;
});

val foo = FooSchemaV1.PersistentFoo(UniqueIdentifier().id.toString(), "Bar")
serviceHub.withEntityManager {
    persist(foo)
}

And retrieved via a query, as follows:

getServiceHub().withEntityManager((EntityManager entityManager) -> {
    CriteriaQuery<PersistentFoo> query = entityManager.getCriteriaBuilder().createQuery(PersistentFoo.class);
    Root<PersistentFoo> type = query.from(PersistentFoo.class);
    query.select(type);
    return entityManager.createQuery(query).getResultList();
});

serviceHub.withEntityManager {
    val query = criteriaBuilder.createQuery(FooSchemaV1.PersistentFoo::class.java)
    val type = query.from(FooSchemaV1.PersistentFoo::class.java)
    query.select(type)
    createQuery(query).resultList
}
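A single entity can also be looked up by its primary key. The sketch below assumes the FooSchemaV1.PersistentFoo entity defined earlier on this page is available to the calling code; findFoo is a hypothetical helper name, and find is the standard JPA EntityManager call.

```kotlin
import net.corda.core.node.ServiceHub

// Hypothetical helper: looks up a PersistentFoo by its @Id column (foo_id).
// Returns null when no row with that identifier exists.
fun findFoo(services: ServiceHub, fooId: String): FooSchemaV1.PersistentFoo? =
        services.withEntityManager {
            // `this` is the EntityManager supplied by withEntityManager.
            find(FooSchemaV1.PersistentFoo::class.java, fooId)
        }
```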

Suspendable flow operations cannot be used within the lambda function passed to withEntityManager. This includes:

  • FlowSession.send
  • FlowSession.receive
  • FlowLogic.receiveAll
  • FlowLogic.sendAll
  • FlowLogic.sleep
  • FlowLogic.subFlow

Corda restricts the functions available on the EntityManager returned by withEntityManager. This prevents a flow’s underlying database transaction from being tampered with, which would likely lead to errors within the flow.

The withEntityManager function provides an object that adheres to the EntityManager interface but with two differences:

  • getTransaction returns a RestrictedEntityTransaction.
  • All other restricted functions will produce UnsupportedOperationException exceptions.

The restricted functions are:

RestrictedEntityManager:

close()
unwrap()
getDelegate()
getMetamodel()
joinTransaction()
lock() methods
setProperty(propertyName: String?, value: Any?)

RestrictedEntityTransaction:

rollback()
commit()
begin()

When you call withEntityManager, an intermediate database session is created that provides rollback capability without affecting the current transaction.

A withEntityManager block has one of the following three outcomes, which are managed by the call and do not require you to manually flush or roll back a flow before calling withEntityManager:

  • Completes successfully: The intermediate session is automatically flushed to the underlying transaction.
  • Throws a database error: The intermediate session is automatically rolled back.
  • Throws a non-database error: The intermediate session is not flushed to the underlying transaction.

Changes are committed to the database when the transaction is committed.

The database transaction isolation level is set to READ_COMMITTED, and it is not configurable. This is the best option for Corda operations at the database level.

A flow can handle database exceptions that occur within a withEntityManager block without affecting the flow’s underlying database transaction.

You can handle database errors that occur within a withEntityManager by catching relevant exceptions. Below are two ways to handle these exceptions:

  • Around the block:

// try around withEntityManager block
try {
    getServiceHub().withEntityManager(entityManager -> {
        entityManager.persist(entity);
    });
// catch around withEntityManager block
} catch (PersistenceException e) {
    // Exception thrown due to constraint violation
    getLogger().info("Ok, let's not save this entity 2");
}

// try around withEntityManager block
try {
    serviceHub.withEntityManager {
        persist(entity)
    }
// catch around withEntityManager block
} catch (e: PersistenceException) {
    // Exception thrown due to constraint violation
    logger.info("Caught the exception!")
}

There is no need for a flush when catching exceptions around the withEntityManager block. It automatically triggers a flush when leaving the block.

  • Inside the block:

    getServiceHub().withEntityManager(entityManager -> {
        entityManager.persist(entity);
        // try inside withEntityManager block
        try {
            // Manually trigger a flush on the intermediate session
            entityManager.flush();
        } catch (PersistenceException e) {
            // Exception thrown due to constraint violation
            getLogger().info("Ok, let's not save this entity");
        }
    });
    
    serviceHub.withEntityManager {
        persist(entity)
        // try inside withEntityManager block
        try {
            // Manually trigger a flush on the intermediate session
            flush()
        } catch (e: PersistenceException) {
            // Exception thrown due to constraint violation
            logger.info("Ok, let's not save this entity")
        }
    }
    

    You must manually trigger a flush if the exception is to be caught inside the entity manager. If the flush is not included, the code above would throw the PersistenceException instead of catching it.

You may need to manually flush database changes to the underlying database transaction in two situations:

  • Handling database errors: Handle any possible database errors that occur from the flush within the withEntityManager block.
  • Survive non-database errors: Keep your database changes even if a non-database error is thrown out of the withEntityManager block.

An example of flushing a session to survive a non-database error:

try {
    getServiceHub().withEntityManager(entityManager -> {
        entityManager.persist(entity);
        // Manually trigger a flush on the intermediate session
        entityManager.flush();
        throw new RuntimeException("Non-database error");
    });
} catch (Exception e) {
    getLogger().info("I still want to save that entity");
}

try {
    serviceHub.withEntityManager {
        persist(entity)
        // Manually trigger a flush on the intermediate session
        flush()
        throw RuntimeException("Non-database error")
    }
} catch (e: Exception) {
    logger.info("I still want to save that entity")
}
