
Commit ba9dc2d

Update index safely between threads. Fixes fluent#160 and fluent#326.
As noted in an existing warning, as well as in fluent#326 and fluent#160, the process of determining the index appended to the default object key is not thread-safe. This commit adds thread-safety until version 2.x is out, where chunk_id is used instead of an index value. It is not a perfect implementation: races can still occur between different workers when fluentd's multi-worker mode is enabled, or when multiple fluentd instances upload to the same bucket. The aim is a short-term fix that stays backwards compatible.
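
For context, the heart of the bug and of the fix can be sketched in a few lines of Ruby (a minimal illustration, not plugin code; the key format string is illustrative):

    require 'concurrent'

    # Racy: `i += 1` is a read-modify-write, so two flush threads can read
    # the same i, format identical object keys, and overwrite each other.
    i = 0
    key = sprintf("%d.gz", i)
    i += 1

    # Atomic: the increment is a single indivisible step that returns the
    # new value, so concurrent callers always draw distinct indices.
    index = Concurrent::AtomicFixnum.new(-1)
    key = sprintf("%d.gz", index.increment)  # first caller gets 0, the next 1, ...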
Parent: f957b40

File tree

1 file changed: +16 -3 lines

lib/fluent/plugin/out_s3.rb

@@ -2,6 +2,7 @@
 require 'fluent/log-ext'
 require 'fluent/timezone'
 require 'aws-sdk-s3'
+require 'concurrent'
 require 'zlib'
 require 'time'
 require 'tempfile'
@@ -223,7 +224,7 @@ def configure(conf)
       # For backward compatibility
       # TODO: Remove time_slice_format when end of support compat_parameters
       @configured_time_slice_format = conf['time_slice_format']
-      @values_for_s3_object_chunk = {}
+      @values_for_s3_object_chunk = Concurrent::Hash.new
       @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
     end
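
Concurrent::Hash, provided by the concurrent-ruby gem behind the new require 'concurrent', is a Hash subclass whose individual reads and writes are thread-safe across Ruby runtimes. Compound operations such as ||= are still a separate read followed by a write, so this swap hardens per-key access rather than making the memoization fully atomic, in keeping with the commit's stated short-term scope. A small usage sketch (key and value are made up):

    require 'concurrent'

    values = Concurrent::Hash.new
    # Safe to touch from several flush threads: each [] and []= is thread-safe,
    # though ||= as a whole remains a non-atomic check-then-set.
    values[:chunk_a] ||= { "%{hex_random}" => "d41d8c" }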

@@ -251,6 +252,9 @@ def start
       @s3 = Aws::S3::Resource.new(client: s3_client)
       @bucket = @s3.bucket(@s3_bucket)
 
+      @index = Concurrent::AtomicFixnum.new(-1)
+      @time_slice = Concurrent::AtomicReference.new
+
       check_apikeys if @check_apikey_on_start
       ensure_bucket if @check_bucket
       ensure_bucket_lifecycle
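
The two primitives initialized here carry the semantics the write path relies on: AtomicFixnum#increment adds one and returns the new value in a single atomic step, while AtomicReference#get_and_set stores a new value and returns the previous one, which is how write can detect a time-slice change. A quick illustration (slice strings are hypothetical):

    require 'concurrent'

    index = Concurrent::AtomicFixnum.new(-1)
    index.increment                  # => 0  (returns the new value)
    index.increment                  # => 1

    slice = Concurrent::AtomicReference.new
    slice.get_and_set("20240101")    # => nil        (first slice ever seen)
    slice.get_and_set("20240101")    # => "20240101" (unchanged: no reset needed)
    slice.get_and_set("20240102")    # => "20240101" (differs: a new slice began)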
@@ -273,8 +277,18 @@ def write(chunk)
         @time_slice_with_tz.call(metadata.timekey)
       end
 
+      # If we set a new time slice, then reset our index.
+      # There is a small race here, where a new time slice can have an old index set.
+      # This shouldn't be a problem if @check_object is enabled but could cause overwrites
+      # otherwise, when the old index is reached on the new timeslice
+      if @time_slice.get_and_set(time_slice) != time_slice
+        @index.value = -1
+      end
+
       if @check_object
         begin
+          i = @index.increment
+
           @values_for_s3_object_chunk[chunk.unique_id] ||= {
             "%{hex_random}" => hex_random(chunk),
           }
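
To make the "small race" described in the new comment concrete: between one thread detecting the slice change via get_and_set and completing its @index.value = -1, another thread can already increment the counter and stamp a stale index onto the new slice. A hypothetical interleaving with made-up values:

    require 'concurrent'

    @time_slice = Concurrent::AtomicReference.new("20240101")
    @index      = Concurrent::AtomicFixnum.new(41)

    # Thread A, first write of the new slice:
    @time_slice.get_and_set("20240102")  # => "20240101", change detected

    # Thread B runs before A's reset lands:
    @time_slice.get_and_set("20240102")  # => "20240102", no change seen
    i_b = @index.increment               # => 42, an old index on the new slice

    # Thread A completes its reset:
    @index.value = -1
    i_a = @index.increment               # => 0

With @check_object enabled, the exists? loop below skips such collisions; without it, index 42 can eventually be reached again on this slice and the earlier object overwritten, exactly as the comment warns.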
@@ -284,7 +298,7 @@
           }
           values_for_s3_object_key_post = {
             "%{time_slice}" => time_slice,
-            "%{index}" => sprintf(@index_format,i),
+            "%{index}" => sprintf(@index_format, i),
           }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
           values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled
@@ -302,7 +316,6 @@
             end
           end
 
-          i += 1
           previous_path = s3path
         end while @bucket.object(s3path).exists?
       else
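
The deleted i += 1 at the bottom of the retry loop is superseded by the @index.increment at the top of it, so every attempt, by any thread, draws a fresh globally unique index instead of bumping a per-call local. The resulting loop shape, heavily simplified (object_exists? and the key formatting are stand-ins for the real code elided from this hunk):

    require 'concurrent'

    @index = Concurrent::AtomicFixnum.new(-1)

    def object_exists?(path)   # stand-in for @bucket.object(path).exists?
      false
    end

    begin
      i = @index.increment              # atomic: a fresh index per attempt
      s3path = format("logs/%d.gz", i)  # stand-in for the real key formatting
    end while object_exists?(s3path)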
