Connection utils

How to use a set of utilities that simplify the handling of connections.

Key takeaways

composeUri: Utility that creates a URI string from information stored in a ConfigParams object.
concat: Utility that merges two ConfigParams objects into one.
exclude: Utility that creates a new ConfigParams object based on a given one and a set of excluded fields.
include: Utility that creates a new ConfigParams object based on a given one and a set of included fields.
parseUri: Utility that obtains information from a URI and stores it in a ConfigParams object.

Introduction

In this tutorial, you will learn how to use a set of utilities offered by Pip.Services that can be used to simplify the handling of connections. We will start by explaining how to import these components. Then, we will see their syntax and how to use them. Finally, we will explore an example that illustrates the practical application of some of these utilities.

Pre-conditions

In order to use these utilities, we need to import them first. The following command shows how to do this:

import { ConnectionUtils } from "pip-services3-components-nodex";
using PipServices3.Components.Connect;
import (
    ccon "github.com/pip-services3-gox/pip-services3-components-gox/connect"
)
import 'package:pip_services3_components/pip_services3_components.dart';
from pip_services3_components.connect import ConnectionUtils
Not available

Utility # 1: composeUri

The composeUri() method creates a URI based on the information given via a ConfigParams object. It also accepts default values for protocol and port, which are used when the ConfigParams object doesn’t provide this information.

The syntax of this command is

composeUri(options: ConfigParams, default_protocol: str, default_port: int)

And, the following example illustrates how to use it:

import { ConfigParams } from "pip-services3-commons-nodex";
import { ConnectionUtils } from "pip-services3-components-nodex";

let options = ConfigParams.fromTuples(
    "host", "broker1,broker2",
    "port", ",8082",
    "username", "user",
    "password", "pass123",
    "param1", "ABC",
    "param2", "XYZ",
    "param3", null
);

let uri = ConnectionUtils.composeUri(options, "tcp", 9092);

using PipServices3.Commons.Config;
using PipServices3.Components.Connect;

var options = ConfigParams.FromTuples(
    "host", "broker1,broker2",
    "port", ",8082",
    "username", "user",
    "password", "pass123",
    "param1", "ABC",
    "param2", "XYZ",
    "param3", null
);

var uri = ConnectionUtils.ComposeUri(options, "tcp", 9092);

import (
	"fmt"

	cconf "github.com/pip-services3-gox/pip-services3-commons-gox/config"
	ccon "github.com/pip-services3-gox/pip-services3-components-gox/connect"
)


options := cconf.NewConfigParamsFromTuples(
	"host", "broker1,broker2",
	"port", ",8082",
	"username", "user",
	"password", "pass123",
	"param1", "ABC",
	"param2", "XYZ",
	"param3", nil,
)

uri := ccon.ConnectionUtils.ComposeUri(options, "tcp", 9092)

import 'package:pip_services3_commons/pip_services3_commons.dart';
import 'package:pip_services3_components/pip_services3_components.dart';

var options = ConfigParams.fromTuples([
  'host', 'broker1,broker2',
  'port', ',8082',
  'username', 'user',
  'password', 'pass123',
  'param1', 'ABC',
  'param2', 'XYZ',
  'param3', null
]);
var uri = ConnectionUtils.composeUri(options, 'tcp', 9092);

from pip_services3_commons.config import ConfigParams
from pip_services3_components.connect import ConnectionUtils

options = ConfigParams.from_tuples(
    "host", "broker1,broker2",
    "port", ",8082",
    "username", "user",
    "password", "pass123",
    "param1", "ABC",
    "param2", "XYZ",
    "param3", None
)

uri = ConnectionUtils.compose_uri(options, "tcp", 9092)
Not available

Which returns

figure 0
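To make the role of the two defaults concrete, here is a minimal sketch in plain Python that works on an ordinary dict instead of a ConfigParams object. It is not the library's implementation: the function name compose_uri_sketch is hypothetical, query parameters are left out, and it assumes that hosts and ports are comma-separated lists matched by position, with an empty port entry falling back to the default port.

```python
from typing import Dict, Optional


def compose_uri_sketch(options: Dict[str, Optional[str]],
                       default_protocol: str,
                       default_port: int) -> str:
    """Hypothetical, simplified model of URI composition (not the library code)."""
    # Use the configured protocol, or the default when none is given.
    protocol = options.get("protocol") or default_protocol

    hosts = (options.get("host") or "").split(",")
    ports = (options.get("port") or "").split(",")

    servers = []
    for index, host in enumerate(hosts):
        # Assumption: a missing or empty port entry means "use the default port".
        has_port = index < len(ports) and ports[index] != ""
        port = ports[index] if has_port else str(default_port)
        servers.append(f"{host}:{port}")

    # Credentials are optional and placed before the server list.
    credentials = ""
    username = options.get("username")
    if username:
        credentials = username
        if options.get("password"):
            credentials += ":" + str(options["password"])
        credentials += "@"

    return f"{protocol}://{credentials}{','.join(servers)}"


uri = compose_uri_sketch(
    {"host": "broker1,broker2", "port": ",8082",
     "username": "user", "password": "pass123"},
    "tcp",
    9092,
)
print(uri)
```

Under these assumptions, the first broker receives the default port because its port entry is empty, while the second keeps the port given in the options.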

Utility # 2: concat

The concat() method merges the information from two ConfigParams objects into one. It also accepts an optional set of parameter names; if specified, these determine which parameters are merged.

Its syntax is

concat(options1: ConfigParams, options2: ConfigParams, *keys: str)

The example below shows how to merge the username and password only.

let configA = ConfigParams.fromTuples(
    "host", "broker1,broker2",
    "port", "8082",
    "username", "user2",
    "password", "pass123",
    "param1", "ABC",
    "param2", "XYZ",
    "param3", "p3A"
);
let configB = ConfigParams.fromTuples(
    "host", "broker3,broke42",
    "port", "8083",
    "username", "user2",
    "password", "pass1234",
    "param1", "ABCD",
    "param2", "WXYZ",
    "param3", "p3B"
);

let config = ConnectionUtils.concat(configA, configB, "username", "password");

var configA = ConfigParams.FromTuples(
    "host", "broker1,broker2",
    "port", "8082",
    "username", "user2",
    "password", "pass123",
    "param1", "ABC",
    "param2", "XYZ",
    "param3", "p3A"
);

var configB = ConfigParams.FromTuples(
    "host", "broker3,broke42",
    "port", "8083",
    "username", "user2",
    "password", "pass1234",
    "param1", "ABCD",
    "param2", "WXYZ",
    "param3", "p3B"
);

var config = ConnectionUtils.Concat(configA, configB, "username", "password");
configA := cconf.NewConfigParamsFromTuples(
	"host", "broker1,broker2",
	"port", "8082",
	"username", "user2",
	"password", "pass123",
	"param1", "ABC",
	"param2", "XYZ",
	"param3", "p3A",
)

configB := cconf.NewConfigParamsFromTuples(
	"host", "broker3,broke42",
	"port", "8083",
	"username", "user2",
	"password", "pass1234",
	"param1", "ABCD",
	"param2", "WXYZ",
	"param3", "p3B",
)

config := ccon.ConnectionUtils.Concat(configA, configB, "username", "password")
var configA = ConfigParams.fromTuples([
  'host', 'broker1,broker2',
  'port', '8082',
  'username', 'user2',
  'password', 'pass123',
  'param1', 'ABC',
  'param2', 'XYZ',
  'param3', 'p3A'
]);

var configB = ConfigParams.fromTuples([
  'host', 'broker3,broke42',
  'port', '8083',
  'username', 'user2',
  'password', 'pass1234',
  'param1', 'ABCD',
  'param2', 'WXYZ',
  'param3', 'p3B'
]);
  
var config = ConnectionUtils.concat(configA, configB, ['username', 'password']);

configA = ConfigParams.from_tuples(
    "host", "broker1,broker2",
    "port", "8082",
    "username", "user2",
    "password", "pass123",
    "param1", "ABC",
    "param2", "XYZ",
    "param3", "p3A"
)

configB = ConfigParams.from_tuples(
    "host", "broker3,broke42",
    "port", "8083",
    "username", "user2",
    "password", "pass1234",
    "param1", "ABCD",
    "param2", "WXYZ",
    "param3", "p3B"
)

config = ConnectionUtils.concat(configA, configB, 'username', 'password')
Not available

Where the merged object has the following structure:

figure 1
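The effect of the optional key list can be sketched with plain dicts. This is only an illustrative model, not the library's code: the name concat_sketch is hypothetical, and it assumes that "merging" a key present in both inputs means joining the two values with a comma, while keys outside the list keep the value from the first object.

```python
from typing import Dict


def concat_sketch(options1: Dict[str, str],
                  options2: Dict[str, str],
                  *keys: str) -> Dict[str, str]:
    """Hypothetical model of a selective merge (not the library code)."""
    result = dict(options1)
    for key, value2 in options2.items():
        value1 = options1.get(key, "")
        if value1 and value2:
            # Assumption: only the listed keys are merged; with no keys
            # given, every key present in both objects is merged.
            if not keys or key in keys:
                result[key] = value1 + "," + value2
        elif value2:
            # Keys that exist only in the second object are copied over.
            result[key] = value2
    return result


config_a = {"host": "broker1", "username": "user1", "password": "pass123"}
config_b = {"host": "broker3", "username": "user2", "password": "pass1234"}

merged = concat_sketch(config_a, config_b, "username", "password")
print(merged)
```

In this model the host stays as it was in the first object, because it is not in the list of keys to merge.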

Utility # 3: exclude

The exclude() utility creates a new ConfigParams object that includes all the fields from a given one except for those specified as input parameters.

Its syntax is

exclude(options: ConfigParams, *keys: str)

In the following example, we create a new ConfigParams object containing all the same fields as the original one with the exception of username and password:

let config = ConfigParams.fromTuples(
    "host", "broker1,broker2",
    "port", ",8082",
    "username", "user",
    "password", "pass123",
    "param1", "ABC",
    "param2", "XYZ",
    "param3", null
);

let config2 = ConnectionUtils.exclude(config, "username", "password");

var config = ConfigParams.FromTuples(
    "host", "broker1,broker2",
    "port", ",8082",
    "username", "user",
    "password", "pass123",
    "param1", "ABC",
    "param2", "XYZ",
    "param3", null
);

var config2 = ConnectionUtils.Exclude(config, "username", "password");

config := cconf.NewConfigParamsFromTuples(
	"host", "broker1,broker2",
	"port", ",8082",
	"username", "user",
	"password", "pass123",
	"param1", "ABC",
	"param2", "XYZ",
	"param3", nil,
)

config2 := ccon.ConnectionUtils.Exclude(config, "username", "password")

var config = ConfigParams.fromTuples([
  'host', 'broker1,broker2',
  'port', ',8082',
  'username', 'user',
  'password', 'pass123',
  'param1', 'ABC',
  'param2', 'XYZ',
  'param3', null  
]);

var config2 = ConnectionUtils.exclude(config, ['username', 'password']);

config = ConfigParams.from_tuples(
    "host", "broker1,broker2",
    "port", ",8082",
    "username", "user",
    "password", "pass123",
    "param1", "ABC",
    "param2", "XYZ",
    "param3", None
)

config2 = ConnectionUtils.exclude(config, 'username', 'password')
Not available

And, after running the above code, we obtain the following result:

figure 2
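Conceptually, exclude() is a filter over the keys of the configuration. The following minimal sketch shows that idea with a plain dict; exclude_sketch is a hypothetical stand-in, not the library function, and a dict is used in place of ConfigParams.

```python
from typing import Dict, Optional


def exclude_sketch(options: Dict[str, Optional[str]],
                   *keys: str) -> Dict[str, Optional[str]]:
    """Hypothetical model: copy every entry whose key is NOT listed."""
    return {key: value for key, value in options.items() if key not in keys}


config = {
    "host": "broker1,broker2",
    "port": ",8082",
    "username": "user",
    "password": "pass123",
    "param1": "ABC",
    "param2": "XYZ",
    "param3": None,
}

config2 = exclude_sketch(config, "username", "password")
print(sorted(config2))
```

A typical use of this kind of filter is to strip credentials before a configuration is logged or passed to a component that does not need them. Note that the original dict is left unchanged.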

Utility # 4: include

The include() method creates a new ConfigParams object based on a given one and a set of fields to be included.

The syntax is

include(options: ConfigParams, *keys: str)

For example, in the code below, a new object is created from an original one which includes the username and password only.

let config = ConfigParams.fromTuples(
    "host", "broker1,broker2",
    "port", ",8082",
    "username", "user",
    "password", "pass123",
    "param1", "ABC",
    "param2", "XYZ",
    "param3", null
);

let config2 = ConnectionUtils.include(config, "username", "password");

var config = ConfigParams.FromTuples(
    "host", "broker1,broker2",
    "port", ",8082",
    "username", "user",
    "password", "pass123",
    "param1", "ABC",
    "param2", "XYZ",
    "param3", null
);

var config2 = ConnectionUtils.Include(config, "username", "password");

config := cconf.NewConfigParamsFromTuples(
	"host", "broker1,broker2",
	"port", ",8082",
	"username", "user",
	"password", "pass123",
	"param1", "ABC",
	"param2", "XYZ",
	"param3", nil,
)

config2 := ccon.ConnectionUtils.Include(config, "username", "password")

var config = ConfigParams.fromTuples([
  'host', 'broker1,broker2',
  'port', ',8082',
  'username', 'user',
  'password', 'pass123',
  'param1', 'ABC',
  'param2', 'XYZ',
  'param3', null  
]);

var config2 = ConnectionUtils.include(config, ['username', 'password']);

config = ConfigParams.from_tuples(
    "host", "broker1,broker2",
    "port", ",8082",
    "username", "user",
    "password", "pass123",
    "param1", "ABC",
    "param2", "XYZ",
    "param3", None
)

config2 = ConnectionUtils.include(config, 'username', 'password')
Not available

With the resulting object containing the following fields and values:

figure 3
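Since include() is the complement of exclude(), the two together split a configuration into two disjoint parts. The sketch below illustrates this with plain dicts; include_sketch is a hypothetical helper written for this illustration and does not reproduce the library code.

```python
from typing import Dict


def include_sketch(options: Dict[str, str], *keys: str) -> Dict[str, str]:
    """Hypothetical model: copy only the entries whose key IS listed."""
    return {key: value for key, value in options.items() if key in keys}


config = {
    "host": "broker1,broker2",
    "port": ",8082",
    "username": "user",
    "password": "pass123",
    "param1": "ABC",
}

# Keep only the credentials...
credentials = include_sketch(config, "username", "password")
# ...and, for comparison, everything except the credentials.
rest = {key: value for key, value in config.items() if key not in credentials}

# Putting both parts back together gives the original configuration.
recombined = {**rest, **credentials}
print(credentials)
```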

Utility # 5: parseUri

The parseUri() method obtains information from a URI string and stores it in a ConfigParams object. It accepts default values for protocol and port, which are stored in the ConfigParams object when this information cannot be obtained from the URI string.

Its syntax is

parseUri(uri: str, default_protocol: str, default_port: int)

The following example shows how this utility works:

let config = ConnectionUtils.parseUri("tcp://user:pass123@broker1:8082,broker2:8082?param1=ABC&param2=XYZ", "tcp", 9092);
var config = ConnectionUtils.ParseUri("tcp://user:pass123@broker1:8082,broker2:8082?param1=ABC&param2=XYZ", "tcp", 9092);
config := ccon.ConnectionUtils.ParseUri("tcp://user:pass123@broker1:8082,broker2:8082?param1=ABC&param2=XYZ", "tcp", 9092)
var config = ConnectionUtils.parseUri('tcp://user:pass123@broker1:8082,broker2:8082?param1=ABC&param2=XYZ', 'tcp', 9092);
config = ConnectionUtils.parse_uri("tcp://user:pass123@broker1:8082,broker2:8082?param1=ABC&param2=XYZ", "tcp", 9092)
Not available

figure 4

And, this example illustrates how to use the default values:

let config = ConnectionUtils.parseUri("user:pass123@broker1,broker2?param1=ABC&param2=XYZ", "tcp", 9092);
var config = ConnectionUtils.ParseUri("user:pass123@broker1,broker2?param1=ABC&param2=XYZ", "tcp", 9092);
config := ccon.ConnectionUtils.ParseUri("user:pass123@broker1,broker2?param1=ABC&param2=XYZ", "tcp", 9092)
var config = ConnectionUtils.parseUri('user:pass123@broker1,broker2?param1=ABC&param2=XYZ', 'tcp', 9092);
config = ConnectionUtils.parse_uri("user:pass123@broker1,broker2?param1=ABC&param2=XYZ", "tcp", 9092)
Not available

figure 5
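The way the defaults fill in missing pieces can be illustrated with a small hand-written parser. This is a sketch under stated assumptions, not the library's implementation: parse_uri_sketch is a hypothetical name, the result is a plain dict, and the key layout (comma-separated "host" and "port" entries) is an assumption made for this illustration.

```python
from typing import Dict


def parse_uri_sketch(uri: str, default_protocol: str,
                     default_port: int) -> Dict[str, str]:
    """Hypothetical, simplified model of URI parsing (not the library code)."""
    result: Dict[str, str] = {}

    # Protocol: taken from the URI when present, otherwise the default.
    if "://" in uri:
        protocol, rest = uri.split("://", 1)
    else:
        protocol, rest = default_protocol, uri
    result["protocol"] = protocol

    # Query string: everything after the first '?'.
    rest, _, query = rest.partition("?")

    # Credentials: everything before the last '@', if there is one.
    userinfo, at_sign, servers = rest.rpartition("@")
    if at_sign:
        username, _, password = userinfo.partition(":")
        result["username"] = username
        if password:
            result["password"] = password

    # Servers: a comma-separated list of host[:port] entries.
    hosts, ports = [], []
    for server in servers.split(","):
        host, _, port = server.partition(":")
        hosts.append(host)
        # A server without an explicit port gets the default port.
        ports.append(port or str(default_port))
    result["host"] = ",".join(hosts)
    result["port"] = ",".join(ports)

    # Remaining query parameters are stored under their own names.
    for pair in filter(None, query.split("&")):
        key, _, value = pair.partition("=")
        result[key] = value

    return result


full = parse_uri_sketch(
    "tcp://user:pass123@broker1:8082,broker2:8082?param1=ABC&param2=XYZ",
    "tcp", 9092)
partial = parse_uri_sketch(
    "user:pass123@broker1,broker2?param1=ABC&param2=XYZ", "tcp", 9092)
print(full)
print(partial)
```

In the second call the URI carries neither a protocol nor ports, so in this model both are taken from the default arguments.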

Practical example

The example below shows a practical application of two of these utilities, namely parseUri() and exclude(). In it, we create a MongoDB connector that uses these methods during configuration to handle two cases: a connection passed as a URI string, and access to the database without authentication. When the connection is opened, composeUri() is used to build the connection string.

import { ConfigParams, ConnectionException, IConfigurable, IOpenable } from "pip-services3-commons-nodex";
import { ConnectionUtils } from "pip-services3-components-nodex";

export async function main() {
    let options = ConfigParams.fromTuples(
        "host", "localhost",
        "port", ",27017",
        "username", "user",
        "password", "pass123",
        "protocol", "mongodb",
        "collection", "my_db_name"
    );

    // Create connection
    let conn = new MongoDbConnector();
    conn.configure(options);
    await conn.open(null);
}

export class MongoDbConnector implements IOpenable, IConfigurable {

    // The MongoDB connection object.
    protected _connection: any;
    // The MongoDB database.
    protected _database: any;
    
    protected _databaseName: string;
    protected _config: ConfigParams;
    private _secureMongo: boolean;

    public constructor(secureMongo: boolean = false) {
        this._secureMongo = secureMongo;
    }

    public async getCollection(): Promise<any> {
        return await new Promise<any>((resolve, reject) => {
            this._database.collection('test', (err, collection) => {
                if (err == null) resolve(collection);
                else reject(err);
            });
        });
    }

    public configure(config: ConfigParams): void {
        this._config = config;

        // if connection passed as uri
        if (this._config.getAsNullableString("uri") != null)
            this._config = ConnectionUtils.parseUri(this._config.getAsString("uri"), "mongodb", 27017);

        // if mongo without auth
        if (!this._secureMongo)
            this._config = ConnectionUtils.exclude(this._config, "username", "password");

    }

    public isOpen(): boolean {
        return this._connection != null && this._database != null;
    }

    private composeSettings(): any {
        let authUser = this._config.getAsNullableString("auth_user");
        let authPassword = this._config.getAsNullableString("auth_password");

        let settings: any = {};

        if (authUser != null)
            settings['auth.user'] = authUser;
        if (authPassword != null)
            settings['auth.password'] = authPassword;

        settings.useNewUrlParser = true;
        settings.useUnifiedTopology = true;

        return settings;
    }

    public async open(correlationId: string): Promise<void> {
        let collection = this._config.getAsNullableString("collection");

        this._config = ConnectionUtils.exclude(this._config, "collection");

        var uri = ConnectionUtils.composeUri(this._config, "mongodb", 27017);
        uri += "/" + collection;

        try {
            let settings = this.composeSettings();
            let MongoClient = require('mongodb').MongoClient;

            let client = await new Promise<any>((resolve, reject) => {
                MongoClient.connect(uri, settings, (err, client) => {
                    if (err == null) resolve(client);
                    else reject(err);
                });
            });

            this._connection = client;
            this._database = client.db();
            this._databaseName = this._database.databaseName;
        } catch (ex) {
            throw new ConnectionException(
                correlationId,
                "CONNECT_FAILED",
                "Connection to mongodb failed"
            ).withCause(ex);
        }
    }

    public async close(correlationId: string): Promise<void> {
        if (this._connection == null) {
            return;
        }

        await new Promise((resolve, reject) => {
            this._connection.close((err) => {
                if (err == null) resolve(null);
                else reject(err);
            });
        });

        this._connection = null;
        this._database = null;
        this._databaseName = null;
    }
}

using System.Threading.Tasks;
using MongoDB.Driver;
using PipServices3.Commons.Config;
using PipServices3.Commons.Run;
using PipServices3.Components.Connect;


namespace ExampleApp
{
    class Program
    {
        static void Main(string[] args)
        {

            var options = ConfigParams.FromTuples(
                "host", "localhost",
                "port", ",27017",
                "username", "user",
                "password", "pass123",
                "protocol", "mongodb",
                "collection", "my_db_name"
            );

            // Create connection
            var conn = new MongoDbConnector();
            conn.Configure(options);
            conn.OpenAsync(null).Wait();
        }
    }

    public class MongoDbConnector: IOpenable, IConfigurable
    {
        // The MongoDB connection object.
        protected MongoClient _connection;
        // The MongoDB database.
        protected IMongoDatabase _database;
        // The MongoDB database name.
        protected string _databaseName;

        protected ConfigParams _config;
        private bool secureMongo = false;
        
        public MongoDbConnector(bool secureMongo = false)
        {
            this.secureMongo = secureMongo;
        }

        public IMongoCollection<T> GetCollection<T>()
        {
            return _connection.GetDatabase(_databaseName).GetCollection<T>("test");
        }

        public void Configure(ConfigParams config)
        {
            _config = config;

            // if connection passed as uri
            if (_config.GetAsNullableString("uri") != null)
                _config = ConnectionUtils.ParseUri(_config.GetAsString("uri"), "mongodb", 27017);

            // if mongo without auth
            if (!secureMongo)
                _config = ConnectionUtils.Exclude(_config, "username", "password");
        }

        public bool IsOpen()
        {
            return _connection != null && _database != null;
        }

        public async Task OpenAsync(string correlationId)
        {
            var collection = _config.GetAsNullableString("collection");

            _config = ConnectionUtils.Exclude(_config, "collection");

            var uri = ConnectionUtils.ComposeUri(_config, "mongodb", 27017);
            uri += "/" + collection;

            _connection = new MongoClient(uri);
            _databaseName = MongoUrl.Create(uri).DatabaseName;
            _database = _connection.GetDatabase(_databaseName);

            await Task.Delay(0);
        }

        public async Task CloseAsync(string correlationId)
        {
            _connection = null;
            _database = null;
            _databaseName = null;

            await Task.Delay(0);
        }
    }
}

package main

import (
	"fmt"

	"context"

	ccon "github.com/pip-services3-gox/pip-services3-components-gox/connect"

	cconf "github.com/pip-services3-gox/pip-services3-commons-gox/config"
	cerror "github.com/pip-services3-gox/pip-services3-commons-gox/errors"
	mongodrv "go.mongodb.org/mongo-driver/mongo"
	mongoclopt "go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
)

func main() {
	options := cconf.NewConfigParamsFromTuples(
		"host", "localhost",
		"port", ",27017",
		"username", "user",
		"password", "pass123",
		"protocol", "mongodb",
		"collection", "my_db_name",
	)

	ctx := context.Background()

	conn := NewMongoDbConnector(false)
	conn.Configure(ctx, options)
	err := conn.Open(ctx, "123")

	fmt.Println(err)
}

type MongoDbConnector struct {
	//   The configuration options.
	Config *cconf.ConfigParams
	//   The MongoDB connection object.
	Connection *mongodrv.Client
	//   The MongoDB database name.
	DatabaseName string
	//   The MongoDb database object.
	Db *mongodrv.Database

	secureMongo bool
}

// NewMongoDbConnector creates a new instance of the connection component.
// Returns *MongoDbConnector with default config
func NewMongoDbConnector(secureMongo bool) *MongoDbConnector {
	c := MongoDbConnector{
		// The configuration options.
		secureMongo: secureMongo,
		Config:      cconf.NewEmptyConfigParams(),
	}
	return &c
}

func (c *MongoDbConnector) GetCollection() *mongodrv.Collection {
	return c.Db.Collection("test")
}

func (c *MongoDbConnector) Configure(ctx context.Context, config *cconf.ConfigParams) {
	c.Config = config

	// if connection passed as uri
	if uri := c.Config.GetAsString("uri"); uri != "" {
		c.Config = ccon.ConnectionUtils.ParseUri(uri, "mongodb", 27017)
	}

	// if mongo without auth
	if !c.secureMongo {
		c.Config = ccon.ConnectionUtils.Exclude(c.Config, "username", "password")
	}
}

func (c *MongoDbConnector) IsOpen() bool {
	return c.Connection != nil
}

func (c *MongoDbConnector) composeSettings(settings *mongoclopt.ClientOptions) {
	authUser := c.Config.GetAsString("auth_user")
	authPassword := c.Config.GetAsString("auth_password")

	// Auth params
	if authUser != "" && authPassword != "" {
		authParams := mongoclopt.Credential{
			Username: authUser,
			Password: authPassword,
		}
		settings.SetAuth(authParams)
	}
}

func (c *MongoDbConnector) Open(ctx context.Context, correlationId string) error {
	collection := c.Config.GetAsNullableString("collection")

	c.Config = ccon.ConnectionUtils.Exclude(c.Config, "collection")

	uri := ccon.ConnectionUtils.ComposeUri(c.Config, "mongodb", 27017)
	uri += "/" + *collection

	settings := mongoclopt.Client()
	settings.ApplyURI(uri)
	c.composeSettings(settings)

	client, err := mongodrv.NewClient(settings)

	if err != nil {
		err = cerror.NewConnectionError(correlationId, "CONNECT_FAILED", "Create client for mongodb failed").WithCause(err)
		return err
	}
	cs, _ := connstring.Parse(uri)
	c.DatabaseName = cs.Database

	err = client.Connect(ctx)
	if err != nil {
		err = cerror.NewConnectionError(correlationId, "CONNECT_FAILED", "Connection to mongodb failed").WithCause(err)
		return err
	}
	c.Connection = client
	c.Db = client.Database(c.DatabaseName)
	return nil
}

func (c *MongoDbConnector) Close(ctx context.Context, correlationId string) error {
	if c.Connection == nil {
		return nil
	}

	err := c.Connection.Disconnect(ctx)
	c.Connection = nil
	c.Db = nil
	c.DatabaseName = ""

	if err != nil {
		err = cerror.NewConnectionError(correlationId, "DISCONNECT_FAILED", "Disconnect from mongodb failed: ").WithCause(err)
	}

	return err
}

import 'package:mongo_dart/mongo_dart.dart' as mongo;

import 'package:pip_services3_commons/pip_services3_commons.dart';
import 'package:pip_services3_components/pip_services3_components.dart';

void main(List<String> argument) async {
  var options = ConfigParams.fromTuples([
    'host',
    'localhost',
    'port',
    ',27017',
    'username',
    'user',
    'password',
    'pass123',
    'protocol',
    'mongodb',
    'collection',
    'my_db_name'
  ]);

  // Create connection
  var conn = MongoDbConnector();
  conn.configure(options);
  await conn.open(null);
  await conn.close(null);
}

class MongoDbConnector implements IConfigurable, IOpenable {
  // The MongoDB database.
  mongo.Db? _database;

  bool _secureMongo;
  ConfigParams? _config;
  String? databaseName;

  MongoDbConnector({bool secureMongo = false}) : _secureMongo = secureMongo;

  @override
  void configure(ConfigParams config) {
    _config = config;

    // if connection passed as uri
    if (_config!.getAsNullableString('uri') != null) {
      _config = ConnectionUtils.parseUri(
          _config!.getAsString('uri'), 'mongodb', 27017);
    }

    // if mongo without auth
    if (!_secureMongo) {
      _config = ConnectionUtils.exclude(_config!, ['username', 'password']);
    }
  }

  mongo.DbCollection getCollection() {
    return _database!.collection('test');
  }

  @override
  bool isOpen() {
    return _database != null;
  }

  Map<String, dynamic> _composeSettings() {
    var authUser = _config!.getAsNullableString('auth_user');
    var authPassword = _config!.getAsNullableString('auth_password');

    var settings = <String, dynamic>{
      'useNewUrlParser': true,
      'useUnifiedTopology': true
    };

    if (authUser != null) {
      settings['user'] = authUser;
    }
    if (authPassword != null) {
      settings['password'] = authPassword;
    }

    return settings;
  }

  @override
  Future open(String? correlationId) async {
    var collection = _config!.getAsString('collection');

    _config = ConnectionUtils.exclude(_config!, ['collection']);
    var uri = ConnectionUtils.composeUri(_config!, 'mongodb', 27017);
    uri += '/' + collection;

    try {
      var settings = _composeSettings();

      _database = mongo.Db(uri);
      await _database!.open();
      if (settings['user'] != null) {
        await _database!
            .authenticate(settings['user'], settings['password']);
      }

      databaseName = _database?.databaseName;
    } catch (ex) {
      throw ConnectionException(
              correlationId, 'CONNECT_FAILED', 'Connection to mongodb failed')
          .withCause(ex);
    }
  }

  @override
  Future close(String? correlationId) async {
    if (_database == null) {
      return null;
    }
    try {
      await _database!.close();
      _database = null;
      databaseName = null;
    } catch (err) {
      throw ConnectionException(correlationId, 'DISCONNECT_FAILED',
              'Disconnect from mongodb failed: ')
          .withCause(err);
    }
  }
}
from typing import Optional

import pymongo
from pip_services3_commons.config import IConfigurable, ConfigParams

from pip_services3_commons.run import IOpenable
from pymongo.collection import Collection

from pip_services3_components.connect import ConnectionUtils


class MongoDbConnector(IOpenable, IConfigurable):
    def __init__(self, secure_mongo=False):
        # The MongoDB connection object.
        self._connection: pymongo.MongoClient = None

        self._secure_mongo = secure_mongo
        self._config = ConfigParams()

    def is_open(self) -> bool:
        return self._connection is not None

    def get_collection(self) -> Collection:
        return self._connection.get_database().get_collection('test')

    def configure(self, config: ConfigParams):
        self._config = config

        # if connection passed as uri
        if self._config.get_as_nullable_string('uri'):
            self._config = ConnectionUtils.parse_uri(self._config.get_as_string('uri'), 'mongodb', 27017)

        # if mongo without auth
        if not self._secure_mongo:
            self._config = ConnectionUtils.exclude(self._config, 'username', 'password')

    def open(self, correlation_id: Optional[str]):
        collection = self._config.get_as_nullable_string('collection')
        self._config = ConnectionUtils.exclude(self._config, 'collection')

        uri = ConnectionUtils.compose_uri(self._config, 'mongodb', 27017)
        uri += '/' + collection

        self._connection = pymongo.MongoClient(uri)

    def close(self, correlation_id: Optional[str]):
        # Nothing to do if the connection was never opened.
        if self._connection is None:
            return

        self._connection.close()
        self._connection = None


options = ConfigParams.from_tuples(
    "host", "localhost",
    "port", ",27017",
    "username", "user",
    "password", "pass123",
    "protocol", "mongodb",
    'collection', 'my_db_name'
)
# Create connection
conn = MongoDbConnector()
conn.configure(options)
conn.open(None)

Not available

Wrapping up

In this tutorial, we have seen how to use a set of utilities offered in the Components module that simplify the handling of connections.

We learned that these utilities help us to compose a URI from a set of parameters stored in a ConfigParams object, merge two sets of configuration parameters, filter a ConfigParams object, and decompose a URI string.

In the end, we saw an example that illustrated the practical use of some of these utilities.