Designing your persistence components

How Pip.Services facilitates code consistency.

Key takeaways

By designing your code in the correct way you can:

  1. Create a common set of instructions that is database-independent.
  2. Easily transfer data from one database to another.

Introduction

In this tutorial, you will learn how to design your persistence components so that your code benefits from one of the main features of Pip.Services: symmetric code implementation.

To demonstrate this, we will create an example using two different databases (MySQL and PostgreSQL). Then, we will create a common set of instructions to manage CRUD operations and transfer data from one database to another.

Designing your persistence components

Pip.Services allows you to reduce the amount of code through its symmetric implementation. In this tutorial, we will see how to perform CRUD operations on two different databases using one common set of code. To illustrate this, we will use an example based on PostgreSQL and MySQL.

Pre-conditions

To create our example, we need to import the following components. The two most important ones are IdentifiableMySqlPersistence and IdentifiablePostgresPersistence from Pip.Services, which will be used to define our persistence components.

import { 
    AnyValueMap, ConfigParams, DataPage, 
    FilterParams, IStringIdentifiable, 
    PagingParams, SortParams 
} from "pip-services3-commons-nodex";

import { IdentifiableMySqlPersistence } from "pip-services3-mysql-nodex";
import { IdentifiablePostgresPersistence } from "pip-services3-postgres-nodex";
using System.Collections.Generic;
using System.Threading.Tasks;
using PipServices3.Commons.Config;
using PipServices3.Commons.Data;
using PipServices3.MySql.Persistence;
using PipServices3.Postgres.Persistence;
import (
	"context"

	conf "github.com/pip-services3-gox/pip-services3-commons-gox/config"
	cdata "github.com/pip-services3-gox/pip-services3-commons-gox/data"
	mysqlpersist "github.com/pip-services3-gox/pip-services3-mysql-gox/persistence"
	postgrespersist "github.com/pip-services3-gox/pip-services3-postgres-gox/persistence"
)

import 'package:pip_services3_commons/pip_services3_commons.dart';
import 'package:pip_services3_mysql/pip_services3_mysql.dart';
import 'package:pip_services3_postgres/pip_services3_postgres.dart';
from abc import ABC
from typing import Optional, Any, List
from typing import TypeVar

from pip_services3_commons.data import IStringIdentifiable
from pip_services3_commons.config import ConfigParams
from pip_services3_commons.data import SortParams, PagingParams, DataPage, AnyValueMap, FilterParams
from pip_services3_mysql.persistence import IdentifiableMySqlPersistence
from pip_services3_postgres.persistence import IdentifiablePostgresPersistence

Data object

We define the following data object, which corresponds to the tables that we will be using in both databases.

export class MyData implements IStringIdentifiable {
    public id: string;
    public key: string;
    public content: string;
}

using System.Runtime.Serialization;

[DataContract]
public class MyData : IStringIdentifiable
{
    [DataMember(Name = "id")]
    public string Id { get; set; }

    [DataMember(Name = "key")]
    public string Key { get; set; }

    [DataMember(Name = "content")]
    public string Content { get; set; }
}

type MyData struct {
	Id      string `bson:"_id" json:"id"`
	Key     string `bson:"key" json:"key"`
	Content string `bson:"content" json:"content"`
}

func (d *MyData) SetId(id string) {
	d.Id = id
}

func (d MyData) GetId() string {
	return d.Id
}

func (d MyData) Clone() MyData {
	return MyData{
		Id:      d.Id,
		Key:     d.Key,
		Content: d.Content,
	}
}
class MyData implements IStringIdentifiable, ICloneable {
  @override
  String? id;
  String? key;
  String? content;

  MyData();

  MyData.from(this.id, this.key, this.content);

  Map<String, dynamic> toJson() {
    return <String, dynamic>{'id': id, 'key': key, 'content': content};
  }

  void fromJson(Map<String, dynamic> json) {
    id = json['id'];
    key = json['key'];
    content = json['content'];
  }

  @override
  MyData clone() {
    return MyData.from(id, key, content);
  }
}

class MyData(IStringIdentifiable):
    def __init__(self, id: str = None, key: str = None, content: str = None):
        self.id: str = id
        self.key: str = key
        self.content: str = content

Thus, our tables will have three columns, namely id, key and content.
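
To make the mapping between the data object and a table row concrete, here is a minimal Python sketch. It uses a plain dataclass as a stand-in for MyData rather than the Pip.Services base interface, and the helper names to_row and from_row are hypothetical, introduced only for illustration.

```python
from dataclasses import dataclass, asdict
from typing import Any, Dict, Optional


@dataclass
class MyDataSketch:
    # Stand-in for MyData; the three fields mirror the three table columns
    id: Optional[str] = None
    key: Optional[str] = None
    content: Optional[str] = None


def to_row(item: MyDataSketch) -> Dict[str, Any]:
    # Hypothetical helper: one dictionary entry per column
    return asdict(item)


def from_row(row: Dict[str, Any]) -> MyDataSketch:
    # Hypothetical helper: rebuild the object from a row, ignoring unknown columns
    return MyDataSketch(id=row.get("id"), key=row.get("key"), content=row.get("content"))


row = to_row(MyDataSketch(id="1", key="key 1", content="content 1"))
restored = from_row(row)
print(row)
print(restored)
```

Because both tables share the same three columns, the same object can be written to or read from either database without conversion.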

Common interface

Now, we create an interface that the persistence components for both databases will implement. It declares a set of CRUD operations for identifiable persistence objects:

export interface IMyDataPersistence {
    set(correlationId: string, item: MyData): Promise<MyData>;

    create(correlationId: string, item: MyData): Promise<MyData>;

    getPageByFilter(correlationId: string, filter: FilterParams, paging: PagingParams, sort: SortParams): Promise<DataPage<MyData>>;

    getCountByFilter(correlationId: string, filter: FilterParams): Promise<number>;

    getListByFilter(correlationId: string, filter: FilterParams, sort: SortParams): Promise<MyData[]>;

    getOneById(correlationId: string, id: string): Promise<MyData>;

    getListByIds(correlationId: string, ids: string[]): Promise<MyData[]>;

    update(correlationId: string, item: MyData): Promise<MyData>;

    updatePartially(correlationId: string, id: string, data: AnyValueMap): Promise<MyData>;

    deleteById(correlationId: string, id: string): Promise<MyData>;

    deleteByIds(correlationId: string, ids: string[]): Promise<void>;

    deleteByFilter(correlationId: string, filter: FilterParams): Promise<void>;
}
public interface IMyDataPersistence
{
    Task<MyData> SetAsync(string correlationId, MyData item);

    Task<MyData> CreateAsync(string correlationId, MyData item);

    Task<DataPage<MyData>> GetPageByFilterAsync(string correlationId, FilterParams filter, PagingParams paging = null, SortParams sort = null);

    Task<long> GetCountByFilterAsync(string correlationId, FilterParams filter);

    Task<List<MyData>> GetListByFilterAsync(string correlationId, FilterParams filter);

    Task<MyData> GetOneByIdAsync(string correlationId, string id);

    Task<List<MyData>> GetListByIdsAsync(string correlationId, string[] ids);

    Task<MyData> UpdateAsync(string correlationId, MyData item);

    Task<MyData> UpdatePartially(string correlationId, string id, AnyValueMap data);

    Task<MyData> DeleteByIdAsync(string correlationId, string id);

    Task DeleteByIdsAsync(string correlationId, string[] ids);

    Task DeleteByFilterAsync(string correlationId, FilterParams filter);
}
type IMyDataPersistence interface {
	Set(ctx context.Context, correlationId string, item MyData) (result MyData, err error)

	Create(ctx context.Context, correlationId string, item MyData) (result MyData, err error)

	GetPageByFilter(ctx context.Context, correlationId string, filter cdata.FilterParams, paging cdata.PagingParams, sort cdata.SortParams) (page cdata.DataPage[MyData], err error)

	GetCountByFilter(ctx context.Context, correlationId string, filter cdata.FilterParams) (count int64, err error)

	GetListByFilter(ctx context.Context, correlationId string, filter cdata.FilterParams, sort cdata.SortParams) (items []MyData, err error)

	GetOneById(ctx context.Context, correlationId string, id string) (item MyData, err error)

	GetListByIds(ctx context.Context, correlationId string, ids []string) (items []MyData, err error)

	Update(ctx context.Context, correlationId string, item MyData) (result MyData, err error)

	UpdatePartially(ctx context.Context, correlationId string, id string, data cdata.AnyValueMap) (result MyData, err error)

	DeleteById(ctx context.Context, correlationId string, id string) (result MyData, err error)

	DeleteByIds(ctx context.Context, correlationId string, ids []string) error

	DeleteByFilter(ctx context.Context, correlationId string, filter cdata.FilterParams) (err error)
}
abstract class IMyDataPersistence {
  Future<MyData?> set(String? correlationId, MyData? item);

  Future<MyData?> create(String? correlationId, MyData? item);

  Future<DataPage<MyData>> getPageByFilter(String? correlationId,
      FilterParams filter, PagingParams? paging, SortParams? sort);

  Future<int> getCountByFilter(String? correlationId, FilterParams filter);

  Future<List<MyData>> getListByFilter(
      String? correlationId, FilterParams filter, SortParams sort);

  Future<MyData?> getOneById(String? correlationId, String id);

  Future<List<MyData>> getListByIds(String? correlationId, List<String> ids);

  Future<MyData?> update(String? correlationId, MyData? item);

  Future<MyData?> updatePartially(
      String? correlationId, String? id, AnyValueMap? data);

  Future<MyData?> deleteById(String? correlationId, String? id);

  Future<void> deleteByIds(String? correlationId, List<String> ids);

  Future<void> deleteByFilter(String? correlationId, FilterParams filter);
}
class IMyDataPersistence(ABC):
    
    # CRUD operations
    
    # Create
    def set(self, correlation_id: Optional[str], item: MyData) -> MyData:
        raise NotImplementedError()
        
    def create(self, correlation_id: Optional[str], item: MyData) -> MyData:
        raise NotImplementedError()
    
    # Retrieve
    def get_page_by_filter(self, correlation_id: Optional[str], filter: FilterParams,
                           paging: Optional[PagingParams], sort: Optional[SortParams]) -> DataPage:
        raise NotImplementedError()

    def get_count_by_filter(self, correlation_id: Optional[str], filter: FilterParams) -> int:
        raise NotImplementedError()

    def get_list_by_filter(self, correlation_id: Optional[str], filter: FilterParams,
                           sort: Optional[SortParams]) -> List[MyData]:
        raise NotImplementedError()

    def get_list_by_ids(self, correlation_id: Optional[str], ids: List[str]) -> List[MyData]:
        raise NotImplementedError()

    def get_one_by_id(self, correlation_id: Optional[str], id: str) -> MyData:
        raise NotImplementedError()

    # Update
    def update(self, correlation_id: Optional[str], item: MyData) -> MyData:
        raise NotImplementedError()

    def update_partially(self, correlation_id: Optional[str], id: str, data: AnyValueMap) -> MyData:
        raise NotImplementedError()
    
    # Delete
    def delete_by_id(self, correlation_id: Optional[str], id: str) -> MyData:
        raise NotImplementedError()

    def delete_by_ids(self, correlation_id: Optional[str], ids: List[str]):
        raise NotImplementedError()

    def delete_by_filter(self, correlation_id: Optional[str], filter: FilterParams):
        raise NotImplementedError()
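
The value of a common interface is that calling code never needs to know which database sits behind it. The following Python sketch illustrates the idea under simplifying assumptions: IItemPersistence is a reduced, hypothetical version of the interface with only two operations, and InMemoryPersistence is an in-memory stand-in for the database-backed components, not a Pip.Services class.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class Item:
    # Minimal stand-in for MyData
    def __init__(self, id: str, key: str, content: str):
        self.id = id
        self.key = key
        self.content = content


class IItemPersistence(ABC):
    # Reduced, hypothetical version of the interface: only two operations
    @abstractmethod
    def create(self, correlation_id: Optional[str], item: Item) -> Item:
        ...

    @abstractmethod
    def get_list(self, correlation_id: Optional[str]) -> List[Item]:
        ...


class InMemoryPersistence(IItemPersistence):
    # In-memory stand-in for a database-backed persistence component
    def __init__(self):
        self._items: Dict[str, Item] = {}

    def create(self, correlation_id, item):
        self._items[item.id] = item
        return item

    def get_list(self, correlation_id):
        return list(self._items.values())


def transfer(source: IItemPersistence, target: IItemPersistence) -> int:
    # Works with any pair of implementations because it only uses the interface
    count = 0
    for item in source.get_list(None):
        target.create(None, item)
        count += 1
    return count


source = InMemoryPersistence()  # plays the role of one database
target = InMemoryPersistence()  # plays the role of the other database
source.create(None, Item("1", "key 1", "content 1"))
source.create(None, Item("2", "key 2", "content 2"))
copied = transfer(source, target)
print(copied)
```

The transfer function depends only on the interface, so swapping either stand-in for a MySQL or PostgreSQL component would not require changing it.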

MySQL persistence

After that, we define a component that inherits from the IdentifiableMySqlPersistence class, implements our previously defined interface, and overrides several CRUD methods.

export class MyIdentifiableMySqlPersistence extends IdentifiableMySqlPersistence<MyData, string> implements IMyDataPersistence {
    public constructor() {
        super('mydata');
    }

    protected defineSchema(): void {
        this.clearSchema();
        this.ensureSchema('CREATE TABLE `' + this._tableName + '` (id VARCHAR(32) PRIMARY KEY, `key` VARCHAR(50), `content` TEXT)');
        this.ensureIndex(this._tableName + '_key', { key: 1 }, { unique: true });
    }

    private composeFilter(filter: FilterParams): string {
        filter ??= new FilterParams();
        let key = filter.getAsNullableString("key");
        let content = filter.getAsNullableString("content");

        let filterCondition = "";
        if (key != null)
            filterCondition += "`key`='" + key + "'";
        // Join the conditions with AND when both filters are set
        if (content != null)
            filterCondition += (filterCondition != "" ? " AND " : "") + "`content`='" + content + "'";

        return filterCondition;
    }

    private composeSort(sort: SortParams): string {
        sort ??= new SortParams();
        var composeSort = "";

        // Separate the sort fields with commas, as ORDER BY expects
        for (var i = 0; i < sort.length; i++)
            composeSort += (i > 0 ? ", " : "") + sort[i].name + (sort[i].ascending ? " ASC" : " DESC");

        return composeSort != '' ? composeSort : null;
    }

    public getPageByFilter(correlationId: string, filter: FilterParams, paging: PagingParams, sort: SortParams): Promise<DataPage<MyData>> {
        return super.getPageByFilter(correlationId, this.composeFilter(filter), paging, this.composeSort(sort), null);
    }

    public getCountByFilter(correlationId: string, filter: FilterParams): Promise<number> {
        return super.getCountByFilter(correlationId, this.composeFilter(filter));
    }

    public getListByFilter(correlationId: string, filter: FilterParams, sort: SortParams): Promise<MyData[]> {
        return super.getListByFilter(correlationId, this.composeFilter(filter), this.composeSort(sort), null);
    }

    public deleteByFilterParams(correlationId: string, filter: FilterParams): Promise<void> {
        return super.deleteByFilter(correlationId, this.composeFilter(filter));
    }
}
public class MyIdentifiableMySqlPersistence : IdentifiableMySqlPersistence<MyData, string>, IMyDataPersistence
{
    public MyIdentifiableMySqlPersistence(): base("mydata") {}

    protected override void DefineSchema()
    {
        ClearSchema();
        EnsureSchema($"CREATE TABLE {_tableName}   (id VARCHAR(32) PRIMARY KEY, `key` VARCHAR(50), `content` TEXT)");
        EnsureIndex($"{_tableName}_key", new Dictionary<string, bool> { { "key", true } }, new PipServices3.MySql.Persistence.IndexOptions { Unique = true });
    }

    private static string ComposeFilter(FilterParams filter)
    {
        filter ??= new FilterParams();
        var key = filter.GetAsNullableString("key");
        var content = filter.GetAsNullableString("content");

        var filterCondition = "";
        if (key != null)
            filterCondition += "`key`='" + key + "'";
        // Join the conditions with AND when both filters are set
        if (content != null)
            filterCondition += (filterCondition != "" ? " AND " : "") + "`content`='" + content + "'";

        return filterCondition;
    }

    private static string ComposeSort(SortParams sort)
    {
        sort ??= new SortParams();
        var composeSort = "";

        // Separate the sort fields with commas, as ORDER BY expects
        for (var i = 0; i < sort.Count; i++)
            composeSort += (i > 0 ? ", " : "") + sort[i].Name + (sort[i].Ascending ? " ASC" : " DESC");

        return composeSort;
    }

    public Task<DataPage<MyData>> GetPageByFilterAsync(string correlationId, FilterParams filter, PagingParams paging = null, SortParams sort = null)
    {
        return base.GetPageByFilterAsync(correlationId, ComposeFilter(filter), paging, ComposeSort(sort));
    }

    public Task<long> GetCountByFilterAsync(string correlationId, FilterParams filter)
    {
        return base.GetCountByFilterAsync(correlationId, ComposeFilter(filter));
    }

    public Task<List<MyData>> GetListByFilterAsync(string correlationId, FilterParams filter)
    {
        return base.GetListByFilterAsync(correlationId, ComposeFilter(filter));
    }

    public Task DeleteByFilterAsync(string correlationId, FilterParams filter)
    {
        return base.DeleteByFilterAsync(correlationId, ComposeFilter(filter));
    }
}
type MyIdentifiableMySqlPersistence struct {
	*mysqlpersist.IdentifiableMySqlPersistence[MyData, string]
}

func NewMyIdentifiableMySqlPersistence() *MyIdentifiableMySqlPersistence {
	c := &MyIdentifiableMySqlPersistence{}
	c.IdentifiableMySqlPersistence = mysqlpersist.InheritIdentifiableMySqlPersistence[MyData, string](c, "mydata")
	return c
}

func (c *MyIdentifiableMySqlPersistence) DefineSchema() {
	c.ClearSchema()
	c.EnsureSchema("CREATE TABLE `" + c.TableName + "` (id VARCHAR(32) PRIMARY KEY, `key` VARCHAR(50), `content` TEXT)")
}

func (c *MyIdentifiableMySqlPersistence) composeFilter(filter cdata.FilterParams) string {
	key, keyOk := filter.GetAsNullableString("key")
	content, contentOk := filter.GetAsNullableString("content")

	filterObj := ""
	if keyOk && key != "" {
		filterObj += "`key`='" + key + "'"
	}
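	if contentOk && content != "" {
		// Join the conditions with AND when both filters are set
		if filterObj != "" {
			filterObj += " AND "
		}
		filterObj += "`content`='" + content + "'"
	}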

	return filterObj
}

func (c *MyIdentifiableMySqlPersistence) composeSort(sort cdata.SortParams) string {
	composeSort := ""

	for i, field := range sort {
		// Separate the sort fields with commas, as ORDER BY expects
		if i > 0 {
			composeSort += ", "
		}
		composeSort += field.Name
		if field.Ascending {
			composeSort += " ASC"
		} else {
			composeSort += " DESC"
		}
	}

	return composeSort
}

func (c *MyIdentifiableMySqlPersistence) GetPageByFilter(ctx context.Context, correlationId string, filter cdata.FilterParams, paging cdata.PagingParams, sort cdata.SortParams) (page cdata.DataPage[MyData], err error) {
	return c.MySqlPersistence.GetPageByFilter(ctx, correlationId, c.composeFilter(filter), paging, c.composeSort(sort), "")
}

func (c *MyIdentifiableMySqlPersistence) GetListByFilter(ctx context.Context, correlationId string, filter cdata.FilterParams, sort cdata.SortParams) (items []MyData, err error) {

	return c.MySqlPersistence.GetListByFilter(ctx, correlationId, c.composeFilter(filter), c.composeSort(sort), "")
}

func (c *MyIdentifiableMySqlPersistence) GetCountByFilter(ctx context.Context, correlationId string, filter cdata.FilterParams) (count int64, err error) {
	return c.MySqlPersistence.GetCountByFilter(ctx, correlationId, c.composeFilter(filter))
}

func (c *MyIdentifiableMySqlPersistence) DeleteByFilter(ctx context.Context, correlationId string, filter cdata.FilterParams) (err error) {
	return c.MySqlPersistence.DeleteByFilter(ctx, correlationId, c.composeFilter(filter))
}
class MyIdentifiableMySqlPersistence
    extends IdentifiableMySqlPersistence<MyData, String>
    implements IMyDataPersistence {
  MyIdentifiableMySqlPersistence() : super('mydata', null);

  @override
  void defineSchema_() {
    clearSchema();
    ensureSchema_('CREATE TABLE IF NOT EXISTS `' +
        tableName_! +
        '` (id VARCHAR(32) PRIMARY KEY, `key` VARCHAR(50), `content` TEXT)');
    ensureIndex_(tableName_! + '_key', {'key': 1}, {'unique': true});
  }

  String _composeFilter(FilterParams? filter) {
    filter ??= FilterParams();
    var key = filter.getAsNullableString('key');
    var content = filter.getAsNullableString('content');

    var filterCondition = '';
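    if (key != null) {
      // `key` is a reserved word in MySQL, so it is quoted with backticks
      filterCondition += "`key`='" + key + "'";
    }
    if (content != null) {
      // Join the conditions with AND when both filters are set
      filterCondition += (filterCondition != '' ? ' AND ' : '') + "`content`='" + content + "'";
    }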

    return filterCondition;
  }

  String? _composeSort(SortParams? sort) {
    sort ??=  SortParams(null);
    var composeSort = '';

    for (var i = 0; i < sort.length; i++) {
      // Separate the sort fields with commas, as ORDER BY expects
      composeSort += (i > 0 ? ', ' : '') + sort[i].name! + (sort[i].ascending ? ' ASC' : ' DESC');
    }

    return composeSort != '' ? composeSort : null;
  }

  @override
  Future<DataPage<MyData>> getPageByFilter(String? correlationId,
      FilterParams filter, PagingParams? paging, SortParams? sort) {
    return super.getPageByFilter_(correlationId, _composeFilter(filter), paging, _composeSort(sort), null);
  }

  @override
  Future<int> getCountByFilter(String? correlationId, FilterParams filter) {
    return super.getCountByFilter_(correlationId, _composeFilter(filter));
  }

  @override
  Future<List<MyData>> getListByFilter(
      String? correlationId, FilterParams filter, SortParams sort) {
    return super.getListByFilter_(correlationId, _composeFilter(filter), _composeSort(sort), null);
  }

  @override
  Future<void> deleteByFilter(String? correlationId, FilterParams filter) {
    return super.deleteByFilter_(correlationId, _composeFilter(filter));
  }
}
class MyIdentifiableMySqlPersistence(IdentifiableMySqlPersistence, IMyDataPersistence):

    def __init__(self):
        super().__init__('mydata')
    
    def _define_schema(self):
        self._clear_schema()
        self._ensure_schema(
            'CREATE TABLE ' + self._table_name + ' (id VARCHAR(32) PRIMARY KEY, `key` VARCHAR(50), `content` TEXT)')
        self._ensure_index(self._table_name + '_key', {'key': 1}, {'unique': True})

    def _compose_filter(self, filter: FilterParams):
        filter = filter or FilterParams()
        key = filter.get_as_nullable_string('key')
        content = filter.get_as_nullable_string('content')

        filter_condition = ''
        if key is not None:
            filter_condition += "`key`='" + key + "'"
        # Join the conditions with AND when both filters are set
        if content is not None:
            filter_condition += (' AND ' if filter_condition else '') + "`content`='" + content + "'"

        return filter_condition

    def _compose_sort(self, sort: SortParams):
        sort = sort or SortParams()
        compose_sort = ''

        # Separate the sort fields with commas, as ORDER BY expects
        for i, field in enumerate(sort):
            compose_sort += (', ' if i > 0 else '') + field.name + (' ASC' if field.ascending else ' DESC')

        return compose_sort

    def get_page_by_filter(self, correlation_id: Optional[str], filter: FilterParams, paging: PagingParams,
                           sort: SortParams) -> DataPage:
        return super().get_page_by_filter(correlation_id, self._compose_filter(filter), paging,
                                          self._compose_sort(sort), None)

    def get_count_by_filter(self, correlation_id: Optional[str], filter: FilterParams) -> int:
        return super().get_count_by_filter(correlation_id, self._compose_filter(filter))

    def get_list_by_filter(self, correlation_id: Optional[str], filter: FilterParams, sort: SortParams) -> List[MyData]:
        return super().get_list_by_filter(correlation_id, self._compose_filter(filter), self._compose_sort(sort), None)

    def get_one_random(self, correlation_id: Optional[str], filter: FilterParams) -> MyData:
        return super().get_one_random(correlation_id, self._compose_filter(filter))

    def delete_by_filter(self, correlation_id: Optional[str], filter: FilterParams):
        return super().delete_by_filter(correlation_id, self._compose_filter(filter))

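
The filter and sort helpers in the components above turn request parameters into SQL fragments. As a standalone illustration, the following Python sketch shows one way to build those fragments so that several conditions are joined with AND and several sort fields with commas. It works on plain dictionaries and tuples instead of FilterParams and SortParams, and the function names and the quote-doubling step are assumptions made for this example, not part of Pip.Services.

```python
from typing import Dict, List, Optional, Tuple


def compose_filter(filter: Optional[Dict[str, str]]) -> str:
    # Build one condition per supplied field and join them with AND
    filter = filter or {}
    conditions = []
    for column in ("key", "content"):
        value = filter.get(column)
        if value is not None:
            # Double single quotes so the value cannot end the SQL literal early.
            # This is a basic precaution, not a substitute for parameterized queries.
            escaped = value.replace("'", "''")
            conditions.append("`" + column + "`='" + escaped + "'")
    return " AND ".join(conditions)


def compose_sort(sort: Optional[List[Tuple[str, bool]]]) -> Optional[str]:
    # Each entry is (column name, ascending); fields are separated by commas
    fields = [name + (" ASC" if ascending else " DESC") for name, ascending in (sort or [])]
    return ", ".join(fields) if fields else None


where = compose_filter({"key": "key 1", "content": "it's here"})
order = compose_sort([("key", True), ("content", False)])
print(where)
print(order)
```

An empty filter yields an empty string and an empty sort yields None, which mirrors the return values of the helpers shown above.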

PostgreSQL persistence

Similar to what we did in the previous step, we now define a component that inherits from the IdentifiablePostgresPersistence class, implements the previously defined interface, and overrides a set of CRUD operations.

export class MyIdentifiablePostgresPersistence extends IdentifiablePostgresPersistence<MyData, string> implements IMyDataPersistence {
    public constructor() {
        super('mydata');
    }

    protected defineSchema(): void {
        this.clearSchema();
        this.ensureSchema('CREATE TABLE ' + this._tableName + ' (id TEXT PRIMARY KEY, key TEXT, content TEXT)');
        this.ensureIndex(this._tableName + '_key', { key: 1 }, { unique: true });
    }

    private composeFilter(filter: FilterParams): string {
        filter ??= new FilterParams();
        let key = filter.getAsNullableString("key");
        let content = filter.getAsNullableString("content");

        let filterCondition = "";
        if (key != null)
            filterCondition += "key='" + key + "'";
        // Join the conditions with AND when both filters are set
        if (content != null)
            filterCondition += (filterCondition != "" ? " AND " : "") + "content='" + content + "'";

        return filterCondition;
    }

    private composeSort(sort: SortParams): string {
        sort ??= new SortParams();
        var composeSort = "";

        // Separate the sort fields with commas, as ORDER BY expects
        for (var i = 0; i < sort.length; i++)
            composeSort += (i > 0 ? ", " : "") + sort[i].name + (sort[i].ascending ? " ASC" : " DESC");

        return composeSort != '' ? composeSort : null;
    }

    public getPageByFilter(correlationId: string, filter: FilterParams, paging: PagingParams, sort: SortParams): Promise<DataPage<MyData>> {
        return super.getPageByFilter(correlationId, this.composeFilter(filter), paging, this.composeSort(sort), null);
    }

    public getCountByFilter(correlationId: string, filter: FilterParams): Promise<number> {
        return super.getCountByFilter(correlationId, this.composeFilter(filter));
    }

    public getListByFilter(correlationId: string, filter: FilterParams, sort: SortParams): Promise<MyData[]> {
        return super.getListByFilter(correlationId, this.composeFilter(filter), this.composeSort(sort), null);
    }

    public deleteByFilterParams(correlationId: string, filter: FilterParams): Promise<void> {
        return super.deleteByFilter(correlationId, this.composeFilter(filter));
    }
}
public class MyIdentifiablePostgresPersistence : IdentifiablePostgresPersistence<MyData, string>, IMyDataPersistence
{
    public MyIdentifiablePostgresPersistence() : base("mydata") { }

    protected override void DefineSchema()
    {
        ClearSchema();
        EnsureSchema($"CREATE TABLE {_tableName} (id TEXT PRIMARY KEY, key TEXT, content TEXT)");
        EnsureIndex($"{_tableName}_key", new Dictionary<string, bool> { { "key", true } }, new PipServices3.Postgres.Persistence.IndexOptions { Unique = true });
    }

    private static string ComposeFilter(FilterParams filter)
    {
        filter ??= new FilterParams();
        var key = filter.GetAsNullableString("key");
        var content = filter.GetAsNullableString("content");

        var filterCondition = "";
        if (key != null)
            filterCondition += "key='" + key + "'";
        // Join the conditions with AND when both filters are set
        if (content != null)
            filterCondition += (filterCondition != "" ? " AND " : "") + "content='" + content + "'";

        return filterCondition;
    }

    private static string ComposeSort(SortParams sort)
    {
        sort ??= new SortParams();
        var composeSort = "";

        // Separate the sort fields with commas, as ORDER BY expects
        for (var i = 0; i < sort.Count; i++)
            composeSort += (i > 0 ? ", " : "") + sort[i].Name + (sort[i].Ascending ? " ASC" : " DESC");

        return composeSort;
    }

    public Task<DataPage<MyData>> GetPageByFilterAsync(string correlationId, FilterParams filter, PagingParams paging = null, SortParams sort = null)
    {
        return base.GetPageByFilterAsync(correlationId, ComposeFilter(filter), paging, ComposeSort(sort));
    }

    public Task<long> GetCountByFilterAsync(string correlationId, FilterParams filter)
    {
        return base.GetCountByFilterAsync(correlationId, ComposeFilter(filter));
    }

    public Task<List<MyData>> GetListByFilterAsync(string correlationId, FilterParams filter)
    {
        return base.GetListByFilterAsync(correlationId, ComposeFilter(filter));
    }

    public Task DeleteByFilterAsync(string correlationId, FilterParams filter)
    {
        return base.DeleteByFilterAsync(correlationId, ComposeFilter(filter));
    }
}
type MyPostgresPersistence struct {
	*postgrespersist.IdentifiablePostgresPersistence[MyData, string]
}

func NewMyPostgresPersistence() *MyPostgresPersistence {
	c := &MyPostgresPersistence{}
	c.IdentifiablePostgresPersistence = postgrespersist.InheritIdentifiablePostgresPersistence[MyData, string](c, "mydata")
	return c
}

func (c *MyPostgresPersistence) DefineSchema() {
	c.ClearSchema()
	c.EnsureSchema("CREATE TABLE " + c.QuotedTableName() + " (\"id\" TEXT PRIMARY KEY, \"key\" TEXT, \"content\" TEXT)")
}

func (c *MyPostgresPersistence) composeFilter(filter cdata.FilterParams) string {
	key, keyOk := filter.GetAsNullableString("key")
	content, contentOk := filter.GetAsNullableString("content")

	filterObj := ""
	if keyOk && key != "" {
		filterObj += "key='" + key + "'"
	}
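	if contentOk && content != "" {
		// Join the conditions with AND when both filters are set
		if filterObj != "" {
			filterObj += " AND "
		}
		filterObj += "content='" + content + "'"
	}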

	return filterObj
}

func (c *MyPostgresPersistence) composeSort(sort cdata.SortParams) string {
	composeSort := ""

	for i, field := range sort {
		// Separate the sort fields with commas, as ORDER BY expects
		if i > 0 {
			composeSort += ", "
		}
		composeSort += field.Name
		if field.Ascending {
			composeSort += " ASC"
		} else {
			composeSort += " DESC"
		}
	}

	return composeSort
}

func (c *MyPostgresPersistence) GetPageByFilter(ctx context.Context, correlationId string, filter cdata.FilterParams, paging cdata.PagingParams, sort cdata.SortParams) (page cdata.DataPage[MyData], err error) {
	return c.PostgresPersistence.GetPageByFilter(ctx, correlationId, c.composeFilter(filter), paging, c.composeSort(sort), "")
}

func (c *MyPostgresPersistence) GetListByFilter(ctx context.Context, correlationId string, filter cdata.FilterParams, sort cdata.SortParams) (items []MyData, err error) {

	return c.PostgresPersistence.GetListByFilter(ctx, correlationId, c.composeFilter(filter), c.composeSort(sort), "")
}

func (c *MyPostgresPersistence) GetCountByFilter(ctx context.Context, correlationId string, filter cdata.FilterParams) (count int64, err error) {
	return c.PostgresPersistence.GetCountByFilter(ctx, correlationId, c.composeFilter(filter))
}

func (c *MyPostgresPersistence) DeleteByFilter(ctx context.Context, correlationId string, filter cdata.FilterParams) (err error) {
	return c.PostgresPersistence.DeleteByFilter(ctx, correlationId, c.composeFilter(filter))
}
class MyIdentifiablePostgresPersistence
    extends IdentifiablePostgresPersistence<MyData, String>
    implements IMyDataPersistence {
  MyIdentifiablePostgresPersistence() : super('mydata', null);

  @override
  void defineSchema_() {
    clearSchema();
    ensureSchema_('CREATE TABLE IF NOT EXISTS ' +
        tableName_! +
        ' (id TEXT PRIMARY KEY, key TEXT, content TEXT)');
    ensureIndex_(tableName_! + '_key', {'key': 1}, {'unique': true});
  }

  String _composeFilter(FilterParams? filter) {
    filter ??= FilterParams();
    var key = filter.getAsNullableString('key');
    var content = filter.getAsNullableString('content');

    var filterCondition = '';
    if (key != null) {
      filterCondition += "key='" + key + "'";
    }
    if (content != null) {
      // Join the conditions with AND when both filters are set
      filterCondition += (filterCondition != '' ? ' AND ' : '') + "content='" + content + "'";
    }

    return filterCondition;
  }

  String? _composeSort(SortParams? sort) {
    sort ??= SortParams(null);
    var composeSort = '';

    for (var i = 0; i < sort.length; i++) {
      // Separate the sort fields with commas, as ORDER BY expects
      composeSort += (i > 0 ? ', ' : '') + sort[i].name! + (sort[i].ascending ? ' ASC' : ' DESC');
    }

    return composeSort != '' ? composeSort : null;
  }

  @override
  Future<DataPage<MyData>> getPageByFilter(String? correlationId,
      FilterParams filter, PagingParams? paging, SortParams? sort) {
    return super.getPageByFilter_(correlationId, _composeFilter(filter), paging,
        _composeSort(sort), null);
  }

  @override
  Future<int> getCountByFilter(String? correlationId, FilterParams filter) {
    return super.getCountByFilter_(correlationId, _composeFilter(filter));
  }

  @override
  Future<List<MyData>> getListByFilter(
      String? correlationId, FilterParams filter, SortParams sort) {
    return super.getListByFilter_(
        correlationId, _composeFilter(filter), _composeSort(sort), null);
  }

  @override
  Future<void> deleteByFilter(String? correlationId, FilterParams filter) {
    return super.deleteByFilter_(correlationId, _composeFilter(filter));
  }
}
class MyIdentifiablePostgresPersistence(IdentifiablePostgresPersistence, IMyDataPersistence):

    def __init__(self):
        super().__init__('mydata')

    def _define_schema(self):
        self._clear_schema()
        self._ensure_schema('CREATE TABLE ' + self._table_name + ' (id TEXT PRIMARY KEY, key TEXT, content TEXT)')
        self._ensure_index(self._table_name + '_key', {'key': 1}, {'unique': True})

    def _compose_filter(self, filter: FilterParams):
        filter = filter or FilterParams()
        key = filter.get_as_nullable_string('key')
        content = filter.get_as_nullable_string('content')

        conditions = []
        if key is not None:
            conditions.append("key='" + key + "'")
        if content is not None:
            conditions.append("content='" + content + "'")

        # Join with AND so that several criteria still form valid SQL
        return ' AND '.join(conditions)

    def _compose_sort(self, sort: SortParams):
        sort = sort or SortParams()
        # Separate sort fields with commas, as ORDER BY expects
        fields = [field.name + (' ASC' if field.ascending else ' DESC') for field in sort]

        return ', '.join(fields)

    def get_page_by_filter(self, correlation_id: Optional[str], filter: FilterParams, paging: PagingParams,
                           sort: SortParams) -> DataPage:
        return super().get_page_by_filter(correlation_id, self._compose_filter(filter), paging,
                                          self._compose_sort(sort), None)

    def get_count_by_filter(self, correlation_id: Optional[str], filter: FilterParams) -> int:
        return super().get_count_by_filter(correlation_id, self._compose_filter(filter))

    def get_list_by_filter(self, correlation_id: Optional[str], filter: FilterParams, sort: SortParams) -> List[MyData]:
        return super().get_list_by_filter(correlation_id, self._compose_filter(filter), self._compose_sort(sort), None)

    def get_one_random(self, correlation_id: Optional[str], filter: FilterParams) -> MyData:
        return super().get_one_random(correlation_id, self._compose_filter(filter))

    def delete_by_filter(self, correlation_id: Optional[str], filter: FilterParams):
        return super().delete_by_filter(correlation_id, self._compose_filter(filter))

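The filter and sort composition used by these components can be looked at in isolation. The following is a minimal, library-independent sketch: it takes a plain dictionary and a list of (column, ascending) pairs instead of FilterParams and SortParams, which is an assumption made to keep it self-contained, and the helper names compose_filter and compose_sort are hypothetical. It joins conditions with AND and sort fields with commas. Like the tutorial code, it builds SQL text by concatenation; single quotes are doubled here only as a basic precaution, and parameterized queries are generally the safer choice.

```python
from typing import List, Mapping, Optional, Tuple


def compose_filter(filter_values: Optional[Mapping[str, str]]) -> str:
    """Build a SQL WHERE fragment from the 'key' and 'content' criteria."""
    filter_values = filter_values or {}
    conditions = []
    for column in ("key", "content"):
        value = filter_values.get(column)
        if value is not None:
            # Double single quotes so a quote in the value cannot end the literal
            escaped = str(value).replace("'", "''")
            conditions.append(f"{column}='{escaped}'")
    return " AND ".join(conditions)


def compose_sort(sort_fields: Optional[List[Tuple[str, bool]]]) -> str:
    """Build an ORDER BY fragment from (column, ascending) pairs."""
    parts = [
        name + (" ASC" if ascending else " DESC")
        for name, ascending in (sort_fields or [])
    ]
    return ", ".join(parts)


print(compose_filter({"key": "key 8", "content": "content 8"}))
print(compose_sort([("key", True), ("content", False)]))
```

An empty or missing filter yields an empty string, which matches the behavior of the components above when no criteria are given.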

Defining the working database

In order to connect to our databases, we need to define our connection parameters. For our MySQL database, they will look like this:

let host = "localhost";
let port = "3306";
let db_name = "pip";
let user = "root";
let password = "";
var host = "localhost";
var port = "3306";
var db_name = "pip";
var user = "root";
var password = "";
host := "localhost"
port := "3306"
db_name := "pip"
user := "root"
password := ""
var host = 'localhost';
var port = '3306';
var db_name = 'pip';
var user = 'root';
var password = '';
host = 'localhost'
port = 3306
db_name = 'pip'
user = 'root'
password = ''

Next, we create an instance of our component and configure it.

let database1 = new MyIdentifiableMySqlPersistence();
database1.configure(ConfigParams.fromTuples(
    "connection.host", host,
    "connection.port", port,
    "connection.database", db_name,
    "credential.username", user,
    "credential.password", password
));
var database1 = new MyIdentifiableMySqlPersistence();
database1.Configure(ConfigParams.FromTuples(
    "connection.host", host,
    "connection.port", port,
    "connection.database", db_name,
    "credential.username", user,
    "credential.password", password
));
database1 := NewMyMySqlPersistence()
database1.Configure(context.Background(), conf.NewConfigParamsFromTuples(
	"connection.host", host,
	"connection.port", port,
	"connection.database", db_name,
	"credential.username", user,
	"credential.password", password,
))
var database1 = MyIdentifiableMySqlPersistence();
database1.configure(ConfigParams.fromTuples([
  'connection.host',
  host,
  'connection.port',
  port,
  'connection.database',
  db_name,
  'credential.username',
  user,
  'credential.password',
  password
]));
database1 = MyIdentifiableMySqlPersistence()
database1.configure(ConfigParams.from_tuples(
    "connection.host", host,
    "connection.port", port,
    "connection.database", db_name,
    "credential.username", user,
    "credential.password", password
))

And, finally, we connect it to our database.

await database1.open(null);
await database1.OpenAsync("123");
err := database1.Open(context.Background(), "123")
await database1.open(null);
database1.open("123")

If, instead, we want to work with our PostgreSQL database, we define our configuration as follows:

let host = "localhost";
let port = "5432";
let db_name = "pip";
let user = "postgres";
let password = "admin";
var host = "localhost";
var port = "5432";
var db_name = "pip";
var user = "postgres";
var password = "admin";
host := "localhost"
port := "5432"
db_name := "pip"
user := "postgres"
password := "admin"
var host = 'localhost';
var port = '5432';
var db_name = 'pip';
var user = 'postgres';
var password = 'admin';
host = 'localhost'
port = 5432
db_name = 'pip'
user = 'postgres'
password = 'admin'

Then, we create an instance of our PostgreSQL component and configure it.

let database2 = new MyIdentifiablePostgresPersistence();
database2.configure(ConfigParams.fromTuples(
    "connection.host", host,
    "connection.port", port,
    "connection.database", db_name,
    "credential.username", user,
    "credential.password", password
));
var database2 = new MyIdentifiablePostgresPersistence();
database2.Configure(ConfigParams.FromTuples(
    "connection.host", host,
    "connection.port", port,
    "connection.database", db_name,
    "credential.username", user,
    "credential.password", password
));
database2 := NewMyPostgresPersistence()
database2.Configure(context.Background(), conf.NewConfigParamsFromTuples(
	"connection.host", host,
	"connection.port", port,
	"connection.database", db_name,
	"credential.username", user,
	"credential.password", password,
))
var database2 = MyIdentifiablePostgresPersistence();
database2.configure(ConfigParams.fromTuples([
  'connection.host',
  host,
  'connection.port',
  port,
  'connection.database',
  db_name,
  'credential.username',
  user,
  'credential.password',
  password
]));
database2 = MyIdentifiablePostgresPersistence()
database2.configure(ConfigParams.from_tuples(
    "connection.host", host,
    "connection.port", port,
    "connection.database", db_name,
    "credential.username", user,
    "credential.password", password
))
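The two configurations use exactly the same keys and differ only in their values. As a minimal sketch of that symmetry, the hypothetical helper connection_tuples below builds the flat key/value sequence for either database; it does not depend on Pip.Services, and the helper name and its parameters are assumptions made for illustration.

```python
from typing import Any, List


def connection_tuples(host: str, port: int, database: str,
                      username: str, password: str) -> List[Any]:
    """Return the flat key/value sequence used by the configure() calls above."""
    return [
        "connection.host", host,
        "connection.port", port,
        "connection.database", database,
        "credential.username", username,
        "credential.password", password,
    ]


mysql_settings = connection_tuples("localhost", 3306, "pip", "root", "")
postgres_settings = connection_tuples("localhost", 5432, "pip", "postgres", "admin")

# Same keys in the same order; only the values differ between the databases
print(mysql_settings[0::2] == postgres_settings[0::2])
```

Assuming from_tuples accepts positional arguments as in the examples above, each list could then be unpacked into the call, for instance ConfigParams.from_tuples(*mysql_settings).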

Then, we connect it to our PostgreSQL database.

await database2.open(null);
await database2.OpenAsync("123");
err = database2.Open(context.Background(), "123")
await database2.open(null);
database2.open(None)

Once we have connected to the database that we want to work with, we define a new variable called persistence, which is of type IMyDataPersistence:

let persistence: IMyDataPersistence;
IMyDataPersistence persistence;
var persistence IMyDataPersistence
IMyDataPersistence persistence;
persistence: IMyDataPersistence

Following this, we assign to it the component we want to use. If we want to use our MySQL database, we write

persistence = database1;
persistence = database1;
persistence = database1
persistence = database1;
persistence = database1

Or, if we prefer to use our PostgreSQL database, we use

persistence = database2;
persistence = database2;
persistence = database2
persistence = database2;
persistence = database2
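The assignment above works because both components satisfy the same interface. The following self-contained sketch illustrates the idea without a database: InMemoryPersistence is a hypothetical stand-in for the real MySQL and PostgreSQL components, the interface is reduced to two methods, and select_persistence is an illustrative helper that is not part of Pip.Services.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Protocol


@dataclass
class MyData:
    id: str
    key: str
    content: str


class IMyDataPersistence(Protocol):
    """Reduced version of the interface: only the two methods used below."""

    def create(self, correlation_id: Optional[str], item: MyData) -> MyData: ...

    def get_list_by_ids(self, correlation_id: Optional[str], ids: List[str]) -> List[MyData]: ...


class InMemoryPersistence:
    """Hypothetical stand-in for a database-backed component."""

    def __init__(self, name: str):
        self.name = name
        self._items: Dict[str, MyData] = {}

    def create(self, correlation_id, item):
        self._items[item.id] = item
        return item

    def get_list_by_ids(self, correlation_id, ids):
        return [self._items[i] for i in ids if i in self._items]


database1 = InMemoryPersistence("mysql")     # stands in for the MySQL component
database2 = InMemoryPersistence("postgres")  # stands in for the PostgreSQL component


def select_persistence(use_postgres: bool) -> IMyDataPersistence:
    """Return whichever component the caller should work with."""
    return database2 if use_postgres else database1


persistence = select_persistence(use_postgres=False)
persistence.create("123", MyData("1", "key 1", "content 1"))
print(persistence.get_list_by_ids("123", ["1"])[0].content)
```

Code written against persistence never needs to know which component it received, which is what lets the CRUD calls stay identical for both databases.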

CRUD operations

Whatever database we decide to use, we can now perform CRUD operations through the persistence object, using any of the methods defined in our interface or inherited from the base classes. Because our implementations are symmetric, both components expose methods with the same names.

For example

Create

Here we use the create() method to insert twenty records into our database.

for (var i = 0; i < 20; i++) {
    let data: MyData = { id: i.toString(), key: `key ${i}`, content: `content ${i}`};
    let res = await persistence.create(null, data);
}
for (var i=0; i < 20; i++)
{
    var data = new MyData { Id=i.ToString(), Key=$"key {i}", Content=$"content {i}"};
    var res = await persistence.CreateAsync("123", data);
}
for i := 0; i < 20; i++ {
	index := strconv.Itoa(i)
	data := MyData{Id: index, Key: "key " + index, Content: "content " + index}
	res, err := persistence.Create(context.Background(), "123", data)
	if err != nil {
		panic(err)
	}
	fmt.Println(res)
}
for (var i = 0; i < 20; i++) {
  var data = MyData.from(
      i.toString(), 'key ' + i.toString(), 'content ' + i.toString());
  var res = await persistence.create(null, data);
}
for i in range(0, 20):
    data = MyData(str(i), f'key {i}', f'content {i}')
    result = persistence.create("123", data)
Retrieve

Once we have some records in our database, we can retrieve them by using one of the retrieval methods. In the example below, we use the getListByIds() method (get_list_by_ids() in Python), which is available because we are working with identifiable records.

let ids = [ "3", "4", "5"  ];
let myDataList = await persistence.getListByIds(null, ids);
string[] ids = { "3", "4", "5"  };
var myDataList = await persistence.GetListByIdsAsync("123", ids);
ids := []string{"3", "4", "5"}
myDataList, err := persistence.GetListByIds(context.Background(), "123", ids)
var ids = ['3', '4', '5'];
var myDataList = await persistence.getListByIds(null, ids);
ids = ['3', '4', '5']
my_data_list = persistence.get_list_by_ids('123', ids)

However, we could have also used any of the filter-based methods defined in our interface. For example:

let result = await persistence.getPageByFilter(null, FilterParams.fromTuples("key", "key 8"), null, null);
var result = await persistence.GetPageByFilterAsync("123", FilterParams.FromTuples("key", "key 8"), null, null);
result, err := persistence.GetPageByFilter(context.Background(), "123", *cdata.NewFilterParamsFromTuples("key", "key 8"), *cdata.NewEmptyPagingParams(), *cdata.NewEmptySortParams())
var result = await persistence.getPageByFilter(
    null, FilterParams.fromTuples(['key', 'key 8']), null, null);
result = persistence.get_page_by_filter(None, FilterParams.from_tuples('key', 'key 8'), None, None)

In our example, this returns a DataPage object with the following values:

result.data[0].id;       // Returns '8'
result.data[0].key;      // Returns 'key 8'
result.data[0].content;  // Returns 'content 8'
result.Data[0].Id;       // Returns '8'
result.Data[0].Key;      // Returns 'key 8'
result.Data[0].Content;  // Returns 'content 8'
result.Data[0].Id;       // Returns '8'
result.Data[0].Key;      // Returns 'key 8'
result.Data[0].Content;  // Returns 'content 8'
result.data[0].id;       // Returns '8'
result.data[0].key;      // Returns 'key 8'
result.data[0].content;  // Returns 'content 8'
result.data[0].id       # Returns '8'
result.data[0].key     # Returns 'key 8'
result.data[0].content  # Returns 'content 8'
Update

We can update a record by using the update() method. In the following example, we define a new instance for our record with id equal to ‘1’ and we update the content to ‘Updated content 1’.

let newValue: MyData = { id: "1", key: "key 1", content: "Updated content 1" };

await persistence.update(null, newValue);
var newValue = new MyData { Id = "1", Key = "key 1", Content = "Updated content 1" };

await persistence.UpdateAsync("123", newValue);
newValue := MyData{Id: "1", Key: "key 1", Content: "Updated content 1"}
res, err := persistence.Update(context.Background(), "123", newValue)
var newValue = MyData.from('1', 'key 1', 'Updated content 1');

await persistence.update(null, newValue);
new_values = MyData('1', 'key 1', 'Updated content 1')

persistence.update(None, new_values)
Delete

Finally, we can delete some of our records with the deleteByIds() method

let ids = [ "0", "1" ];
await persistence.deleteByIds(null, ids);
string[] ids = { "0", "1" };
await persistence.DeleteByIdsAsync("123", ids);
ids = []string{"0", "1"}
err = persistence.DeleteByIds(context.Background(), "123", ids)
var ids = [ "0", "1" ];
await persistence.deleteByIds(null, ids);
ids = ['0','1']
persistence.delete_by_ids(None, ids)

Or with the deleteByFilter() method

await persistence.deleteByFilter(null, FilterParams.fromTuples("key", "key 7"));
await persistence.DeleteByFilterAsync("123", FilterParams.FromTuples("key", "key 7"));
err = persistence.DeleteByFilter(context.Background(), "123", *cdata.NewFilterParamsFromTuples("key", "key 7"))
await persistence.deleteByFilter(
    null, FilterParams.fromTuples(['key', 'key 7']));
persistence.delete_by_filter(None, FilterParams.from_tuples('key', 'key 7'))

Data transfer

This approach defines an easy and practical way to migrate tables from one database to another. Let’s suppose that we want to transfer the data existing in our MySQL database to a table in our PostgreSQL database.

To achieve this, we first retrieve the data from the table in MySQL and obtain a list of MyData elements, which we call myDataList. As both databases use the same data structure, we just need to insert those rows via the create() method, which accepts a correlationId and a single MyData item, so we call it once for each element of the list. The following code shows this:

// Step 1:  we extract the data from the MySQL database
persistence = database1;
let ids3: string[] = [ "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" ];
myDataList = await persistence.getListByIds(null, ids3);

// Step 2: we insert the data into the mydata table in the PostgreSQL database.
persistence = database2;
for (let item of myDataList) {
    let result: MyData = await persistence.create(null, item);
}
// Step 1:  we extract the data from the MySQL database
persistence = database1;
string[] ids = { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
myDataList = await persistence.GetListByIdsAsync("123", ids);

// Step 2: we insert the data into the mydata table in the PostgreSQL database.
persistence = database2;
foreach (var item in myDataList)
{
    var result = await persistence.CreateAsync("123", item);
}
// Step 1:  we extract the data from the MySQL database
persistence = database1
ids3 := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}
myDataList, err = persistence.GetListByIds(context.Background(), "123", ids3)
if err != nil {
	panic(err)
}
// Step 2: we insert the data into the mydata table in the PostgreSQL database.
persistence = database2
for _, item := range myDataList {
	result, err := persistence.Create(context.Background(), "123", item)
	if err != nil {
		panic(err)
	}
	fmt.Println(result)
}
// Step 1:  we extract the data from the MySQL database
persistence = database1;
var ids3 = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10'];
myDataList = await persistence.getListByIds(null, ids3);

// Step 2: we insert the data into the mydata table in the PostgreSQL database.
persistence = database2;
for (var item in myDataList) {
  var result = await persistence.create(null, item);
  print(result);
}
# Step 1:  we extract the data from the MySQL database
persistence = database1
ids = [str(i) for i in range(1, 11)]
my_data_list = persistence.get_list_by_ids('123', ids)

# Step 2: we insert the data into the mydata table in the PostgreSQL database.
persistence = database2
for item in my_data_list:
    result = persistence.create(None, item)
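The two steps can also be wrapped in a single reusable function. The sketch below is a minimal illustration under stated assumptions: transfer is a hypothetical helper, InMemoryPersistence is a hypothetical stand-in that exposes only create() and get_list_by_ids(), and records are plain dictionaries rather than MyData objects so that the example runs without a database.

```python
from typing import Dict, List, Optional


class InMemoryPersistence:
    """Hypothetical stand-in exposing the two methods the transfer relies on."""

    def __init__(self):
        self._items: Dict[str, dict] = {}

    def create(self, correlation_id: Optional[str], item: dict) -> dict:
        self._items[item["id"]] = item
        return item

    def get_list_by_ids(self, correlation_id: Optional[str], ids: List[str]) -> List[dict]:
        return [self._items[i] for i in ids if i in self._items]


def transfer(source, target, ids: List[str], correlation_id: Optional[str] = None) -> int:
    """Copy the records with the given ids from source to target; return the count."""
    items = source.get_list_by_ids(correlation_id, ids)  # Step 1: read from the source
    for item in items:
        target.create(correlation_id, item)              # Step 2: write one record at a time
    return len(items)


mysql_like = InMemoryPersistence()
postgres_like = InMemoryPersistence()
for i in range(20):
    mysql_like.create(None, {"id": str(i), "key": f"key {i}", "content": f"content {i}"})

copied = transfer(mysql_like, postgres_like, [str(i) for i in range(1, 11)])
print(copied)
```

Because transfer only calls methods shared by both components, the same function would work in the opposite direction by swapping its first two arguments.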

Wrapping up

In this tutorial, we have seen how to design our code so we can benefit from the symmetric implementation feature that defines the Pip.Services toolkit.

We did this through an example that uses a MySQL and a PostgreSQL database. First, we created a common data object, an interface that provides a common structure to our database components, and a component per database.

Then, we showed how to connect those database components to their respective databases and assigned their instances to a variable that we named persistence, which was used to perform our CRUD operations.

In this manner, we saw how we can define a common set of instructions that are used by any of the databases that we want to work with.

We also saw how to benefit from this design by transferring a set of records from our MySQL database to our PostgreSQL database.

In conclusion, Pip.Services provides a way to simplify our code and reduce the effort required to work with persistence components.