Skip to content

Commit c07b367

Browse files
RUBY-3680 Close pipe fds
1 parent e741df4 commit c07b367

File tree

6 files changed

+97
-3
lines changed

6 files changed

+97
-3
lines changed

lib/mongo/server/connection_pool.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -671,6 +671,7 @@ def close(options = nil)
671671

672672
@max_connecting_cv.broadcast
673673
@size_cv.broadcast
674+
@generation_manager.close_all_pipes
674675
end
675676

676677
publish_cmap_event(

lib/mongo/server/connection_pool/generation_manager.rb

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,15 @@ def generation_unlocked(service_id: nil)
4747
end
4848

4949
def pipe_fds(service_id: nil)
50-
@pipe_fds[service_id][@map[service_id]]
50+
@pipe_fds.dig(service_id, @map[service_id])
5151
end
5252

5353
def remove_pipe_fds(generation, service_id: nil)
5454
validate_service_id!(service_id)
5555

5656
r, w = @pipe_fds[service_id].delete(generation)
57+
return unless r && w
58+
5759
w.close
5860
# Schedule the read end of the pipe to be closed. We cannot close it
5961
# immediately since we need to wait for any Kernel#select calls to
@@ -89,8 +91,31 @@ def bump(service_id: nil)
8991
end
9092
end
9193

94+
# Close all pipes in the generation manager.
95+
#
96+
# This method should be called only when the +ConnectionPool+ that
97+
# owns this +GenerationManager+ is closed, to ensure that all
98+
# pipes are closed properly.
99+
def close_all_pipes
100+
@lock.synchronize do
101+
close_all_scheduled
102+
@pipe_fds.keys.each do |service_id|
103+
generations = @pipe_fds.delete(service_id)
104+
generations.values.each do |(r, w)|
105+
r.close
106+
w.close
107+
rescue IOError
108+
# Ignore any IOError that occurs when closing the
109+
# pipe, as there is nothing we can do about it.
110+
end
111+
end
112+
end
113+
end
114+
115+
92116
private
93117

118+
94119
def validate_service_id!(service_id)
95120
if service_id
96121
unless server.load_balancer?

lib/mongo/socket.rb

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ class Socket
6464
# connection (for non-monitoring connections) that created this socket.
6565
# @option options [ true | false ] :monitor Whether this socket was
6666
# created by a monitoring connection.
67+
# @option options :pipe [ IO ] The file descriptor for the read end of the
68+
# pipe to listen on during the select system call when reading from the
69+
# socket.
6770
#
6871
# @api private
6972
def initialize(timeout, options)
@@ -106,6 +109,13 @@ def monitor?
106109
!!options[:monitor]
107110
end
108111

112+
# @return [ IO ] The file descriptor for the read end of the pipe to
113+
# listen on during the select system call when reading from the
114+
# socket.
115+
def pipe
116+
options[:pipe]
117+
end
118+
109119
# @return [ String ] Human-readable summary of the socket for debugging.
110120
#
111121
# @api private
@@ -161,7 +171,7 @@ def close
161171
begin
162172
# Sometimes it seems the close call can hang for a long time
163173
::Timeout.timeout(5) do
164-
@socket.close
174+
@socket&.close
165175
end
166176
rescue
167177
# Silence all errors
@@ -390,7 +400,6 @@ def read_from_socket(length, socket_timeout: nil, csot: false)
390400
raise_timeout_error!("Took more than #{_timeout} seconds to receive data", csot)
391401
end
392402
end
393-
pipe = options[:pipe]
394403
if exc.is_a?(IO::WaitReadable)
395404
if pipe
396405
select_args = [[@socket, pipe], nil, [@socket, pipe], select_timeout]
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# frozen_string_literal: true
2+
3+
require 'spec_helper'
4+
5+
describe Mongo::Server::ConnectionPool::GenerationManager do
6+
describe '#close_all_pipes' do
7+
let(:service_id) { 'test_service_id' }
8+
9+
let(:server) { instance_double('Mongo::Server') }
10+
11+
let(:manager) { described_class.new(server: server) }
12+
13+
before do
14+
manager.pipe_fds(service_id: service_id)
15+
end
16+
17+
it 'closes all pipes' do
18+
expect(manager.pipe_fds(service_id: service_id).size).to eq(2)
19+
20+
manager.instance_variable_get(:@pipe_fds)[service_id].each do |gen, (r, w)|
21+
expect(r).to receive(:close)
22+
expect(w).to receive(:close)
23+
end
24+
25+
manager.close_all_pipes
26+
end
27+
28+
it 'removes all pipes from the map' do
29+
expect(manager.pipe_fds(service_id: service_id).size).to eq(2)
30+
31+
manager.instance_variable_get(:@pipe_fds)[service_id].each do |gen, (r, w)|
32+
expect(r).to receive(:close)
33+
expect(w).to receive(:close)
34+
end
35+
36+
manager.close_all_pipes
37+
end
38+
end
39+
end

spec/mongo/server/connection_pool_spec.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1215,6 +1215,11 @@ def create_pool(min_pool_size)
12151215
expect(pool).to be_closed
12161216
end
12171217
end
1218+
1219+
it 'closes all pipes' do
1220+
expect(pool.generation_manager).to receive(:close_all_pipes).and_call_original
1221+
pool.close
1222+
end
12181223
end
12191224

12201225
describe '#inspect' do

spec/mongo/socket_spec.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,4 +167,19 @@
167167
end
168168
end
169169
end
170+
171+
describe '#close' do
172+
let(:pipe) { instance_double(IO) }
173+
174+
let(:socket) do
175+
described_class.new(0, {pipe: pipe})
176+
end
177+
178+
context 'when pipe is provided' do
179+
it 'closes the pipe' do
180+
expect(pipe).to receive(:close)
181+
socket.close
182+
end
183+
end
184+
end
170185
end

0 commit comments

Comments
 (0)