import { ConfigParams, ICleanable } from "pip-services3-commons-nodex";
import { KafkaMessageQueue } from "pip-services3-kafka-nodex";
import { IMessageQueue, IMessageReceiver, MessageEnvelope } from "pip-services3-messaging-nodex";

class MyMessageReceiver implements IMessageReceiver, ICleanable {
    private _messages: MessageEnvelope[] = [];

    constructor() { }

    public get messages(): MessageEnvelope[] {
        return this._messages;
    }

    public get messageCount(): number {
        return this._messages.length;
    }

    public async receiveMessage(envelope: MessageEnvelope, queue: IMessageQueue): Promise<void> {
        this._messages.push(envelope);
    }

    public async clear(correlationId: string): Promise<void> {
        this._messages = [];
    }
}
                      
export async function main() {

    let queue = new KafkaMessageQueue();
    queue.configure(ConfigParams.fromTuples(
        "topic", "mytopic2",
        'connection.protocol', 'tcp',
        "connection.host", "localhost",
        "connection.port", 9092,
        "options.autosubscribe", true
    ));

    await queue.open(null);

    let messageReceiver = new MyMessageReceiver();
    await queue.beginListen(null, messageReceiver);

    let envelope1 = new MessageEnvelope('123', 'Test', 'Test message');
    await queue.send(null, envelope1);

    // await message
    for (let i = 0; i < 15; i++) {
        if (messageReceiver.messages.length > 0) {
            break;
        }
        await new Promise<void>((resolve, reject) => {
            setTimeout(resolve, 500);
        });
    }

    let envelope2 = messageReceiver.messages[0];
    console.log(envelope1.getMessageAsString() == envelope2.getMessageAsString());

    await queue.endListen(null);
    await queue.close(null);
}