WriteApi should use ThreadPoolScheduler for batching #561

Closed
@goznauk

Description

When a WriteApi instance is created in batching mode, window_with_time_or_count is called, which uses TimeoutScheduler by default. This creates a new thread for every flush interval.

    def __init__(self,
                 influxdb_client,
                 write_options: WriteOptions = WriteOptions(),
                 point_settings: PointSettings = PointSettings(),
                 **kwargs) -> None:
   
        ...

        if self._write_options.write_type is WriteType.batching:
            # Define Subject that listen incoming data and produces writes into InfluxDB
            self._subject = Subject()

            self._disposable = self._subject.pipe(
                # Split incoming data to windows by batch_size or flush_interval
                ops.window_with_time_or_count(count=write_options.batch_size,
                                              timespan=timedelta(milliseconds=write_options.flush_interval)),
                ...
  

Instead of letting TimeoutScheduler create millions of threads, we can use ThreadPoolScheduler(1) to handle the windowing. TimeoutScheduler might not make sense to use in Python. This can easily be changed with a single line of code:

    ops.window_with_time_or_count(count=write_options.batch_size,
                                  timespan=timedelta(milliseconds=write_options.flush_interval),
                                  scheduler=ThreadPoolScheduler(1)),
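
To illustrate the difference in thread usage, here is a minimal standard-library sketch. It does not use the Rx schedulers themselves: it assumes that a timer-based scheduler behaves like one `threading.Timer` per scheduled tick, and that `ThreadPoolScheduler(1)` behaves like a single-worker `ThreadPoolExecutor`. The function names `threads_used_by_timers` and `threads_used_by_pool` are hypothetical and exist only for this comparison.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def threads_used_by_timers(ticks: int, interval: float = 0.01) -> set:
    """Run `ticks` callbacks, each on its own threading.Timer.

    Stand-in (assumption) for a scheduler that starts a timer thread
    per scheduled action.
    """
    names = set()

    def tick():
        names.add(threading.current_thread().name)

    for _ in range(ticks):
        timer = threading.Timer(interval, tick)
        timer.start()
        timer.join()  # wait for this tick to finish before starting the next
    return names


def threads_used_by_pool(ticks: int) -> set:
    """Run `ticks` callbacks on a single reusable worker thread.

    Stand-in (assumption) for a thread pool of size 1.
    """
    names = set()

    def tick():
        names.add(threading.current_thread().name)

    with ThreadPoolExecutor(max_workers=1) as pool:
        for _ in range(ticks):
            pool.submit(tick).result()  # block until the tick has run
    return names


if __name__ == "__main__":
    print("timer threads:", len(threads_used_by_timers(5)))
    print("pool threads:", len(threads_used_by_pool(5)))
```

In this sketch every timer tick runs on a freshly created thread, while the pool reuses one worker for all ticks. A long-running writer with a short flush interval would therefore start a very large number of threads over its lifetime under the first pattern.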
         
