Use Set instead of Array for Condition @waiting queue to avoid memory leak #186


Closed

Conversation

@oeoeaio oeoeaio commented Oct 27, 2022

Related to #176

We found that calling dequeue on instances of Queue (and LimitedQueue) inside a Task#with_timeout block, inside a loop, causes fibers to accumulate in the Queue's @waiting list when no other fiber signals the Queue, leading to a memory leak.

Our short-term fix was to manually signal the Queue whenever the timeout was reached, but this only works when a single fiber is waiting on the queue with a timeout in a loop.

Switching to a Set instead of an Array to hold the list of waiting fibers resolves the core issue. I don't think this will break anything, as I can't see a use case for having the same fiber in the @waiting list multiple times.

Keen for input on what tests might be appropriate for this, if any. I suppose the main thing is that this change does not break the existing test suite.
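To illustrate the mechanism, here is a minimal model (hypothetical; not the actual Async::Condition internals) of the waiting list when the same fiber re-waits after every timeout without ever being signalled. With an Array, each stale entry is retained; with a Set, re-adding the same fiber is a no-op:

```ruby
require 'set'

array_waiting = []
set_waiting = Set.new

fiber = :consumer_fiber # stands in for the single waiting fiber

# Simulate 10,000 dequeue-with-timeout iterations: each wait appends the
# fiber to the waiting list, and the timeout path never removes it.
10_000.times do
  array_waiting << fiber   # Array accumulates stale entries
  set_waiting.add(fiber)   # Set deduplicates; size stays at 1
end

puts array_waiting.size # => 10000
puts set_waiting.size   # => 1
```

This is the growth pattern the profiling output below shows as retained memory at condition.rb:37.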

Types of Changes

  • Bug fix.

Contribution

Profiling code

#!/usr/bin/env ruby

require 'async'
require 'async/queue'
require 'memory_profiler'

def produce(input_channel)
  Async do
    input_channel.enqueue('some infrequent message')
  end
end

def consume(input_channel)
  Async do |task|
    buffer = []
    report = MemoryProfiler.report do
      10000.times do
        # This timeout is artificially tight to demonstrate the memory leak
        task.with_timeout(0.001) do
          buffer << input_channel.dequeue
        rescue Async::TimeoutError
          # timeout reached; no message was dequeued
        end
        # do something with the buffer
      end
    end
    report.pretty_print
  end
end

Async do
  input_channel = Async::Queue.new
  produce(input_channel)
  consume(input_channel)
end

Profiling before this change

retained memory by location

399960  /usr/local/bundle/gems/async-2.2.1/lib/async/condition.rb:37
   584  /usr/local/bundle/gems/async-2.2.1/lib/async/scheduler.rb:73
   160  /usr/local/bundle/gems/timers-4.3.5/lib/timers/timer.rb:25
    88  /usr/local/bundle/gems/timers-4.3.5/lib/timers/group.rb:46
    40  /usr/local/bundle/gems/async-2.2.1/lib/async/scheduler.rb:273
    40  /usr/local/bundle/gems/timers-4.3.5/lib/timers/events.rb:62

retained memory by class

399960  Async::Condition::Queue
   584  Thread::Backtrace
   160  Proc
    88  Timers::Timer
    40  Async::TimeoutError
    40  Timers::Events::Handle

Profiling after this change

retained memory by location

   584  /usr/local/bundle/gems/async-2.2.1/lib/async/scheduler.rb:73
   384  /usr/local/lib/ruby/3.1.0/set.rb:522
   192  /usr/local/lib/ruby/3.1.0/set.rb:540
   160  /usr/local/bundle/gems/timers-4.3.5/lib/timers/timer.rb:25
    88  /usr/local/bundle/gems/timers-4.3.5/lib/timers/group.rb:46
    40  /usr/local/bundle/gems/async-2.2.1/lib/async/scheduler.rb:273
    40  /usr/local/bundle/gems/timers-4.3.5/lib/timers/events.rb:62

retained memory by class

   584  Thread::Backtrace
   576  Hash
   160  Proc
    88  Timers::Timer
    40  Async::TimeoutError
    40  Timers::Events::Handle

@ioquatix
Member

ioquatix commented Oct 27, 2022

The ideal solution for this is to use a linked list. Short of a native implementation, we could fall back to using a native Ruby mutex/queue. It might even be faster, but the semantics aren’t quite the same. A Ruby mutex uses a linked list internally and my goal was to preserve FIFO order. I like your PR but I think we might need to be a little more sensitive to order and I’m not sure about performance either.

Do you mind opening another PR using Thread::Queue as the queue? Let’s see if it works. In Ruby 3+ Thread::Queue supports non-blocking fibers.
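A minimal sketch of why Thread::Queue is attractive here (plain threads for a self-contained example; the names are illustrative): it manages its waiters internally and preserves FIFO order, so there is no hand-maintained @waiting list to leak, and on Ruby 3+ Thread::Queue#pop also cooperates with non-blocking fibers via the fiber scheduler.

```ruby
queue = Thread::Queue.new

producer = Thread.new do
  3.times { |i| queue.push("message #{i}") }
  queue.close # signals consumers that no more messages are coming
end

messages = []
# pop blocks until a message arrives; returns nil once the queue is
# closed and drained, ending the loop.
while (msg = queue.pop)
  messages << msg
end
producer.join

p messages # => ["message 0", "message 1", "message 2"]
```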

@ioquatix ioquatix mentioned this pull request Oct 29, 2022