to_kafka throughput #398

@roveo

I'm testing the to_kafka sink, and its throughput is limited by polltime (0.2 s). It looks like self.producer.poll(0) only serves one message at a time, so only one delivery callback runs every 0.2 seconds.
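If that hypothesis holds, throughput is capped at one message per polltime. At 0.2 s per callback, 100 messages need roughly 100 × 0.2 s = 20 s, well beyond a 5 s wait, so only about 25 messages would be delivered in time. The sketch below is a hypothetical model of that behaviour, not streamz or confluent_kafka code: `time_to_deliver` and its `per_poll` parameter are made up for illustration, and the timings are scaled down so it runs quickly.

```python
import asyncio
import time


async def time_to_deliver(n_messages: int, polltime: float, per_poll: int) -> float:
    """Seconds until all n_messages delivery callbacks have run.

    Hypothetical model (not streamz/confluent_kafka code): a poll loop
    wakes every `polltime` seconds and serves at most `per_poll`
    pending delivery callbacks on each wake-up. per_poll must be > 0.
    """
    pending = n_messages
    start = time.perf_counter()
    while pending > 0:
        # One poll cycle: serve up to per_poll pending callbacks.
        pending -= min(per_poll, pending)
        if pending > 0:
            # Nothing else is delivered until the next poll cycle.
            await asyncio.sleep(polltime)
    return time.perf_counter() - start


if __name__ == "__main__":
    # Scaled-down numbers (0.02 s instead of 0.2 s) so the demo is quick.
    one_per_poll = asyncio.run(time_to_deliver(10, 0.02, per_poll=1))
    all_per_poll = asyncio.run(time_to_deliver(10, 0.02, per_poll=10))
    print(f"one per poll: {one_per_poll:.2f}s, all per poll: {all_per_poll:.2f}s")
```

With one callback per cycle the total time grows linearly with the number of messages ((n - 1) × polltime); if every pending callback were served on each poll, the whole batch would complete in a single cycle.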

This fails:

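from streamz import Stream
from streamz.utils_test import wait_for
# kafka_service: the context-manager helper used by the existing Kafka tests
# (streamz/tests/test_kafka.py); it yields (kafka, topic_name)

def test_to_kafka_throughput():
    ARGS = {'bootstrap.servers': 'localhost:9092'}
    with kafka_service() as kafka:
        _, TOPIC = kafka
        source = Stream.from_iterable(range(100)).map(lambda x: str(x).encode())
        # to_kafka emits each message downstream once its delivery callback fires
        sink = source.to_kafka(TOPIC, ARGS)
        out = sink.sink_to_list()

        source.start()
        # All 100 deliveries must arrive within 5 seconds
        wait_for(
            lambda: len(out) == 100,
            5,
            period=0.1,
            fail_func=lambda: print("len(out) ==", len(out))
        )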

The existing test_to_kafka test doesn't catch this because it only starts waiting for results after all the items have been emitted.

I spent some time tinkering with the code but couldn't figure out what's wrong or how to fix it, so any ideas are appreciated.