Concurrency

In order to achieve scalability and resilience, microservices have to allow running multiple instances of the same microservice. In this way, infrastructure can distribute load across them, and switch traffic to surviving instances, when some instances fail. The easier way to achieve this is to implement microservices as completely stateless independent components. However, there are situations when microservices have to maintain their states or coordinate their work among their instances or with other microservices. To support these scenarios the Pip.Services toolkit offers a few abstractions. Let’s look at them.

State management

When a microservice is created to be stateful, it needs to save its state in a distributed storage, so other instances can also access it. The Pip.Services toolkit defines the IStateStore interface for state management components in the state package in the components module. It allows saving, loading, and deleting states using a unique transaction id as a key. Furthermore, since stores can be used by different microservices and different transactions, it is recommended to append the key with a microservice name or transaction type to ensure their global uniqueness.

The following is an example of state management in a stateful microservice:

import { IStateStore } from "pip-services3-components-nodex";

class MyComponent {
    private _store: IStateStore;

    ...

    public doSomething(correlationId: string, objectId: string) {
        // Get state from the store or create a new one if the state wasn't found
        let state: MyState = await this._store.load(correlationId, "mycomponent:" + objectId);
        if (state != null) { state = new MyState(); }
        ...

        await this._store.save(correlationId, "mycomponent:" + objectId, state);
    }
}

using PipServices3.Components.State;

public class MyComponent
{
    private IStateStore _store;

    ...

    public async void DoSomething(string correlationId, string objectId)
    {
        // Get state from the store or create a new one if the state wasn't found
        MyState state = await this._store.LoadAsync<MyState>(correlationId, "mycomponent:" + objectId);
        if (state != null) { state = new MyState(); }
        ...

        await this._store.SaveAsync(correlationId, "mycomponent:" + objectId, state);
    }
}


import (
	"context"

	cstate "github.com/pip-services3-gox/pip-services3-components-gox/state"
)

type MyComponent struct {
	store cstate.IStateStore[any]
}

// ...

func (c *MyComponent) DoSomething(ctx context.Context, correlationId string, objectId string) {
	// Get state from the store or create a new one if the state wasn't found
	state := c.store.Load(ctx, correlationId, "mycomponent:"+objectId)
	if state != nil {
		// ...
	}

	c.store.Save(ctx, correlationId, "mycomponent:"+objectId, state)
}

import 'package:pip_services3_components/src/state/IStateStore.dart';

class MyComponent {
    IStateStore _store;

    ...

    void doSomething(String? correlationId, String objectId) async {
        // Get state from the store or create a new one if the state wasn't found
        var state = await _store.load(correlationId, 'mycomponent:' + objectId);
        if (state != null) { state = MyState(); }
        ...

        await _store.save(correlationId, 'mycomponent:' + objectId, state);
    }
}

from pip_services3_components.state import IStateStore


class MyComponent:
    _store: IStateStore = None

    ...

    def do_something(self, correlation_id: str, object_id: str):
        # Get state from the store or create a new one if the state wasn't found
        state: MyState = self._store.load(correlation_id, "mycomponent:" + object_id)
        if state is not None: state = MyState()
        ...

        self._store.save(correlation_id, "mycomponent:" + object_id, state)


Not available

The Pip.Service toolkit provides many different implementations of the state stores. They are:

  • NullStateStore: Dummy state store implementation that doesn’t do anything.
  • MemoryStateStore: State store that keeps states in the process memory.
  • RedisStateStore
  • MemcachedStateStore
  • MongoDbStateStore
  • PostgresStateStore
  • MySqlStateStore
  • And others

Caching

Caching is a mechanism that is primarily used to optimize requests. When a microservice processes a time-consuming request, it can save the result in a cache. On subsequent requests, the microservice first tries to get the result from the cache, and on success, returns it without running the full logic.

Another scenario could be to save the transaction state in a cache, instead of in a persistent storage. However, even though in this way the state can be lost after some time, this behavior could be acceptable in many cases.

To provide for these scenarios, the cache package in the components module contains the ICache interface that allows storing, retrieving, and deleting cached values using their unique keys. The key is just a string. To prevent conflicts, it is recommended to combine the microservice or/and collection name in the object id.

There are a few implementations of caches in the toolkit:

  • NullCache: Dummy cache implementation that doesn’t do anything.
  • MemoryCache: Cache that stores values in the process memory.
  • RedisCache: Distributed cache that stores values in Redis in-memory database.
  • MemcachedCache: Distributed cache that stores values in Memcached’s caching service.

An example of using a cache is the following:

import { ICache } from "pip-services3-components-nodex";

class MyComponent {
  private _cache: ICache;

  ...

  public getMyObjectById(correlationId: string, objectId: string): Promise<MyObject> {
    let result = await this._cache.retrieve(correlationId, "mycomponent:" + objectId);
    if (result != null) { return result; }

    // Retrieve the object
    ...

    await this._cache.store(correlationId, "mycomponent:" + objectId, result, 1000);
    return result;
  }
}
using PipServices3.Components.Cache;

public class MyComponent
{
    private ICache _cache;

    ...

    public async Task<MyObject> GetMyObjectById(string correlationId, string objectId)
    {
        var result = await this._cache.RetrieveAsync<MyObject>(correlationId, "mycomponent:" + objectId);
        if (result != null) { return result; }

        // Retrieve the object
        ...

        await this._cache.StoreAsync(correlationId, "mycomponent:" + objectId, result, 1000);
        return result;
    }
}

import (
	"context"

	ccache "github.com/pip-services3-gox/pip-services3-components-gox/cache"
)

type MyComponent struct {
	cache ccache.ICache[any]
}

// ...

func (c *MyComponent) GetMyObjectById(ctx context.Context, correlationId string, objectId string) any {
	// Get state from the store or create a new one if the state wasn't found
	result, err := c.cache.Retrieve(ctx, correlationId, "mycomponent:"+objectId)
	if err != nil {
		return err
	}
	if result != nil {
		return result
	}

	// Retrieve the object
	// ...

	c.cache.Store(ctx, correlationId, "mycomponent:"+objectId, result, 1000)
	return result
}

import 'package:pip_services3_components/pip_services3_components.dart';

class MyComponent {
  ICache _cache;

  ...

  Future<MyObject> getMyObjectById(String? correlationId, String objectId) async {
    var result = await _cache.retrieve(correlationId, 'mycomponent:' + objectId);
    if (result != null) { return result; }

    // Retrieve the object
    ...

    await _cache.store(correlationId, 'mycomponent:' + objectId, result, 1000);
    return result;
  }
}

from pip_services3_components.cache import ICache


class MyComponent:
    _cache: ICache = None

    ...

    def get_my_object_by_id(self, correlation_id: str, object_id: str) -> MyObject:
        result = self._cache.retrieve(correlation_id, "mycomponent:" + object_id)
        if result is not None:
            return result

        # Retrieve the object
        ...

        self._cache.store(correlation_id, "mycomponent:" + object_id, result, 1000)
        return result

Not available

Locking

Locks provided by Pip.Services work similarly to traditional synchronization primitives available in many programming languages. The main difference is they support coordination across multiple microservices running on potentially different computing instances across the network.

In order to implement locks, the components have to implement the standard ILock interface defined in the lock package in the components module. There are two possible scenarios for this implementation.

The first scenario is to acquire a lock before running a transaction to prevent other instances to override changes or create conflicts in any other way. This is a dangerous path since distributed locks can significantly lower system throughput or/and cause deadlocks. The example below shows how this case is implemented.

import { ILock } from "pip-services3-components-nodex";

class MyComponent {
  private _lock: ILock;

  ...
  public processMyObject(correlationId: string, objectId: string) {
    // Acquire lock for 10 secs
    await this._lock.acquireLock(correlationId, "mycomponent:" + objectId, 10000, 10000);
    try {
      ...
    } finally {
      // Release lock
      await this._lock.releaseLock(correlationId, "mycomponent:" + objectId);
    }
  }
}
using PipServices3.Components.Lock;

public class MyComponent
{
    private ILock _lock;

    ...
    public void ProcessMyObject(string correlationId, string objectId)
    {
        // Acquire lock for 10 secs
        this._lock.AcquireLock(correlationId, "mycomponent:" + objectId, 10000, 10000);
        try
        {
            ...
        }
        finally
        {
            // Release lock
            this._lock.ReleaseLock(correlationId, "mycomponent:" + objectId);
        }
    }
}

import (
	"context"

	ccache "github.com/pip-services3-gox/pip-services3-components-gox/lock"
)

type MyComponent struct {
	lock ccache.ILock
}

// ...

func (c *MyComponent) ProcessMyObject(ctx context.Context, correlationId string, objectId string) {
	// Acquire lock for 10 secs
	c.lock.AcquireLock(ctx, correlationId, "mycomponent:"+objectId, 10000, 10000)
	// Release lock
	defer c.lock.ReleaseLock(ctx, correlationId, "mycomponent:"+objectId)

	// ...
}

import 'package:pip_services3_components/pip_services3_components.dart';

class MyComponent {
  ILock _lock;

  ...
  void processMyObject(String correlationId, String objectId) async {
    // Acquire lock for 10 secs
    await _lock.acquireLock(correlationId, 'mycomponent:' + objectId, 10000, 10000);
    try {
      ...
    } finally {
      // Release lock
      await _lock.releaseLock(correlationId, 'mycomponent:' + objectId);
    }
  }
}

from pip_services3_components.lock import ILock


class MyComponent:
    _lock: ILock = None

    ...

    def process_my_object(self, correlation_id: str, object_id: str):
        # Acquire lock for 10 secs
        self._lock.acquire_lock(correlation_id, "mycomponent:" + object_id, 10000, 10000)
        try:
            ...
        finally:
            # Release lock
            self._lock.release_lock(correlation_id, "mycomponent:" + object_id)


Not available

The second scenario can be used just to prevent double processing. Before executing a transaction, the component tries to acquire a lock. If a lock is not acquired, that means another instance is already processing the transaction and this instance cancels the execution. The following code provides an example of this scenario.

using PipServices3.Components.Lock;

public class MyComponent
{
    private ILock _lock;

    // ...
    public void ProcessMyObject(string correlationId, string objectId)
    {
        // Try to acquire lock for 10 secs
        if (!_lock.TryAcquireLock(correlationId, "mycomponent:" + objectId, 10000))
        {
            // Other instance already executing that transaction
            return;
        }

        ...
    }
}

import { ILock } from "pip-services3-components-nodex";

class MyComponent {
  private _lock: ILock;

  ...
  public processMyObject(correlationId: string, objectId: string) {
    // Try to acquire lock for 10 secs
    if(!await this._lock.tryAcquireLock(correlationId, "mycomponent:" + objectId, 10000)) {
      // Other instance already executing that transaction
      return;
    }

  ...
  }
}
import (
	"context"

	ccache "github.com/pip-services3-gox/pip-services3-components-gox/lock"
)

type MyComponent struct {
	lock ccache.ILock
}

// ...

func (c *MyComponent) ProcessMyObject(ctx context.Context, correlationId string, objectId string) error {
	// Try to acquire lock for 10 secs
	ok, err := c.lock.TryAcquireLock(ctx, correlationId, "mycomponent:"+objectId, 10000)
	if !ok || err != nil {
		// Other instance already executing that transaction
		return err
	}

	// ...
}
import 'package:pip_services3_components/pip_services3_components.dart';

class MyComponent {
  ILock _lock;

  ...
  void processMyObject(String correlationId, String objectId) async {
    // Try to acquire lock for 10 secs
    if(!await _lock.tryAcquireLock(correlationId, 'mycomponent:' + objectId, 10000)) {
      // Other instance already executing that transaction
      return;
    }

  ...
  }
}

from pip_services3_components.lock import ILock


class MyComponent:
    _lock: ILock = None

    ...

    def process_my_object(self, correlation_id: str, object_id: str):
        # Try to acquire lock for 10 secs
        if not self._lock.try_acquire_lock(correlation_id, "mycomponent:" + object_id, 10000):

            # Other instance already executing that transaction
            return

        ...

Not available

There are several ready to use Lock components in the toolkit. They include:

  • NullLock: Dummy lock implementation with no real effect.
  • MemoryLock: Lock used to synchronize the execution of a process using shared memory.
  • RedisLock: Distributed lock that is implemented based on the Redis in-memory database.
  • MemcachedLock: Distributed lock that is implemented based on Memcached’s caching service.

References

For more information about connectivity see: