Cassandra persistence

How to persist data using a Cassandra database.

Key takeaways

CassandraPersistence: component used to perform CRUD operations on data objects stored in a Cassandra database.
IdentifiableCassandraPersistence: component used to perform CRUD operations on identifiable data objects stored in a Cassandra database.
Identifiable data object: data object that contains a field named id with unique values that can be used to identify the object.

Introduction

In this tutorial, you will learn how to interact with Cassandra. First, we will see how to install the Cassandra module. Then, we will see how to perform CRUD operations with the CassandraPersistence and IdentifiableCassandraPersistence components. Finally, in the wrapping up section, we will review the concepts learned.

Cassandra persistence

Pip.Services contains the Cassandra module, which has several components that can be used to interact with Cassandra. In the following sections, we will learn about two of them, namely CassandraPersistence and IdentifiableCassandraPersistence. As the names suggest, the main difference between them is the type of data object they handle. While the first component can be used with any type of data object, the second requires the data object to be identifiable (that is, to contain an id field).

General pre-requisites

In order to use this module, we must first install it. The following command shows how to do this:

npm install pip-services4-cassandra-node --save

Data object

The first thing we need to do is create a data object. To have a common object for both components, we define a structure with an id (identification) field. Note, however, that this field is not required by the CassandraPersistence class. In addition, we add two fields to our example: name and description. The following code shows this data structure:

export class MyData implements IIdentifiable<string> {
    public id: string;
    public name: string;
    public description: string;
  }
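To make the idea of an identifiable data object concrete, the sketch below checks at runtime whether a value carries a usable id field. The Identifiable interface and the isIdentifiable() function are hypothetical names introduced for this illustration. They stand in for the library's IIdentifiable interface and are not part of the Cassandra module.

```typescript
// Hypothetical stand-in for the library's IIdentifiable<K> interface.
interface Identifiable<K> {
    id: K;
}

// Type guard: true when the value is an object with a non-empty string id.
function isIdentifiable(value: unknown): value is Identifiable<string> {
    if (typeof value !== "object" || value === null) return false;
    const id = (value as { id?: unknown }).id;
    return typeof id === "string" && id.length > 0;
}

const withId = { id: "1", name: "name_1", description: "description_1" };
const withoutId = { name: "name_2", description: "description_2" };

// The first object is identifiable, the second is not.
const results = [isIdentifiable(withId), isIdentifiable(withoutId)];
```

An object such as withoutId could still be stored with CassandraPersistence, but it would not satisfy the requirement of IdentifiableCassandraPersistence.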

CassandraPersistence

This is the most basic persistence component in this module. It contains the necessary methods to perform basic CRUD operations. The next sections explain each of them.

Pre-requisites

To be able to use this component, we need to import it first. The following command shows how to perform this task:

import { CassandraPersistence } from "pip-services4-cassandra-node";
Component implementation

Now, we can create a custom component as a subclass of CassandraPersistence. Within our class, we also define two protected methods. The first, named defineSchema(), creates the table used to store our data. The second, named filterCondition(), adapts a FilterParams object to the specific needs of Cassandra. Later in this tutorial, we will see how to define other methods for CRUD operations that must also be included in our class. The code below is an example of this class:

class MyCassandraPersistence extends CassandraPersistence<MyData> {
      public constructor() {
          super("my_data_table", "mydata_keyspace");
      }
  
      protected defineSchema(): void {
          this.clearSchema();
          this.ensureSchema('CREATE TABLE ' + this.quotedTableName() + ' (id TEXT PRIMARY KEY, name TEXT, description TEXT)');
          this.ensureIndex('name', { name: 1 }, { unique: true });
      }
  
  
      protected filterCondition(filter: FilterParams): string {
          // Fall back to an empty filter before reading any keys from it
          filter = filter || new FilterParams();
          // Only the first key of the filter is used to build the condition
          let key = filter.getKeys()[0];
          console.log(`key:\n ${JSON.stringify(key)} \n`);
          let keyValue = key != null ? filter.getAsNullableString(key) : null;
          console.log(`value:\n ${JSON.stringify(keyValue)} \n`);
          let filterCondition: string = null;
          if (keyValue)
              filterCondition = key + "='" + keyValue + "'";
          console.log(`filterCondition:\n ${JSON.stringify(filterCondition)} \n`);
          return filterCondition;
      }
}
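The filterCondition() method above places the filter value directly inside a CQL string literal. As a minimal sketch of the same idea outside the library, the hypothetical buildFilterCondition() helper below works on a plain object instead of FilterParams and doubles any single quote in the value, which is how CQL escapes quotes inside string literals. The helper name and its signature are assumptions made for this example only.

```typescript
// Hypothetical helper, not part of pip-services4-cassandra-node.
// Builds "key='value'" from the first entry of a plain filter object.
function buildFilterCondition(filter: Record<string, string> | null): string | null {
    const source = filter || {};
    const keys = Object.keys(source);
    if (keys.length === 0) return null;

    const key = keys[0];
    const value = source[key];
    if (!value) return null;

    // CQL escapes a single quote inside a string literal by doubling it.
    const escaped = value.replace(/'/g, "''");
    return key + "='" + escaped + "'";
}

const simple = buildFilterCondition({ name: "name_1" });  // name='name_1'
const quoted = buildFilterCondition({ name: "O'Brien" }); // name='O''Brien'
const empty = buildFilterCondition(null);                 // null
```

For values that come from users, parameterized queries are generally a safer choice than string concatenation. The sketch only illustrates the shape of the condition string.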
Connection

After creating our persistence object, we need to connect to Cassandra. For this, we create an instance of our persistence component and use the configure() method to specify the necessary connection parameters. Then, via the open() method, we connect to Cassandra. In our example, we consider a local database and port 9042. We also specify our credentials. The code is as follows:

let persistence = new MyCassandraPersistence();
  
persistence.configure(ConfigParams.fromTuples(
    "connection.host", "localhost",
    "connection.port", 9042,
    'connection.datacenter', 'datacenter1',
    "connection.username", "cassandra",
    "connection.password", "cassandra",
));

await persistence.open(null);
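The configuration above is written as a flat list of alternating keys and values. The sketch below shows one way such a list can be turned into a key/value map. The tuplesToMap() function is a hypothetical illustration written for this tutorial and is not the library's ConfigParams.fromTuples() implementation, which may behave differently.

```typescript
// Illustrative only: pairs up arguments as [key, value, key, value, ...].
function tuplesToMap(...tuples: unknown[]): Record<string, string> {
    const result: Record<string, string> = {};
    // Walk the arguments two at a time; an unpaired trailing key is ignored.
    for (let i = 0; i + 1 < tuples.length; i += 2) {
        result[String(tuples[i])] = String(tuples[i + 1]);
    }
    return result;
}

const config = tuplesToMap(
    "connection.host", "localhost",
    "connection.port", 9042,
    "connection.datacenter", "datacenter1"
);

const host = config["connection.host"]; // "localhost"
const port = config["connection.port"]; // "9042" (kept as a string in this sketch)
```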

We must also remember to free resources once our operations have been executed. This is done with the close() method. The following code shows how to use it:

await persistence.close(null);
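To make sure resources are released even when an operation fails, the open and close calls can be wrapped in a try/finally block. The sketch below shows this pattern with a hypothetical Closable interface and a fake object that only records calls, so it runs without a database. The names Closable, withPersistence and fake are assumptions made for this example.

```typescript
// Hypothetical minimal interface with the two lifecycle methods used in this tutorial.
interface Closable {
    open(context: unknown): Promise<void>;
    close(context: unknown): Promise<void>;
}

// Runs `work` between open() and close(), closing even when `work` throws.
async function withPersistence<T>(p: Closable, work: () => Promise<T>): Promise<T> {
    await p.open(null);
    try {
        return await work();
    } finally {
        await p.close(null);
    }
}

// Stand-in object that records calls, used here instead of a real connection.
const calls: string[] = [];
const fake: Closable = {
    open: async () => { calls.push("open"); },
    close: async () => { calls.push("close"); },
};
```

With a real component, the call could look like withPersistence(persistence, async () => { /* CRUD operations */ }), assuming the component exposes open() and close() as shown in this section.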
CRUD operations

Once we have our persistence component ready, we can start performing CRUD operations. The following sections explain how to use the different methods available in this class.

Create

First, we create data objects for our examples. The code below creates two different objects named item1 and item2 respectively:

let item1: MyData = { id: IdGenerator.nextLong(), name: 'name_1', description: 'description_1' }; 
let item2: MyData = { id: IdGenerator.nextLong(), name: 'name_2', description: 'description_2' }; 

Then, we store these objects in our database with the create() method. This method requires the correlation_id and a data object as inputs. The following code shows how to use it:

let result1 = await persistence.create(null, item1);
console.log(`Created item:\n ${JSON.stringify(result1)} \n`);

let result2 = await persistence.create(null, item2);
console.log(`Created item:\n ${JSON.stringify(result2)} \n`);

After running the previous code, we obtain the following values for result1 and result2 respectively:

figure 1

Read

To read a record from our database, we have several methods.

getPageByFilter()

This is a protected method. Thus, we need to create a public version of it in our class and use the filterCondition() method to adapt our FilterParams object to Cassandra. The code below shows how to define this method and how to call it from our persistence object:

public getPageByFilter(correlationId: Context, filter: FilterParams, paging: PagingParams): Promise<DataPage<MyData>> {
    let filterCondition: string = this.filterCondition(filter);

    return super.getPageByFilter(correlationId, filterCondition, paging, null, null);
}	
// ...

let page: DataPage<MyData> = await persistence.getPageByFilter(
    null, FilterParams.fromTuples('name', 'name_1'), null
);

Which, after running, returns the following record:

figure 2
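The call above passes null as the paging argument. When paging is used, the request is usually described by a number of records to skip and a number of records to take. The sketch below illustrates these semantics on an in-memory array. The takePage() function is a hypothetical helper and does not reflect how the component builds its query.

```typescript
// Hypothetical in-memory illustration of skip/take paging.
function takePage<T>(items: T[], skip: number, take: number): T[] {
    const start = Math.max(0, skip);
    return items.slice(start, start + Math.max(0, take));
}

const names = ["name_1", "name_2", "name_3", "name_4", "name_5"];

const firstPage = takePage(names, 0, 2);  // ["name_1", "name_2"]
const secondPage = takePage(names, 2, 2); // ["name_3", "name_4"]
const lastPage = takePage(names, 4, 2);   // ["name_5"]
```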

getOneRandom()

This method extracts a random record according to a given condition. As this method is protected, we first need to define a public version of it in our custom component and then call it from our persistence object. We also need to use the filterCondition() method to adapt our filter to Cassandra. The following code shows how to do this:

public getOneRandom(correlationId: Context, filter: FilterParams):Promise<MyData>{
  
    let filterCondition: string = this.filterCondition(filter);

    return super.getOneRandom(correlationId, filterCondition);

}

// ...

let data: MyData = await persistence.getOneRandom(
    null, FilterParams.fromTuples('name', 'name_1')
);

After running the above code, we get a result similar to the following record:

figure 3

getCountByFilter()

This method returns the number of stored records that match a given condition. Once again, as this is a protected method, we need to define a public version of it in our class and call it from our persistence object. As with the previous method, we also need to use the filterCondition() method to adapt our filter to Cassandra. The code below illustrates its usage:

public getCountByFilter(correlationId: Context, filter: FilterParams):Promise<number>{
  
    let filterCondition: string = this.filterCondition(filter);

    return super.getCountByFilter(correlationId, filterCondition);

}

// ...

let counter1: number = await persistence.getCountByFilter(
    null, FilterParams.fromTuples('name', 'name_1')
);

Which, after running, provides the following result:

figure 4

Update

This component has no specific update() method.

Delete

To delete a record, this class provides the deleteByFilter() method, which is protected and deletes one or more records according to a given filter. This method returns no result. To use it, we need to consider two points. First, Cassandra only allows the deletion of records specified by their primary key (in our case, the id field). Second, as this is a protected method, we need to define a public version of it and then call it from our class instance. The following code shows how to do this:

public deleteByFilter(correlationId: Context, filter: FilterParams):Promise<void>{
  
    let filterCondition: string = this.filterCondition(filter);
    return super.deleteByFilter(correlationId, filterCondition);

}

// ...

await persistence.deleteByFilter(
    null, FilterParams.fromTuples('id', '1', 'name', 'name_1')
);
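Because the deletion has to be addressed by primary key, it can help to reject filters that do not contain it before a query is built. The sketch below shows one possible check. The buildDeleteCondition() helper and its primaryKey parameter are hypothetical and work on a plain object rather than on FilterParams.

```typescript
// Hypothetical helper, not part of the library.
// Builds a condition from the primary key only and rejects filters without it.
function buildDeleteCondition(filter: Record<string, string>, primaryKey: string = "id"): string {
    const value = filter[primaryKey];
    if (value === undefined || value === "") {
        throw new Error("Filter must contain the primary key '" + primaryKey + "'");
    }
    // Double single quotes so the value stays a valid CQL string literal.
    return primaryKey + "='" + value.replace(/'/g, "''") + "'";
}

// Other fields in the filter are ignored; only the primary key is used.
const condition = buildDeleteCondition({ id: "1", name: "name_1" }); // id='1'
```

Calling buildDeleteCondition({ name: "name_1" }) throws an error, which makes a missing primary key visible before any statement reaches the database.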
Final code

Now, we can assemble everything together and obtain the following program:

import { ConfigParams, Context } from "pip-services4-components-node";
import { IIdentifiable, FilterParams, PagingParams, DataPage, IdGenerator } from "pip-services4-data-node";
import { CassandraPersistence } from "pip-services4-cassandra-node";

export class MyData implements IIdentifiable<string> {
    public id: string;
    public name: string;
    public description: string;
  }
  
  class MyCassandraPersistence extends CassandraPersistence<MyData> {
      public constructor() {
          super("my_data_table", "mydata_keyspace");
      }
  
      protected defineSchema(): void {
          this.clearSchema();
          this.ensureSchema('CREATE TABLE ' + this.quotedTableName() + ' (id TEXT PRIMARY KEY, name TEXT, description TEXT)');
          this.ensureIndex('name', { name: 1 }, { unique: true });
      }
  
  
      protected filterCondition(filter: FilterParams): string {
          // Fall back to an empty filter before reading any keys from it
          filter = filter || new FilterParams();
          // Only the first key of the filter is used to build the condition
          let key = filter.getKeys()[0];
          console.log(`key:\n ${JSON.stringify(key)} \n`);
          let keyValue = key != null ? filter.getAsNullableString(key) : null;
          console.log(`value:\n ${JSON.stringify(keyValue)} \n`);
          let filterCondition: string = null;
          if (keyValue)
              filterCondition = key + "='" + keyValue + "'";
          console.log(`filterCondition:\n ${JSON.stringify(filterCondition)} \n`);
          return filterCondition;
      }
  
      public getPageByFilter(correlationId: Context, filter: FilterParams, paging: PagingParams): Promise<DataPage<MyData>> {
          let filterCondition: string = this.filterCondition(filter);
  
          return super.getPageByFilter(correlationId, filterCondition, paging, null, null);
      }	
  
      public getOneRandom(correlationId: Context, filter: FilterParams):Promise<MyData>{
  
          let filterCondition: string = this.filterCondition(filter);
  
          return super.getOneRandom(correlationId, filterCondition);
  
      }
  
      public getCountByFilter(correlationId: Context, filter: FilterParams):Promise<number>{
  
          let filterCondition: string = this.filterCondition(filter);
  
          return super.getCountByFilter(correlationId, filterCondition);
  
      }
  
  
      public deleteByFilter(correlationId: Context, filter: FilterParams):Promise<void>{
  
          let filterCondition: string = this.filterCondition(filter);
          return super.deleteByFilter(correlationId, filterCondition);
  
      }
  }
  
  
  export async function main(): Promise<void> {
      let persistence = new MyCassandraPersistence();
  
      persistence.configure(ConfigParams.fromTuples(
          "connection.host", "localhost",
          "connection.port", 9042,
          'connection.datacenter', 'datacenter1',
          "connection.username", "cassandra",
          "connection.password", "cassandra",
      ));
  
      await persistence.open(null);
      await persistence.clear(null);
      
  
      // Data objects
      let item1: MyData = { id: IdGenerator.nextLong(), name: 'name_1', description: 'description_1' }; 
      let item2: MyData = { id: IdGenerator.nextLong(), name: 'name_2', description: 'description_2' }; 
      // CRUD
      // Create
      let result1 = await persistence.create(null, item1);
      console.log(`Created item:\n ${JSON.stringify(result1)} \n`);
  
      let result2 = await persistence.create(null, item2);
      console.log(`Created item:\n ${JSON.stringify(result2)} \n`);
      
      // Read
      let page: DataPage<MyData> = await persistence.getPageByFilter(
          null, FilterParams.fromTuples('name', 'name_1'), null
      );
      console.log(`Page:\n ${JSON.stringify(page.data)} \n`);
      
      let data: MyData = await persistence.getOneRandom(
          null, FilterParams.fromTuples('name', 'name_1')
      );
      console.log(`Random Data:\n ${JSON.stringify(data)} \n`);
      
      let counter1: number = await persistence.getCountByFilter(
          null, FilterParams.fromTuples('name', 'name_1')
      );
      console.log(`Counter:\n ${JSON.stringify(counter1)} \n`);
  
      let counter2: number = await persistence.getCountByFilter(
          null, FilterParams.fromTuples('name','description_2')
      );
      console.log(`Counter:\n ${JSON.stringify(counter2)} \n`);
  
      // Update
      
      // Delete
  
      await persistence.deleteByFilter(
          null, FilterParams.fromTuples('id', '1', 'name', 'name_1')
      );
      console.log(`Record deleted\n`);
      
      
      await persistence.close(null);
  
      console.log('Persistence closed');
  }

IdentifiableCassandraPersistence

This component is a subclass of the previous one and, as such, it inherits all its methods. The main difference between them is that this component requires an identifiable data object, that is, an object that contains a field named id whose value uniquely identifies it. The following sections explain the main CRUD methods contained in this class.

Pre-requisites

In order to use this component, we need to import it first. The following command shows how to do this:

import { IdentifiableCassandraPersistence } from "pip-services4-cassandra-node";
Component implementation

As we did with the previous class, we create a custom persistence component that extends this class. In it, we also code a defineSchema() method that is used to create a table in the database. If we want to use the methods of the previous class, we need to define a filterCondition() method as we did before. In the examples below, we only consider the methods added by this class, which don't require filters. The code below shows an example of this custom component:

class MyCassandraPersistence extends IdentifiableCassandraPersistence<MyData, string> {
      public constructor() {
          super("my_data_table", "mydata_keyspace");
      }
  
      protected defineSchema(): void {
          this.clearSchema();
          this.ensureSchema('CREATE TABLE ' + this.quotedTableName() + ' (id TEXT PRIMARY KEY, name TEXT, description TEXT)');
          this.ensureIndex('name', { name: 1 }, { unique: true });
      }
      
  }
Connection

Once we have our component ready, we create an instance of it and define our connection to the database via the configure() method. Then, we connect to Cassandra via the open() method. The following code shows how to do this:

let persistence = new MyCassandraPersistence();
  
persistence.configure(ConfigParams.fromTuples(
    "connection.host", "localhost",
    "connection.port", 9042,
    'connection.datacenter', 'datacenter1',
    "connection.username", "cassandra",
    "connection.password", "cassandra",
));

await persistence.open(null);

And, after using our component, we free resources with the close() method:

await persistence.close(null);
CRUD operations

This component offers a set of methods for CRUD operations that are based on identifiable objects. The following sections show how to use the main ones.

Create

As we did with the previous component, we need to create some data objects first. The procedure is identical to what we saw before:

// Data objects
let item = new MyData();
let item1: MyData = { id: '1', name: 'name_1', description: 'description_1' }; 
let item2: MyData = { id: '2', name: 'name_2', description: 'description_2' }; 

// CRUD
// Create
item = await persistence.create(null, item1);

Where the value of item is:

figure 5

And

item = await persistence.create(null, item2);

Where item has the following value:

figure 6

Read

This component adds two methods that can be used to read records.

getOneById()

This method extracts one record according to a given id value. The following example shows how to use it:

item = await persistence.getOneById(null,"1");

Where item has the following value:

figure 7

getListByIds()

This method can be used to extract one or more records. It accepts a list of ids as an input and returns a list containing the extracted records. The example below explains its usage:

let idList = ['1', '2']
let itemList = await persistence.getListByIds(null,idList);

Where itemList has the following value:

figure 8
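In CQL, a lookup by several ids can be written with an IN clause on the primary key. The sketch below builds such a clause from a list of ids. The buildIdInClause() helper is hypothetical, and the query that the component generates internally may be built differently.

```typescript
// Hypothetical helper; shown only to illustrate the shape of the clause.
function buildIdInClause(ids: string[]): string | null {
    if (ids.length === 0) return null;
    // Quote each id as a CQL string literal, doubling embedded single quotes.
    const literals = ids.map((id) => "'" + id.replace(/'/g, "''") + "'");
    return "id IN (" + literals.join(",") + ")";
}

const clause = buildIdInClause(["1", "2"]); // id IN ('1','2')
```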

Update

This component offers three methods to update a record.

update()

This method accepts a data item as input and updates the record stored with the given id. The following lines of code show how to use it:

let item1U: MyData = { id: '1', name: 'name_1_Updated', description: 'description_1_Updated' }; 
item = await persistence.update(null,item1U);

Where item has the following value:

figure 9

updatePartially()

This method accepts an id and an AnyValueMap object with a field and a value as inputs and updates the specified field in the specified record. The following code exemplifies its usage:

let value1 = new AnyValueMap({ name: "name_2_Updated" });
item = await persistence.updatePartially (null,"2",value1);

Where item has the following value:

figure 10
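The effect of a partial update can be pictured as merging the supplied fields into the stored record while leaving the other fields untouched. The sketch below shows this on a plain object. The Item interface and the applyPartialUpdate() function are hypothetical names used for illustration and do not involve the database.

```typescript
interface Item {
    id: string;
    name: string;
    description: string;
}

// Returns a copy of `record` with only the supplied fields replaced.
// The id is excluded from the changes so the record keeps its identity.
function applyPartialUpdate(record: Item, changes: Partial<Omit<Item, "id">>): Item {
    return { ...record, ...changes };
}

const stored: Item = { id: "2", name: "name_2", description: "description_2" };
const updated = applyPartialUpdate(stored, { name: "name_2_Updated" });
// updated.name is "name_2_Updated"; updated.description is still "description_2"
```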

set()

Alternatively, we can use the set() method, which updates an existing record or creates a new one if the record was not found. The following code illustrates its use:

let item1U2: MyData = { id: '1', name: 'name_1_Updated_2', description: 'description_1_Updated_2' }; 
item = await persistence.set (null, item1U2);

Where item has the following value:

figure 11
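The create-or-update behavior of set() is often called an upsert. The sketch below reproduces it with an in-memory Map keyed by id, so that the two outcomes are easy to see. The table variable and the setItem() function are hypothetical and stand in for the database table and the component method.

```typescript
interface Item {
    id: string;
    name: string;
    description: string;
}

// In-memory stand-in for a table keyed by id.
const table = new Map<string, Item>();

// Inserts the item when its id is new, otherwise overwrites the stored one.
function setItem(item: Item): "created" | "updated" {
    const outcome = table.has(item.id) ? "updated" : "created";
    table.set(item.id, { ...item });
    return outcome;
}

const first = setItem({ id: "1", name: "name_1", description: "description_1" });
const second = setItem({ id: "1", name: "name_1_Updated_2", description: "description_1_Updated_2" });
// first is "created", second is "updated", and the table holds a single record
```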

Delete

This component adds two delete methods to its parent class.

deleteById()

This method accepts the correlation_id and a record’s id as inputs and deletes the record identified by the given id. Once the record is deleted, this method returns the deleted record. The following example shows how to use it:

item = await persistence.deleteById(null,"1");

Where item has the following value:

figure 12

deleteByIds()

If we want to delete more than one record, we can use this method. It accepts the correlation_id and a list containing the ids of the records to be deleted as inputs. This method doesn’t return any value. The following example explains how to delete the records with ids equal to one and two:

await persistence.deleteByIds(null,idList);
Final code

We can now combine the previous examples into one program that performs all the CRUD operations:

import { ConfigParams } from "pip-services4-components-node";
import { IIdentifiable } from "pip-services4-data-node";
import { AnyValueMap } from "pip-services4-commons-node";
import { IdentifiableCassandraPersistence } from "pip-services4-cassandra-node";

export class MyData implements IIdentifiable<string> {
    public id: string;
    public name: string;
    public description: string;
  }
  
  class MyCassandraPersistence extends IdentifiableCassandraPersistence<MyData, string> {
      public constructor() {
          super("my_data_table", "mydata_keyspace");
      }
  
      protected defineSchema(): void {
          this.clearSchema();
          this.ensureSchema('CREATE TABLE ' + this.quotedTableName() + ' (id TEXT PRIMARY KEY, name TEXT, description TEXT)');
          this.ensureIndex('name', { name: 1 }, { unique: true });
      }
      
  }
  
  
  export async function main() {
      let persistence = new MyCassandraPersistence();
  
      persistence.configure(ConfigParams.fromTuples(
          "connection.host", "localhost",
          "connection.port", 9042,
          'connection.datacenter', 'datacenter1',
          "connection.username", "cassandra",
          "connection.password", "cassandra",
      ));
  
      await persistence.open(null);
      await persistence.clear(null);
      
  
      // Data objects
      let item = new MyData();
      let item1: MyData = { id: '1', name: 'name_1', description: 'description_1' }; 
      let item2: MyData = { id: '2', name: 'name_2', description: 'description_2' }; 
  
      // CRUD
      // Create
      item = await persistence.create(null, item1);
      console.log(`Created:\n ${JSON.stringify(item)} \n`);
  
      item = await persistence.create(null, item2);
      console.log(`Created:\n ${JSON.stringify(item)} \n`);
  
      // Read
      item = await persistence.getOneById(null,"1");
      console.log(`Read:\n ${JSON.stringify(item)} \n`);
  
      let idList = ['1', '2']
      let itemList = await persistence.getListByIds(null,idList);
      console.log(`Read:\n ${JSON.stringify(itemList)} \n`);
  
      // Update
      let item1U: MyData = { id: '1', name: 'name_1_Updated', description: 'description_1_Updated' }; 
      item = await persistence.update(null,item1U);
      console.log(`Updated:\n ${JSON.stringify(item)} \n`);
  
      let value1 = new AnyValueMap({ name: "name_2_Updated" });
      item = await persistence.updatePartially (null,"2",value1);
      console.log(`Updated:\n ${JSON.stringify(item)} \n`);
  
      let item1U2: MyData = { id: '1', name: 'name_1_Updated_2', description: 'description_1_Updated_2' }; 
      item = await persistence.set (null, item1U2);
      console.log(`Updated or created:\n ${JSON.stringify(item)} \n`);
  
      // Delete
      item = await persistence.deleteById(null,"1");
      console.log(`Deleted:\n ${JSON.stringify(item)} \n`);
    
      // let idList = ['1', '2']
      await persistence.deleteByIds(null,idList);
      console.log(`Deleted:\n ${JSON.stringify(idList)} \n`); 
  
      await persistence.close(null);
  
      console.log('Persistence closed!');
  }
  

Wrapping up

In this tutorial, we saw how to use the CassandraPersistence and IdentifiableCassandraPersistence components. The first accepts any type of data object, whereas the second requires that the data object is identifiable.

We learned how to perform CRUD operations with both components. We also saw that, as IdentifiableCassandraPersistence is a subclass of CassandraPersistence, it inherits all the methods of its parent and adds several methods based on ids. We saw that many of the methods in the CassandraPersistence class are protected, and thus need to be exposed through public methods in our custom component.

Finally, and for both components, we created a program that combines all the learned methods into one piece of code.