Concurrency

In order to achieve scalability and resilience, microservices have to allow running multiple instances of the same microservice. In this way, infrastructure can distribute load across them, and switch traffic to surviving instances when some instances fail. The easiest way to achieve this is to implement microservices as completely stateless independent components. However, there are situations when microservices have to maintain their states or coordinate their work among their instances or with other microservices. To support these scenarios the Pip.Services toolkit offers a few abstractions. Let’s look at them.

State management

When a microservice is created to be stateful, it needs to save its state in a distributed storage, so other instances can also access it. The Pip.Services toolkit defines the IStateStore interface for state management components in the state package in the components module. It allows saving, loading, and deleting states using a unique transaction id as a key. Furthermore, since stores can be used by different microservices and different transactions, it is recommended to prefix the key with a microservice name or transaction type to ensure its global uniqueness.

The following is an example of state management in a stateful microservice:

import { Context } from "pip-services4-components-node";
import { IStateStore } from "pip-services4-logic-node";

/**
 * Example of a stateful component that keeps its state in a shared state store.
 */
class MyComponent {
    private _store: IStateStore;

    ...

    /**
     * Loads the state of an object, processes it and saves it back to the store.
     *
     * @param ctx       execution context passed to the state store.
     * @param objectId  id of the object whose state is processed.
     */
    public async doSomething(ctx: Context, objectId: string): Promise<void> {
        // Get state from the store or create a new one if the state wasn't found
        let state: MyState = await this._store.load(ctx, "mycomponent:" + objectId);
        if (state == null) { state = new MyState(); }
        ...

        // Persist the (possibly new) state under the same prefixed key
        await this._store.save(ctx, "mycomponent:" + objectId, state);
    }
}

Not available
import (
	"context"

	cstate "github.com/pip-services4/pip-services4-go/pip-services4-logic-go/state"
)

// MyComponent is an example of a stateful component that keeps its state
// in a shared state store.
type MyComponent struct {
	store cstate.IStateStore[any]
}

// ...

// DoSomething loads the state of the given object, processes it and saves
// it back to the store under the same prefixed key.
func (c *MyComponent) DoSomething(ctx context.Context, objectId string) {
	// Get state from the store or create a new one if the state wasn't found
	state := c.store.Load(ctx, "mycomponent:"+objectId)
	if state == nil {
		// ... create a new state here
	}

	c.store.Save(ctx, "mycomponent:"+objectId, state)
}
Not available
from pip_services4_logic.state import IStateStore
from pip_services4_components.context import IContext

class MyComponent:
    """Example of a stateful component that keeps its state in a shared state store."""

    _store: IStateStore = None

    ...

    def do_something(self, context: IContext, object_id: str):
        """Load the state of an object, process it and save it back to the store.

        :param context: execution context passed to the state store.
        :param object_id: id of the object whose state is processed.
        """
        # Get state from the store or create a new one if the state wasn't found
        state: MyState = self._store.load(context, "mycomponent:" + object_id)
        if state is None:
            state = MyState()
        ...

        # Persist the (possibly new) state under the same prefixed key
        self._store.save(context, "mycomponent:" + object_id, state)

Not available

The Pip.Services toolkit provides many different implementations of state stores. They are:

  • NullStateStore: Dummy state store implementation that doesn’t do anything.
  • MemoryStateStore: State store that keeps states in the process memory.
  • RedisStateStore
  • MemcachedStateStore
  • MongoDbStateStore
  • PostgresStateStore
  • MySqlStateStore
  • And others

Caching

Caching is a mechanism that is primarily used to optimize requests. When a microservice processes a time-consuming request, it can save the result in a cache. On subsequent requests, the microservice first tries to get the result from the cache, and on success, returns it without running the full logic.

Another scenario could be to save the transaction state in a cache, instead of in a persistent storage. Although in this way the state can be lost after some time, this behavior is acceptable in many cases.

To provide for these scenarios, the cache package in the components module contains the ICache interface that allows storing, retrieving, and deleting cached values using their unique keys. The key is just a string. To prevent conflicts, it is recommended to combine the microservice and/or collection name with the object id.

There are a few implementations of caches in the toolkit:

  • NullCache: Dummy cache implementation that doesn’t do anything.
  • MemoryCache: Cache that stores values in the process memory.
  • RedisCache: Distributed cache that stores values in Redis in-memory database.
  • MemcachedCache: Distributed cache that stores values in Memcached’s caching service.

An example of using a cache is the following:

import { Context } from "pip-services4-components-node";
import { ICache } from "pip-services4-logic-node";

/**
 * Example of a component that caches the objects it retrieves.
 */
class MyComponent {
    private _cache: ICache;

    ...

    /**
     * Returns an object by its id, serving it from the cache when possible.
     *
     * @param ctx       execution context passed to the cache.
     * @param objectId  id of the requested object.
     * @returns the cached object, or the freshly retrieved one.
     */
    public async getMyObjectById(ctx: Context, objectId: string): Promise<MyObject> {
        // Return the cached value on a cache hit
        let result = await this._cache.retrieve(ctx, "mycomponent:" + objectId);
        if (result != null) { return result; }

        // Retrieve the object
        ...

        // Cache the retrieved object; 1000 is the cache timeout passed to store()
        await this._cache.store(ctx, "mycomponent:" + objectId, result, 1000);
        return result;
    }
}
Not available
import (
	"context"

	ccache "github.com/pip-services4/pip-services4-go/pip-services4-logic-go/cache"
)

// MyComponent is an example of a component that caches the objects it retrieves.
type MyComponent struct {
	cache ccache.ICache[any]
}

// ...

// GetMyObjectById returns an object by its id, serving it from the cache
// when possible.
// NOTE(review): on a cache error the error itself is returned as the result
// value, because the function has a single `any` result.
func (c *MyComponent) GetMyObjectById(ctx context.Context, objectId string) any {
	// Try to get the object from the cache first
	result, err := c.cache.Retrieve(ctx, "mycomponent:"+objectId)
	if err != nil {
		return err
	}
	if result != nil {
		return result
	}

	// Retrieve the object
	// ...

	// Cache the retrieved object; 1000 is the timeout passed to Store
	// (presumably in milliseconds, confirm against the ICache documentation)
	c.cache.Store(ctx, "mycomponent:"+objectId, result, 1000)
	return result
}
Not available
from pip_services4_logic.cache import ICache
from pip_services4_components.context import IContext

class MyObject:
    """Sample object type returned by the cached component."""

    @staticmethod
    def myMethod():
        """Return a fixed sample result.

        Declared static because it takes no ``self``; this keeps
        ``MyObject.myMethod()`` working and also allows calls on instances.
        """
        return "some result"

class MyComponent:
    """Example of a component that caches the objects it retrieves."""

    # Cache used to store retrieved objects
    _cache: ICache = None

    #...

    def get_my_object_by_id(self, context: IContext, object_id: str) -> MyObject:
        """Return an object by its id, serving it from the cache when possible.

        :param context: execution context passed to the cache.
        :param object_id: id of the requested object.
        :return: the cached object, or the freshly retrieved one.
        """
        # Return the cached value on a cache hit
        result = self._cache.retrieve(context, "mycomponent:" + object_id)
        if result is not None:
            return result

        # Retrieve the object
        #...

        # Cache the retrieved object; 1000 is the timeout passed to store()
        # (presumably in milliseconds, confirm against the ICache documentation)
        self._cache.store(context, "mycomponent:" + object_id, result, 1000)
        return result

Not available

Locking

Locks provided by Pip.Services work similarly to traditional synchronization primitives available in many programming languages. The main difference is they support coordination across multiple microservices running on potentially different computing instances across the network.

In order to implement locks, the components have to implement the standard ILock interface defined in the lock package in the components module. There are two possible scenarios for this implementation.

The first scenario is to acquire a lock before running a transaction to prevent other instances from overwriting changes or creating conflicts in any other way. This is a dangerous path since distributed locks can significantly lower system throughput and/or cause deadlocks. The example below shows how this case is implemented.

import { Context } from "pip-services4-components-node";
import { ILock } from "pip-services4-logic-node";

/**
 * Example of a component that serializes processing of an object with a lock.
 */
class MyComponent {
    private _lock: ILock;

    ...

    /**
     * Processes an object while holding a lock on it.
     *
     * @param ctx       execution context passed to the lock.
     * @param objectId  id of the object to process.
     */
    public async processMyObject(ctx: Context, objectId: string): Promise<void> {
        // Acquire lock for 10 secs
        await this._lock.acquireLock(ctx, "mycomponent:" + objectId, 10000, 10000);
        try {
            ...
        } finally {
            // Release lock, even when processing fails
            await this._lock.releaseLock(ctx, "mycomponent:" + objectId);
        }
    }
}
Not available
import (
	"context"

	clock "github.com/pip-services4/pip-services4-go/pip-services4-logic-go/lock"
)

// MyComponent is an example of a component that serializes processing of
// an object with a lock.
type MyComponent struct {
	lock clock.ILock
}

// ...

// ProcessMyObject processes an object while holding a lock on it.
// If the lock cannot be acquired, the object is not processed.
func (c *MyComponent) ProcessMyObject(ctx context.Context, objectId string) {
	// Acquire lock for 10 secs
	if err := c.lock.AcquireLock(ctx, "mycomponent:"+objectId, 10000, 10000); err != nil {
		// The lock was not acquired: do not enter the protected section
		// and do not release a lock this instance does not hold
		return
	}
	// Release lock
	defer c.lock.ReleaseLock(ctx, "mycomponent:"+objectId)

	// ...
}
Not available
from pip_services4_logic.lock import ILock
from pip_services4_components.context import IContext

class MyComponent:
    """Example of a component that serializes processing of an object with a lock."""

    # Lock used to coordinate processing between instances
    _lock: ILock = None

    ...

    def process_my_object(self, context: IContext, object_id: str):
        """Process an object while holding a lock on it.

        :param context: execution context passed to the lock.
        :param object_id: id of the object to process.
        """
        # Acquire lock for 10 secs
        self._lock.acquire_lock(context, "mycomponent:" + object_id, 10000, 10000)
        try:
            ...
        finally:
            # Release lock, even when processing fails
            self._lock.release_lock(context, "mycomponent:" + object_id)



Not available

The second scenario can be used just to prevent double processing. Before executing a transaction, the component tries to acquire a lock. If a lock is not acquired, that means another instance is already processing the transaction and this instance cancels the execution. The following code provides an example of this scenario.

import { Context } from "pip-services4-components-node";
import { ILock } from "pip-services4-logic-node";

/**
 * Example of a component that uses a lock to prevent double processing.
 */
class MyComponent {
    private _lock: ILock;

    ...

    /**
     * Processes an object unless another instance is already processing it.
     *
     * @param ctx       execution context passed to the lock.
     * @param objectId  id of the object to process.
     */
    public async ProcessMyObject(ctx: Context, objectId: string): Promise<void>
    {
        // Try to acquire lock for 10 secs.
        // The call must be awaited: a pending promise is always truthy,
        // so negating it directly would never detect a failed acquisition.
        const acquired = await this._lock.tryAcquireLock(ctx, "mycomponent:" + objectId, 10000);
        if (!acquired)
        {
            // Other instance already executing that transaction
            return;
        }

        ...
    }
}
Not available
import (
	"context"

	clock "github.com/pip-services4/pip-services4-go/pip-services4-logic-go/lock"
)

// MyComponent is an example of a component that uses a lock to prevent
// double processing.
type MyComponent struct {
	lock clock.ILock
}

// ...

// ProcessMyObject processes an object unless another instance is already
// processing it. It returns the lock error, if any; when the lock is simply
// held by another instance it returns nil without processing.
func (c *MyComponent) ProcessMyObject(ctx context.Context, objectId string) error {
	// Try to acquire lock for 10 secs
	ok, err := c.lock.TryAcquireLock(ctx, "mycomponent:"+objectId, 10000)
	if !ok || err != nil {
		// Other instance already executing that transaction
		return err
	}

	// ...

	// A function with an error result must end with a return statement
	return nil
}
Not available
from pip_services4_logic.lock import ILock
from pip_services4_components.context import IContext

class MyComponent:
    """Example of a component that uses a lock to prevent double processing."""

    # Lock used to detect that another instance is already processing
    _lock: ILock = None

    ...

    def process_my_object(self, context: IContext, object_id: str):
        """Process an object unless another instance is already processing it.

        :param context: execution context passed to the lock.
        :param object_id: id of the object to process.
        """
        # Try to acquire lock for 10 secs
        if not self._lock.try_acquire_lock(context, "mycomponent:" + object_id, 10000):

            # Other instance already executing that transaction
            return

        ...


Not available

There are several ready-to-use Lock components in the toolkit. They include:

  • NullLock: Dummy lock implementation with no real effect.
  • MemoryLock: Lock used to synchronize the execution of a process using shared memory.
  • RedisLock: Distributed lock that is implemented based on the Redis in-memory database.
  • MemcachedLock: Distributed lock that is implemented based on Memcached’s caching service.

References

For more information about concurrency see: