Designing your persistence components

How Pip.Services facilitates code consistency.

Key takeaways

By designing your code in the correct way you can:

  1. Create a common set of instructions that are database independent.
  2. Easily transfer data from one database to another.

Introduction

In this tutorial, you will learn how to design your persistence so that your code benefits from one of the main features of Pip.Services: symmetric code implementation.

To see this in practice, we will create an example that uses two different databases (MySQL and PostgreSQL). Then, we will create a common set of instructions to manage CRUD operations and transfer data from one database to another.

Designing your persistence components

Pip.Services helps reduce the amount of code with its symmetric implementation principle. In this tutorial, we will demonstrate how this principle works by performing CRUD operations using shared code for two different databases: PostgreSQL and MySQL.

Pre-conditions

To create our example, we need to import the following components:

import (
	"context"
	"fmt"
	"strconv"

	cconf "github.com/pip-services4/pip-services4-go/pip-services4-components-go/config"
	cdata "github.com/pip-services4/pip-services4-go/pip-services4-commons-go/data"
	cquery "github.com/pip-services4/pip-services4-go/pip-services4-data-go/query"
	postgrespersist "github.com/pip-services4/pip-services4-go/pip-services4-postgres-go/persistence"
	mysqlpersist "github.com/pip-services4/pip-services4-go/pip-services4-mysql-go/persistence"
)
Not available
from abc import ABC
from typing import Optional, Any, List
from typing import TypeVar

from pip_services4_data.data import IStringIdentifiable
from pip_services4_components.config import ConfigParams
from pip_services4_data.query import SortParams, PagingParams, DataPage, FilterParams
from pip_services4_commons.data import AnyValueMap
from pip_services4_mysql.persistence import IdentifiableMySqlPersistence
from pip_services4_postgres.persistence import IdentifiablePostgresPersistence

Among them, the two most important ones are IdentifiableMySqlPersistence and IdentifiablePostgresPersistence, which will be used to define our persistence components.

Data object

We define the following data object, which corresponds to the tables that we will be using in both databases.

type MyData struct {
	Id      string `bson:"_id" json:"id"`
	Key     string `bson:"key" json:"key"`
	Content string `bson:"content" json:"content"`
}

func (d *MyData) SetId(id string) {
	d.Id = id
}

func (d MyData) GetId() string {
	return d.Id
}

func (d MyData) Clone() MyData {
	return MyData{
		Id:      d.Id,
		Key:     d.Key,
		Content: d.Content,
	}
}
Not available
class MyData(IStringIdentifiable):
    def __init__(self, id: str = None, key: str = None, content: str = None):
        self.id: str = id
        self.key: str = key
        self.content: str = content

Thus, our tables will have three columns: id, key and content.
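
To make the mapping between the data object and the table concrete, the sketch below converts an object to a column dictionary and back. It uses a plain dataclass as a stand-in for MyData so that it runs without Pip.Services installed; the names MyDataSketch, to_row and from_row are hypothetical and are not part of the toolkit.

```python
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass
class MyDataSketch:
    # Stand-in for MyData: one attribute per table column
    id: Optional[str] = None
    key: Optional[str] = None
    content: Optional[str] = None


def to_row(item: MyDataSketch) -> dict:
    # Hypothetical helper: object -> {column: value}
    return asdict(item)


def from_row(row: dict) -> MyDataSketch:
    # Hypothetical helper: {column: value} -> object
    return MyDataSketch(id=row.get('id'), key=row.get('key'), content=row.get('content'))


row = to_row(MyDataSketch('1', 'key 1', 'content 1'))
print(sorted(row))  # the column names
```

Because the same three fields appear in both databases, a record read from one table can be written to the other without any conversion step.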

Common interface

Then, we create an interface that declares a set of CRUD operations for identifiable persistence objects. The persistence components for both databases will implement it:

type IMyDataPersistence interface {
	Set(ctx context.Context, item MyData) (result MyData, err error)

	Create(ctx context.Context, item MyData) (result MyData, err error)

	GetPageByFilter(ctx context.Context, filter cquery.FilterParams, paging cquery.PagingParams, sort cquery.SortParams) (page cquery.DataPage[MyData], err error)

	GetCountByFilter(ctx context.Context, filter cquery.FilterParams) (count int64, err error)

	GetListByFilter(ctx context.Context, filter cquery.FilterParams, sort cquery.SortParams) (items []MyData, err error)

	GetOneById(ctx context.Context, id string) (item MyData, err error)

	GetListByIds(ctx context.Context, ids []string) (items []MyData, err error)

	Update(ctx context.Context, item MyData) (result MyData, err error)

	UpdatePartially(ctx context.Context, id string, data cdata.AnyValueMap) (result MyData, err error)

	DeleteById(ctx context.Context, id string) (result MyData, err error)

	DeleteByIds(ctx context.Context, ids []string) error

	DeleteByFilter(ctx context.Context, filter cquery.FilterParams) (err error)
}
Not available
class IMyDataPersistence(ABC):
    
    # CRUD operations
    
    # Create
    def set(self, correlation_id: Optional[str], item: MyData) -> MyData:
        raise NotImplementedError()
        
    def create(self, correlation_id: Optional[str], item: MyData) -> MyData:
        raise NotImplementedError()
    
    # Retrieve
    def get_page_by_filter(self, correlation_id: Optional[str], filter: FilterParams,
                           paging: Optional[PagingParams], sort: Optional[SortParams]) -> DataPage:
        raise NotImplementedError()

    def get_count_by_filter(self, correlation_id: Optional[str], filter: FilterParams) -> int:
        raise NotImplementedError()

    def get_list_by_filter(self, correlation_id: Optional[str], filter: FilterParams,
                           sort: Optional[SortParams]) -> List[MyData]:
        raise NotImplementedError()

    def get_list_by_ids(self, correlation_id: Optional[str], ids: List[str]) -> List[MyData]:
        raise NotImplementedError()

    def get_one_by_id(self, correlation_id: Optional[str], id: str) -> MyData:
        raise NotImplementedError()

    # Update
    def update(self, correlation_id: Optional[str], item: MyData) -> MyData:
        raise NotImplementedError()

    def update_partially(self, correlation_id: Optional[str], id: str, data: AnyValueMap) -> MyData:
        raise NotImplementedError()
    
    # Delete
    def delete_by_id(self, correlation_id: Optional[str], id: str) -> MyData:
        raise NotImplementedError()

    def delete_by_ids(self, correlation_id: Optional[str], ids: List[str]):
        raise NotImplementedError()

    def delete_by_filter(self, correlation_id: Optional[str], filter: FilterParams):
        raise NotImplementedError()
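
One way to see what the interface buys us is to write calling code against it and then run that code with any implementation. The sketch below uses a reduced, hypothetical version of the interface (only create and get_one_by_id) and an in-memory stand-in instead of a real database component, so it runs without Pip.Services installed. All names in it are assumptions made for illustration.

```python
from abc import ABC, abstractmethod
from typing import Dict, Optional


class IKeyValuePersistence(ABC):
    # Hypothetical, reduced version of IMyDataPersistence
    @abstractmethod
    def create(self, correlation_id: Optional[str], item: dict) -> dict:
        ...

    @abstractmethod
    def get_one_by_id(self, correlation_id: Optional[str], id: str) -> Optional[dict]:
        ...


class InMemoryPersistence(IKeyValuePersistence):
    # Stand-in for a database component; keeps items in a dictionary
    def __init__(self):
        self._items: Dict[str, dict] = {}

    def create(self, correlation_id, item):
        self._items[item['id']] = dict(item)
        return dict(item)

    def get_one_by_id(self, correlation_id, id):
        item = self._items.get(id)
        return dict(item) if item is not None else None


def save_and_reload(persistence: IKeyValuePersistence, item: dict) -> Optional[dict]:
    # Depends only on the interface, not on a concrete database
    persistence.create(None, item)
    return persistence.get_one_by_id(None, item['id'])


reloaded = save_and_reload(InMemoryPersistence(), {'id': '1', 'key': 'key 1', 'content': 'content 1'})
```

Marking the methods with abstractmethod is a design choice of this sketch: Python then rejects an incomplete implementation when it is instantiated, whereas the tutorial's interface raises an error only when a missing method is actually called.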

MySQL persistence

After that, we define a component that inherits from the IdentifiableMySqlPersistence class and our previously defined interface, and implements several CRUD methods.

type MyIdentifiableMySqlPersistence struct {
	*mysqlpersist.IdentifiableMySqlPersistence[MyData, string]
}

func NewMyIdentifiableMySqlPersistence() *MyIdentifiableMySqlPersistence {
	c := &MyIdentifiableMySqlPersistence{}
	c.IdentifiableMySqlPersistence = mysqlpersist.InheritIdentifiableMySqlPersistence[MyData, string](c, "mydata")
	return c
}

func (c *MyIdentifiableMySqlPersistence) DefineSchema() {
	c.ClearSchema()
	c.EnsureSchema("CREATE TABLE `" + c.TableName + "` (id VARCHAR(32) PRIMARY KEY, `key` VARCHAR(50), `content` TEXT)")
}

func (c *MyIdentifiableMySqlPersistence) composeFilter(filter cquery.FilterParams) string {
	key, keyOk := filter.GetAsNullableString("key")
	content, contentOk := filter.GetAsNullableString("content")

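	// Join the conditions with AND so the filter stays valid when both fields are set
	filterObj := ""
	if keyOk && key != "" {
		filterObj += "`key`='" + key + "'"
	}
	if contentOk && content != "" {
		if filterObj != "" {
			filterObj += " AND "
		}
		filterObj += "`content`='" + content + "'"
	}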

	return filterObj
}

func (c *MyIdentifiableMySqlPersistence) composeSort(sort cquery.SortParams) string {
	composeSort := ""

	for _, field := range sort {
		// Separate multiple sort fields with commas
		if composeSort != "" {
			composeSort += ", "
		}
		composeSort += field.Name
		if field.Ascending {
			composeSort += " ASC"
		} else {
			composeSort += " DESC"
		}
	}

	return composeSort
}

func (c *MyIdentifiableMySqlPersistence) GetPageByFilter(ctx context.Context, filter cquery.FilterParams, paging cquery.PagingParams, sort cquery.SortParams) (page cquery.DataPage[MyData], err error) {
	return c.MySqlPersistence.GetPageByFilter(ctx, c.composeFilter(filter), paging, c.composeSort(sort), "")
}

func (c *MyIdentifiableMySqlPersistence) GetListByFilter(ctx context.Context, filter cquery.FilterParams, sort cquery.SortParams) (items []MyData, err error) {

	return c.MySqlPersistence.GetListByFilter(ctx, c.composeFilter(filter), c.composeSort(sort), "")
}

func (c *MyIdentifiableMySqlPersistence) GetCountByFilter(ctx context.Context, filter cquery.FilterParams) (count int64, err error) {
	return c.MySqlPersistence.GetCountByFilter(ctx, c.composeFilter(filter))
}

func (c *MyIdentifiableMySqlPersistence) DeleteByFilter(ctx context.Context, filter cquery.FilterParams) (err error) {
	return c.MySqlPersistence.DeleteByFilter(ctx, c.composeFilter(filter))
}
Not available
class MyIdentifiableMySqlPersistence(IdentifiableMySqlPersistence, IMyDataPersistence):

    def __init__(self):
        super().__init__('mydata')
    
    def _define_schema(self):
        self._clear_schema()
        self._ensure_schema(
            'CREATE TABLE ' + self._table_name + ' (id VARCHAR(32) PRIMARY KEY, `key` VARCHAR(50), `content` TEXT)')
        self._ensure_index(self._table_name + '_key', {'key': 1}, {'unique': True})

    def _compose_filter(self, filter: FilterParams):
        filter = filter or FilterParams()
        key = filter.get_as_nullable_string('key')
        content = filter.get_as_nullable_string('content')

        # Join the conditions with AND so the filter stays valid when both fields are set
        conditions = []
        if key is not None:
            conditions.append("`key`='" + key + "'")
        if content is not None:
            conditions.append("`content`='" + content + "'")

        return ' AND '.join(conditions)

    def _compose_sort(self, sort: SortParams):
        sort = sort or SortParams()
        # Separate multiple sort fields with commas
        return ', '.join(field.name + (' ASC' if field.ascending else ' DESC') for field in sort)

    def get_page_by_filter(self, correlation_id: Optional[str], filter: FilterParams, paging: PagingParams,
                           sort: SortParams) -> DataPage:
        return super().get_page_by_filter(correlation_id, self._compose_filter(filter), paging,
                                          self._compose_sort(sort), None)

    def get_count_by_filter(self, correlation_id: Optional[str], filter: FilterParams) -> int:
        return super().get_count_by_filter(correlation_id, self._compose_filter(filter))

    def get_list_by_filter(self, correlation_id: Optional[str], filter: FilterParams, sort: SortParams) -> List[MyData]:
        return super().get_list_by_filter(correlation_id, self._compose_filter(filter), self._compose_sort(sort), None)

    def get_one_random(self, correlation_id: Optional[str], filter: FilterParams) -> MyData:
        return super().get_one_random(correlation_id, self._compose_filter(filter))

    def delete_by_filter(self, correlation_id: Optional[str], filter: FilterParams):
        return super().delete_by_filter(correlation_id, self._compose_filter(filter))

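
The filter composition in this component builds a WHERE condition from the fields that are present in the filter. The sketch below isolates that idea in a standalone function that collects one condition per field and joins them with AND. The function name and its dictionary input are assumptions made for this example. As in the tutorial code, values are inserted into the string as-is, so this approach should only be used with trusted input.

```python
from typing import Dict, List, Optional


def compose_filter(values: Dict[str, Optional[str]], quote: str = '`') -> str:
    # Hypothetical helper: one "column='value'" condition per entry that is not None
    conditions: List[str] = []
    for column, value in values.items():
        if value is not None:
            conditions.append(f"{quote}{column}{quote}='{value}'")
    # An empty list yields an empty string, which means "no filter"
    return ' AND '.join(conditions)


print(compose_filter({'key': 'key 1', 'content': None}))
print(compose_filter({'key': 'key 1', 'content': 'content 1'}))
```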

PostgreSQL persistence

Similar to what we did in the previous step, we now define a component that inherits from the IdentifiablePostgresPersistence component and the previously defined interface, and implements a set of CRUD operations.


type MyPostgresPersistence struct {
	*postgrespersist.IdentifiablePostgresPersistence[MyData, string]
}

func NewMyPostgresPersistence() *MyPostgresPersistence {
	c := &MyPostgresPersistence{}
	c.IdentifiablePostgresPersistence = postgrespersist.InheritIdentifiablePostgresPersistence[MyData, string](c, "mydata")
	return c
}

func (c *MyPostgresPersistence) DefineSchema() {
	c.ClearSchema()
	c.EnsureSchema("CREATE TABLE " + c.QuotedTableName() + " (\"id\" TEXT PRIMARY KEY, \"key\" TEXT, \"content\" TEXT)")
}

func (c *MyPostgresPersistence) composeFilter(filter cquery.FilterParams) string {
	key, keyOk := filter.GetAsNullableString("key")
	content, contentOk := filter.GetAsNullableString("content")

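	// Join the conditions with AND so the filter stays valid when both fields are set
	filterObj := ""
	if keyOk && key != "" {
		filterObj += "key='" + key + "'"
	}
	if contentOk && content != "" {
		if filterObj != "" {
			filterObj += " AND "
		}
		filterObj += "content='" + content + "'"
	}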

	return filterObj
}

func (c *MyPostgresPersistence) composeSort(sort cquery.SortParams) string {
	composeSort := ""

	for _, field := range sort {
		// Separate multiple sort fields with commas
		if composeSort != "" {
			composeSort += ", "
		}
		composeSort += field.Name
		if field.Ascending {
			composeSort += " ASC"
		} else {
			composeSort += " DESC"
		}
	}

	return composeSort
}

func (c *MyPostgresPersistence) GetPageByFilter(ctx context.Context, filter cquery.FilterParams, paging cquery.PagingParams, sort cquery.SortParams) (page cquery.DataPage[MyData], err error) {
	return c.PostgresPersistence.GetPageByFilter(ctx, c.composeFilter(filter), paging, c.composeSort(sort), "")
}

func (c *MyPostgresPersistence) GetListByFilter(ctx context.Context, filter cquery.FilterParams, sort cquery.SortParams) (items []MyData, err error) {

	return c.PostgresPersistence.GetListByFilter(ctx, c.composeFilter(filter), c.composeSort(sort), "")
}

func (c *MyPostgresPersistence) GetCountByFilter(ctx context.Context, filter cquery.FilterParams) (count int64, err error) {
	return c.PostgresPersistence.GetCountByFilter(ctx, c.composeFilter(filter))
}

func (c *MyPostgresPersistence) DeleteByFilter(ctx context.Context, filter cquery.FilterParams) (err error) {
	return c.PostgresPersistence.DeleteByFilter(ctx, c.composeFilter(filter))
}
Not available
class MyIdentifiablePostgresPersistence(IdentifiablePostgresPersistence, IMyDataPersistence):

    def __init__(self):
        super().__init__('mydata')

    def _define_schema(self):
        self._clear_schema()
        self._ensure_schema('CREATE TABLE ' + self._table_name + ' (id TEXT PRIMARY KEY, key TEXT, content TEXT)')
        self._ensure_index(self._table_name + '_key', {'key': 1}, {'unique': True})

    def _compose_filter(self, filter: FilterParams):
        filter = filter or FilterParams()
        key = filter.get_as_nullable_string('key')
        content = filter.get_as_nullable_string('content')

        # Join the conditions with AND so the filter stays valid when both fields are set
        conditions = []
        if key is not None:
            conditions.append("key='" + key + "'")
        if content is not None:
            conditions.append("content='" + content + "'")

        return ' AND '.join(conditions)

    def _compose_sort(self, sort: SortParams):
        sort = sort or SortParams()
        # Separate multiple sort fields with commas
        return ', '.join(field.name + (' ASC' if field.ascending else ' DESC') for field in sort)

    def get_page_by_filter(self, correlation_id: Optional[str], filter: FilterParams, paging: PagingParams,
                           sort: SortParams) -> DataPage:
        return super().get_page_by_filter(correlation_id, self._compose_filter(filter), paging,
                                          self._compose_sort(sort), None)

    def get_count_by_filter(self, correlation_id: Optional[str], filter: FilterParams) -> int:
        return super().get_count_by_filter(correlation_id, self._compose_filter(filter))

    def get_list_by_filter(self, correlation_id: Optional[str], filter: FilterParams, sort: SortParams) -> List[MyData]:
        return super().get_list_by_filter(correlation_id, self._compose_filter(filter), self._compose_sort(sort), None)

    def get_one_random(self, correlation_id: Optional[str], filter: FilterParams) -> MyData:
        return super().get_one_random(correlation_id, self._compose_filter(filter))

    def delete_by_filter(self, correlation_id: Optional[str], filter: FilterParams):
        return super().delete_by_filter(correlation_id, self._compose_filter(filter))

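
Comparing the two components, the SQL they produce differs mainly in how identifiers are quoted: the MySQL component uses backticks, while the PostgreSQL one uses double quotes or bare names. The sketch below shows a sort composer where that difference is a single parameter. The function name and the (name, ascending) tuple input are assumptions made for this example, not part of the toolkit.

```python
from typing import List, Tuple


def compose_sort(fields: List[Tuple[str, bool]], quote: str = '') -> str:
    # Hypothetical helper: each field is a (name, ascending) pair
    parts = []
    for name, ascending in fields:
        direction = 'ASC' if ascending else 'DESC'
        parts.append(f'{quote}{name}{quote} {direction}')
    # Multiple fields are separated by commas, as ORDER BY expects
    return ', '.join(parts)


fields = [('key', True), ('content', False)]
mysql_sort = compose_sort(fields, quote='`')
postgres_sort = compose_sort(fields, quote='"')
```

Keeping the dialect-specific detail in one small place is what lets the rest of each component, and all the calling code, stay identical.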

Defining the working database

In order to connect to our databases, we need to define our connection parameters. For our MySQL database, the following example applies:

host := "localhost"
port := "3306"
db_name := "pip"
user := "root"
password := ""
Not available
host = 'localhost'
port = 3306
db_name = 'pip'
user = 'root'
password = 'password'

Next, we create an instance of our component and configure it.

database1 := NewMyIdentifiableMySqlPersistence()
database1.Configure(context.Background(), cconf.NewConfigParamsFromTuples(
	"connection.host", host,
	"connection.port", port,
	"connection.database", db_name,
	"credential.username", user,
	"credential.password", password,
))
Not available
from pip_services4_components.config import ConfigParams

database1 = MyIdentifiableMySqlPersistence()
database1.configure(ConfigParams.from_tuples(
    "connection.host", "localhost",
    "connection.port", 3306,
    "credential.username", "root",
    "credential.password", "password",
    "connection.database", "pip"
))
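
The configuration is passed as a flat list of alternating keys and values, where the dotted prefix groups related parameters (connection.* and credential.*). The sketch below imitates that idea with plain dictionaries so that it runs without Pip.Services installed. The from_tuples and get_section functions here are simplified stand-ins written for this example, not the library's implementation.

```python
from typing import Any, Dict


def from_tuples(*tuples: Any) -> Dict[str, str]:
    # Simplified stand-in: pairs up alternating keys and values, storing values as strings
    if len(tuples) % 2 != 0:
        raise ValueError('Expected an even number of arguments (key, value pairs)')
    return {str(tuples[i]): str(tuples[i + 1]) for i in range(0, len(tuples), 2)}


def get_section(config: Dict[str, str], section: str) -> Dict[str, str]:
    # Returns the parameters under "<section>." with the prefix removed
    prefix = section + '.'
    return {k[len(prefix):]: v for k, v in config.items() if k.startswith(prefix)}


config = from_tuples(
    'connection.host', 'localhost',
    'connection.port', 3306,
    'connection.database', 'pip',
    'credential.username', 'root',
    'credential.password', 'password',
)
connection = get_section(config, 'connection')
```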

And, finally, we connect it to our database.

err := database1.Open(context.Background())
Not available
database1.open(None)

If, instead, we want to work with our PostgreSQL database, we could define our configuration as follows:

host := "localhost"
port := "5432"
db_name := "test"
user := "postgres"
password := "postgres"
Not available
host = 'localhost'
port = 5432
db_name = 'pip'
user = 'postgres'
password = 'admin'

Then, we create an instance of our PostgreSQL component and configure it.

database2 := NewMyPostgresPersistence()
database2.Configure(context.Background(), cconf.NewConfigParamsFromTuples(
	"connection.host", host,
	"connection.port", port,
	"connection.database", db_name,
	"credential.username", user,
	"credential.password", password,
))
Not available
database2 = MyIdentifiablePostgresPersistence()
database2.configure(ConfigParams.from_tuples(
    "connection.host", "localhost",
    "connection.port", 5432,
    "connection.database", "pip",
    "credential.username", "postgres",
    "credential.password", "admin"
))

Then, we connect it to our PostgreSQL database.

err = database2.Open(context.Background())
Not available
database2.open(None)

Once we have connected to the database that we want to work with, we define a new variable called persistence, which is of type IMyDataPersistence:

var persistence IMyDataPersistence
Not available
persistence: IMyDataPersistence

Following this, we assign to it the component we want to use. If we want to use our MySQL database, we write:

persistence = database1
Not available
persistence = database1

Or, if we prefer to use our PostgreSQL database, we write:

persistence = database2
Not available
persistence = database2
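
Because both components satisfy the same interface, the choice between them can be reduced to a single decision point. The sketch below picks an implementation by name. The two classes are empty stand-ins for the database components, and the registry and the select_persistence function are assumptions made for illustration.

```python
from typing import Callable, Dict


class MySqlStandIn:
    # Stand-in for MyIdentifiableMySqlPersistence
    name = 'mysql'


class PostgresStandIn:
    # Stand-in for MyIdentifiablePostgresPersistence
    name = 'postgres'


# Hypothetical registry mapping a selector to a component factory
FACTORIES: Dict[str, Callable[[], object]] = {
    'mysql': MySqlStandIn,
    'postgres': PostgresStandIn,
}


def select_persistence(db_type: str):
    # Creates the component registered under the given name
    try:
        return FACTORIES[db_type]()
    except KeyError:
        raise ValueError(f'Unsupported database type: {db_type}') from None


persistence = select_persistence('postgres')
```

With this arrangement, the selector could come from a configuration file or an environment variable, and none of the code that uses persistence would need to change.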

CRUD operations

Whatever database we decide to use, we can now perform different CRUD operations by using the persistence object and any of the methods defined in our interface or inherited from the base classes. As our implementations are symmetric, both databases call methods with the same names.

For example

Create

Here we use the create() method to insert twenty records into our database.

for i := 0; i < 20; i++ {
	index := strconv.Itoa(i)
	data := MyData{Id: index, Key: "key " + index, Content: "content " + index}
	res, err := persistence.Create(context.Background(), data)
	if err != nil {
		panic(err)
	}
	fmt.Println(res)
}
Not available
for i in range(0, 20):
    data = MyData(str(i), f'key {i}', f'content {i}')
    result = persistence.create(None, data)
Retrieve

Once we have some records in our database, we can retrieve them by using one of the retrieval methods. In the example below, we use the get_list_by_ids() method because we are working with identifiable records.

ids := []string{"3", "4", "5"}
myDataList, err := persistence.GetListByIds(context.Background(), ids)
Not available
ids = [str(i) for i in range(3,7)]
my_data_list = persistence.get_list_by_ids(None, ids)

However, we could have also used any of the filter-based methods defined in our interface. For example:

result, err := persistence.GetPageByFilter(context.Background(), *cquery.NewFilterParamsFromTuples("key", "key 8"), *cquery.NewEmptyPagingParams(), *cquery.NewEmptySortParams())
Not available
result = persistence.get_page_by_filter(None, FilterParams.from_tuples('key', 'key 8'), None, None)

In our example, this returns a DataPage object with the following values:

result.Data[0].Id;       // Returns '8'
result.Data[0].Key;      // Returns 'key 8'
result.Data[0].Content;  // Returns 'content 8'
Not available
result.data[0].id       # Returns '8'
result.data[0].key     # Returns 'key 8'
result.data[0].content  # Returns 'content 8'
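
A page query combines a filter with a window over the matching records. The sketch below imitates this in memory with skip and take parameters. The get_page function and the dictionary it returns are assumptions chosen for illustration and do not reproduce the library's DataPage class.

```python
from typing import Callable, Dict, List


def get_page(items: List[dict], predicate: Callable[[dict], bool],
             skip: int = 0, take: int = 100, total: bool = False) -> Dict[str, object]:
    # Apply the filter first, then cut the requested window out of the matches
    matching = [item for item in items if predicate(item)]
    page: Dict[str, object] = {'data': matching[skip:skip + take]}
    # The total count is optional because counting can be costly on a real database
    page['total'] = len(matching) if total else None
    return page


records = [{'id': str(i), 'key': f'key {i}', 'content': f'content {i}'} for i in range(20)]
result = get_page(records, lambda r: r['key'] == 'key 8')
first_five = get_page(records, lambda r: True, skip=0, take=5, total=True)
```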
Update

We can update a record by using the update() method. In the following example, we define a new instance for our record with id equal to ‘1’ and we update the content to ‘Updated content 1’.

newValue := MyData{Id: "1", Key: "key 1", Content: "Updated content 1"}
res, err := persistence.Update(context.Background(), newValue)
Not available
new_values = MyData('1', 'key 1', 'Updated content 1')

persistence.update(None, new_values)
Delete

Finally, we can delete some of our records with the delete_by_ids() method:

ids = []string{"0", "1"}
err = persistence.DeleteByIds(context.Background(), ids)
Not available
ids = ['0','1']
persistence.delete_by_ids(None, ids)

Or with the delete_by_filter() method:

err = persistence.DeleteByFilter(context.Background(), *cquery.NewFilterParamsFromTuples("key", "key 7"))
Not available
persistence.delete_by_filter(None, FilterParams.from_tuples('key', 'key 7'))

Data transfer

This approach defines an easy and practical way to migrate tables from one database to another. Let’s suppose that we want to transfer the data existing in our MySQL database to a table in our PostgreSQL database.

To achieve this, we first retrieve the data from the table in MySQL and obtain a list of MyData elements, which we call myDataList (my_data_list in Python). As both databases use the same data structure, we then insert those rows into PostgreSQL by calling the create() method once for each element of the list. The following code shows this:

// Step 1:  we extract the data from the MySQL database
persistence = database1
ids3 := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}
myDataList, err = persistence.GetListByIds(context.Background(), ids3)
if err != nil {
	panic(err)
}
// Step 2: we insert the data into the mydata table in the PostgreSQL database.
persistence = database2
for _, item := range myDataList {
	result, err := persistence.Create(context.Background(), item)
	if err != nil {
		panic(err)
	}
	fmt.Println(result)
}
Not available
# Step 1:  we extract the data from the MySQL database
persistence = database1
ids = [str(i) for i in range(0,10)]
my_data_list = persistence.get_list_by_ids('123', ids)

# Step 2: we insert the data into the mydata table in the PostgreSQL database.
persistence = database2
for item in my_data_list:
    result = persistence.create(None, item)
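
The transfer steps above can be wrapped in a small function that depends only on the shared method names. The sketch below uses an in-memory stand-in for both databases so that it runs without any database server, and it skips records that already exist in the target. The InMemoryStore class, the transfer function and the skip-existing policy are all assumptions of this example.

```python
from typing import Dict, List, Optional


class InMemoryStore:
    # Stand-in for a persistence component; exposes the method names used in the transfer
    def __init__(self, items: Optional[List[dict]] = None):
        self._items: Dict[str, dict] = {item['id']: dict(item) for item in (items or [])}

    def get_list_by_ids(self, correlation_id: Optional[str], ids: List[str]) -> List[dict]:
        return [dict(self._items[i]) for i in ids if i in self._items]

    def get_one_by_id(self, correlation_id: Optional[str], id: str) -> Optional[dict]:
        item = self._items.get(id)
        return dict(item) if item is not None else None

    def create(self, correlation_id: Optional[str], item: dict) -> dict:
        self._items[item['id']] = dict(item)
        return dict(item)


def transfer(source, target, ids: List[str]) -> int:
    # Copies the records with the given ids from source to target; returns how many were created
    created = 0
    for item in source.get_list_by_ids(None, ids):
        if target.get_one_by_id(None, item['id']) is None:
            target.create(None, item)
            created += 1
    return created


source = InMemoryStore([{'id': str(i), 'key': f'key {i}', 'content': f'content {i}'} for i in range(10)])
target = InMemoryStore([{'id': '0', 'key': 'key 0', 'content': 'content 0'}])
copied = transfer(source, target, [str(i) for i in range(5)])
```

Checking for an existing record before each insert makes the transfer safe to repeat, at the cost of one extra read per record. For large tables, reading and writing in batches would likely be a better fit.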

Wrapping up

In this tutorial, we have seen how to design our code so we can benefit from the symmetric implementation feature that defines the Pip.Services toolkit.

We did this through an example that uses a MySQL and a PostgreSQL database. First, we created a common data object, an interface that provides a common structure to our database components, and a component per database.

Then, we showed how to connect those database components to their respective databases and assigned their instances to a variable named persistence, which we used to perform our CRUD operations.

In this manner, we saw how we can define a common set of instructions that are used by any of the databases that we want to work with.

We also saw how to benefit from this design by transferring a set of records from our MySQL database to our PostgreSQL database.

In conclusion, Pip.Services provides a way to simplify our code and reduce the effort needed to work with persistence components.