
MQTT connection using the default driver.

Implements: IMessageQueueConnection, IReferenceable, IConfigurable, IOpenable


The MqttConnection class allows you to create MQTT connections using the default driver.

Configuration parameters

  • client_id: (optional) client identifier
  • connection(s):
    • discovery_key: (optional) key to retrieve the connection from IDiscovery
    • host: host name or IP address
    • port: port number
    • uri: resource URI or connection string with all parameters in it
  • credential(s):
    • store_key: (optional) key to retrieve the credentials from ICredentialStore
    • username: username
    • password: user’s password
  • options:
    • retry_connect: (optional) turns on/off automated reconnect when connection is lost (default: true)
    • connect_timeout: (optional) number of milliseconds to wait for connection (default: 30000)
    • reconnect_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 1000)
    • keepalive_timeout: (optional) number of milliseconds to ping broker while inactive (default: 3000)
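
As a rough illustration of how the options above combine with their defaults, the sketch below uses a plain dict as a stand-in for ConfigParams. The helper name `resolve_options` and the flat dotted keys are assumptions made for this example, not part of the library.

```python
from typing import Any, Dict

# Defaults copied from the option list above (timeouts in milliseconds).
DEFAULT_OPTIONS: Dict[str, Any] = {
    "retry_connect": True,
    "connect_timeout": 30000,
    "reconnect_timeout": 1000,
}


def resolve_options(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return the 'options.*' entries of a flat config, falling back to defaults.

    Hypothetical helper: a plain dict stands in for ConfigParams here.
    """
    resolved = dict(DEFAULT_OPTIONS)
    prefix = "options."
    for key, value in config.items():
        if key.startswith(prefix):
            resolved[key[len(prefix):]] = value
    return resolved


# Example configuration: only connect_timeout is overridden.
config = {
    "connection.host": "localhost",
    "connection.port": 1883,
    "options.connect_timeout": 5000,
}
options = resolve_options(config)
print(options)
```

Any option that is not supplied keeps its default, so a configuration only needs to list the values it changes.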


References

  • *:logger:*:*:1.0 - (optional) ILogger components to pass log messages
  • *:counters:*:*:1.0 - (optional) ICounters components to pass collected measurements
  • *:discovery:*:*:1.0 - (optional) IDiscovery services
  • *:credential-store:*:*:1.0 - (optional) credential stores to resolve credentials
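
The locators above are five-part descriptors (group:type:kind:name:version) in which `*` stands for any value. A minimal sketch of that matching rule follows; the function `matches` and the sample descriptors are illustrative assumptions, not the library's own implementation.

```python
def matches(pattern: str, descriptor: str) -> bool:
    """Check a five-part descriptor against a locator pattern.

    A '*' in the pattern accepts any value in the same position.
    Hypothetical helper written for illustration only.
    """
    pattern_fields = pattern.split(":")
    descriptor_fields = descriptor.split(":")
    if len(pattern_fields) != 5 or len(descriptor_fields) != 5:
        return False
    return all(p == "*" or p == d for p, d in zip(pattern_fields, descriptor_fields))


# Sample descriptors, chosen only to exercise the rule.
logger_found = matches("*:logger:*:*:1.0", "pip-services:logger:console:default:1.0")
counters_found = matches("*:counters:*:*:1.0", "pip-services:logger:console:default:1.0")
print(logger_found, counters_found)
```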


Creates a new instance of the connection component.




The hostname as client id

_client_id: str


Connection timeout

_connect_timeout: int = 30000


MQTT client object

_connection: mqtt.Client


Connection resolver

_connection_resolver: MqttConnectionResolver


Keep alive timeout

_keep_alive_timeout: int = 60000



Logger

_logger: CompositeLogger = CompositeLogger()


Connection options

_options: ConfigParams = ConfigParams()


Reconnect timeout

_reconnect_timeout: int = 1000


Retry option

_retry_connect: bool = True


Topic subscriptions

_subscriptions: List[MqttSubscription] = []

Instance methods


Checks if the connection is open.
Raises an error if the connection is closed.



Closes a component and frees used resources.

close(context: Optional[IContext])

  • context: IContext - (optional) a context to trace execution through a call chain.


Configures the component by passing its configuration parameters.

configure(config: ConfigParams)

  • config: ConfigParams - configuration parameters to be set.


Creates a message queue. If the connection doesn’t support this function, it exits without error.

create_queue(name: str)

  • name: str - name of the queue to be created.


Deletes a message queue. If the connection doesn’t support this function, it exits without error.

delete_queue(name: str)

  • name: str - name of the queue to be deleted.


Gets the connection.

getConnection(): Any

  • returns: Any - the underlying MQTT connection object


Checks if the component is open.

is_open(): bool

  • returns: bool - True if the component is open and False otherwise.


Opens the component.

open(context: Optional[IContext])

  • context: IContext - (optional) a context to trace execution through a call chain.
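
The open, is_open and close methods, together with the open-state check described earlier, form a simple lifecycle. The sketch below mimics that lifecycle with a hypothetical stand-in class; `SketchConnection`, its `check_open` method and the use of `RuntimeError` are assumptions for illustration, and no broker is contacted.

```python
from typing import Optional


class SketchConnection:
    """Hypothetical stand-in that mimics the open/close lifecycle."""

    def __init__(self) -> None:
        self._connection: Optional[object] = None

    def is_open(self) -> bool:
        return self._connection is not None

    def open(self, context: Optional[object] = None) -> None:
        if self.is_open():
            return  # opening twice is treated as a no-op in this sketch
        self._connection = object()  # placeholder for a real client handle

    def close(self, context: Optional[object] = None) -> None:
        self._connection = None  # release the placeholder handle

    def check_open(self) -> None:
        # RuntimeError is an assumption; the library may raise its own exception type.
        if not self.is_open():
            raise RuntimeError("Connection is not opened")


connection = SketchConnection()
connection.open(None)
connection.check_open()  # passes silently while the connection is open
state_while_open = connection.is_open()
connection.close(None)
state_after_close = connection.is_open()
print(state_while_open, state_after_close)
```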


Publishes a message to a specified topic.

publish(topic: str, data: Any, options: dict)

  • topic: str - topic name
  • data: Any - message to be published
  • options: dict - publishing options
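
The contents of the options dict are not listed here. As one possible reading, the sketch below assumes the keys `qos` and `retain`, borrowed from standard MQTT terminology, and normalizes them before a publish call; the helper `normalize_publish_options` is hypothetical.

```python
from typing import Any, Dict, Optional, Tuple


def normalize_publish_options(options: Optional[Dict[str, Any]]) -> Tuple[int, bool]:
    """Extract (qos, retain) from an options dict.

    Assumed keys: 'qos' and 'retain'. Missing values fall back to QoS 0
    and retain=False.
    """
    options = options or {}
    qos = int(options.get("qos", 0))
    if qos not in (0, 1, 2):  # MQTT defines exactly three QoS levels
        raise ValueError(f"Unsupported QoS level: {qos}")
    retain = bool(options.get("retain", False))
    return qos, retain


defaults = normalize_publish_options(None)
custom = normalize_publish_options({"qos": 1, "retain": True})
print(defaults, custom)
```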


Reads a list of registered queue names. If the connection doesn’t support this function, it returns an empty list.

read_queue_names(): List[str]

  • returns: List[str] - queue names.


Sets references to dependent components.

set_references(references: IReferences)

  • references: IReferences - references to locate the component dependencies.


Subscribes to a topic.

subscribe(topic: str, options: dict, listener: IMqttMessageListener)

  • topic: str - topic name
  • options: dict - subscription options
  • listener: IMqttMessageListener - message listener


Unsubscribes from a previously subscribed topic.

unsubscribe(topic: str, listener: IMqttMessageListener)

  • topic: str - topic name
  • listener: IMqttMessageListener - message listener
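
To make the subscribe/unsubscribe pairing concrete, here is a minimal in-memory sketch. It assumes a listener is a plain callable taking (topic, data), as a stand-in for IMqttMessageListener, and applies the standard MQTT wildcard rules (`+` for one level, `#` for all remaining levels). `SubscriptionRegistry` and `topic_matches` are hypothetical names, and whether the component dispatches this way internally is not confirmed by this page.

```python
from typing import Any, Callable, List, Tuple

# Hypothetical stand-in for IMqttMessageListener: a callable taking (topic, data).
Listener = Callable[[str, Any], None]


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT topic-filter match ('+' = one level, '#' = the rest)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard also covers the parent level
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    return len(filter_levels) == len(topic_levels)


class SubscriptionRegistry:
    """Hypothetical in-memory registry; it never talks to a broker."""

    def __init__(self) -> None:
        self._subscriptions: List[Tuple[str, Listener]] = []

    def subscribe(self, topic: str, listener: Listener) -> None:
        self._subscriptions.append((topic, listener))

    def unsubscribe(self, topic: str, listener: Listener) -> None:
        # Drop only the entry registered with this exact topic and listener.
        self._subscriptions = [
            (t, fn) for t, fn in self._subscriptions
            if not (t == topic and fn is listener)
        ]

    def dispatch(self, topic: str, data: Any) -> int:
        """Deliver data to every matching listener; return how many were called."""
        delivered = 0
        for topic_filter, listener in list(self._subscriptions):
            if topic_matches(topic_filter, topic):
                listener(topic, data)
                delivered += 1
        return delivered


received: List[Tuple[str, Any]] = []


def on_message(topic: str, data: Any) -> None:
    received.append((topic, data))


registry = SubscriptionRegistry()
registry.subscribe("sensors/+/temperature", on_message)
first = registry.dispatch("sensors/kitchen/temperature", 21.5)
registry.unsubscribe("sensors/+/temperature", on_message)
second = registry.dispatch("sensors/kitchen/temperature", 22.0)
print(first, second, received)
```

Unsubscribing requires the same topic and the same listener that were passed to subscribe, which is why both appear in the signature.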

See also