Skip to content

feat: add operation timeout support #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
== 7.0.0-beta.4 2025-02-24

Improvements:
* Added support for per-operation timeout.
* Added global setting for the default per-operation timeout.
* FFMPEG::Status objects now keep track of the duration of operations.

Fixes:
* No progress reported for audio-only transcoding operations.

== 7.0.0-beta.3 2025-02-10

Improvements:
Expand Down
40 changes: 22 additions & 18 deletions lib/ffmpeg.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ module FFMPEG

class << self
attr_writer :logger, :reporters
attr_accessor :timeout

# Get the FFMPEG logger.
#
Expand All @@ -59,6 +60,11 @@ def logger
@logger ||= Logger.new($stdout, level: Logger::INFO)
end

# Get the reporters that are used by default to parse the output of the ffmpeg command.
def reporters
@reporters ||= [FFMPEG::Reporters::Progress]
end

# Get the timeout that's used when waiting for ffmpeg output.
# Defaults to 30 seconds.
#
Expand All @@ -85,11 +91,6 @@ def io_encoding=(encoding)
FFMPEG::IO.encoding = encoding
end

# Get the reporters that are used by default to parse the output of the ffmpeg command.
def reporters
@reporters ||= [FFMPEG::Reporters::Progress]
end

# Set the path to the ffmpeg binary.
#
# @param path [String]
Expand Down Expand Up @@ -143,26 +144,29 @@ def ffmpeg_popen3(*args, &)
# @param reporters [Array<FFMPEG::Reporters::Output>] The reporters to use to parse the output.
# @yield [report] Reports from the ffmpeg command (see FFMPEG::Reporters).
# @return [FFMPEG::Status]
def ffmpeg_execute(*args, status: nil, reporters: nil)
def ffmpeg_execute(*args, status: nil, reporters: nil, timeout: nil)
status ||= FFMPEG::Status.new
reporters ||= self.reporters
timeout ||= self.timeout

status.bind!(
status.bind! do
ffmpeg_popen3(*args) do |_stdin, stdout, stderr, wait_thr|
stderr.each(chomp: true) do |line|
reporter = reporters.find { |r| r.match?(line) }
status.puts(line) if reporter.nil? || reporter.log?
Timeout.timeout(timeout) do
stderr.each(chomp: true) do |line|
reporter = reporters.find { |r| r.match?(line) }
status.output.puts(line) if reporter.nil? || reporter.log?

next unless reporter && block_given?
next unless reporter && block_given?

yield reporter.new(line)
end
yield reporter.new(line)
end

::IO.copy_stream(stdout, status.output) if status.empty?
::IO.copy_stream(stdout, status.output) if status.output.string.empty?

wait_thr.value
wait_thr.value
end
end
)
end
end

# Execute a ffmpeg command and raise an error
Expand All @@ -172,8 +176,8 @@ def ffmpeg_execute(*args, status: nil, reporters: nil)
# @param reporters [Array<FFMPEG::Reporters::Output>] The reporters to use to parse the output.
# @yield [report] Reports from the ffmpeg command (see FFMPEG::Reporters).
# @return [FFMPEG::Status]
def ffmpeg_execute!(*args, status: nil, reporters: nil)
ffmpeg_execute(*args, status:, reporters:).assert!
def ffmpeg_execute!(*args, status: nil, reporters: nil, timeout: nil)
ffmpeg_execute(*args, status:, reporters:, timeout:).assert!
end

# Get the path to the ffprobe binary.
Expand Down
8 changes: 4 additions & 4 deletions lib/ffmpeg/media.rb
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,8 @@ def local?
# @param inargs [Array<String>] The arguments to pass before the input.
# @yield [report] Reports from the ffmpeg command (see FFMPEG::Reporters).
# @return [Process::Status]
def ffmpeg_execute(*args, inargs: [], status: nil, reporters: nil, &block)
FFMPEG.ffmpeg_execute(*inargs, '-i', path, *args, status:, reporters:, &block)
def ffmpeg_execute(*args, inargs: [], status: nil, reporters: nil, timeout: nil, &block)
FFMPEG.ffmpeg_execute(*inargs, '-i', path, *args, status:, reporters:, timeout:, &block)
end

# Execute a ffmpeg command with the media as input
Expand All @@ -506,8 +506,8 @@ def ffmpeg_execute(*args, inargs: [], status: nil, reporters: nil, &block)
# @param inargs [Array<String>] The arguments to pass before the input.
# @yield [report] Reports from the ffmpeg command (see FFMPEG::Reporters).
# @return [Process::Status]
def ffmpeg_execute!(*args, inargs: [], status: nil, reporters: nil, &block)
ffmpeg_execute(*args, inargs:, status:, reporters:, &block).assert!
def ffmpeg_execute!(*args, inargs: [], status: nil, reporters: nil, timeout: nil, &block)
ffmpeg_execute(*args, inargs:, status:, reporters:, timeout:, &block).assert!
end
end
end
15 changes: 13 additions & 2 deletions lib/ffmpeg/preset.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,19 @@ def args(media)
# @param output_path [String, Pathname] The path to the output file.
# @yield The block to execute when progress is made.
# @return [FFMPEG::Transcoder::Status] The status of the transcoding process.
def transcode(media, output_path, &)
FFMPEG::Transcoder.new(presets: [self]).process(media, output_path, &)
def transcode(media, output_path, timeout: nil, &)
FFMPEG::Transcoder.new(presets: [self], timeout:).process(media, output_path, &)
end

# Transcode the media to the output path and raise an error
# if the process did not finish successfully.
#
# @param media [Media] The media to transcode.
# @param output_path [String, Pathname] The path to the output file.
# @yield The block to execute when progress is made.
# @return [FFMPEG::Transcoder::Status] The status of the transcoding process.
def transcode!(media, output_path, timeout: nil, &)
transcode(media, output_path, timeout:, &).assert!
end
end
end
2 changes: 1 addition & 1 deletion lib/ffmpeg/reporters/progress.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Progress < Output
def self.log? = false

def self.match?(line)
line.match?(/^\s*frame=/)
line.match?(/^\s*(?:size|time|frame)=/)
end

# Returns the current frame number.
Expand Down
30 changes: 14 additions & 16 deletions lib/ffmpeg/status.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,34 @@ def initialize(message, output)
end
end

attr_reader :output, :upstream
attr_reader :duration, :output, :upstream

def initialize
@mutex = Mutex.new
@output = StringIO.new
end

def puts(*args)
output.puts(*args)
end

# Returns true if the output is empty.
def empty?
output.string.empty?
end

# Raises an error if the subprocess did not finish successfully.
def assert!
return self if success?

message = output.string.match(/\b(?:error|invalid|failed|could not)\b.+$/i)
message = @output.string.match(/\b(?:error|invalid|failed|could not)\b.+$/i)
message ||= 'FFmpeg exited with non-zero exit status'

raise ExitError.new("#{message} (code: #{exitstatus})", output.string)
raise ExitError.new("#{message} (code: #{exitstatus})", @output.string)
end

# Binds the status to an upstream Process::Status object.
def bind!(upstream)
@upstream = upstream

freeze
def bind!
@mutex.synchronize do
t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@upstream = yield
t1 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@duration = t1 - t0
@output.close_write

freeze
end
end

private
Expand Down
6 changes: 4 additions & 2 deletions lib/ffmpeg/transcoder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ def media(*ffprobe_args, load: true, autoload: true)
end
end

attr_reader :name, :metadata, :presets, :reporters
attr_reader :name, :metadata, :presets, :reporters, :timeout

def initialize(name: nil, metadata: nil, presets: [], reporters: nil, &compose_inargs)
def initialize(name: nil, metadata: nil, presets: [], reporters: nil, timeout: nil, &compose_inargs)
@name = name
@metadata = metadata
@presets = presets
@reporters = reporters
@timeout = timeout
@compose_inargs = compose_inargs
end

Expand Down Expand Up @@ -86,6 +87,7 @@ def process(media, output_path, &)
*args,
inargs:,
reporters:,
timeout:,
status: Status.new(output_paths),
&
)
Expand Down
2 changes: 1 addition & 1 deletion lib/ffmpeg/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module FFMPEG
VERSION = '7.0.0-beta.3'
VERSION = '7.0.0-beta.4'
end
8 changes: 5 additions & 3 deletions spec/ffmpeg/media_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -620,13 +620,15 @@ module FFMPEG
it 'calls assert! on the result of ffmpeg_execute' do
inargs = [SecureRandom.hex]
args = [SecureRandom.hex]
reporters = [SecureRandom.hex]
status = instance_double(Status)
reporters = [SecureRandom.hex]
timeout = rand(999)
block = proc {}

expect(status).to receive(:assert!).and_return(status)
expect(subject).to receive(:ffmpeg_execute).with(*args, inargs:, status:, reporters:, &block).and_return(status)
expect(subject.ffmpeg_execute!(*args, inargs:, status:, reporters:, &block)).to be(status)
expect(subject).to receive(:ffmpeg_execute).with(*args, inargs:, status:, reporters:, timeout:,
&block).and_return(status)
expect(subject.ffmpeg_execute!(*args, inargs:, status:, reporters:, timeout:, &block)).to be(status)
end
end
end
Expand Down
8 changes: 5 additions & 3 deletions spec/ffmpeg/reporters/progress_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ module Reporters
end

describe '.match?' do
context 'when the line starts with a frame number' do
context 'when the line starts with size, time or frame' do
it 'returns true' do
expect(Progress.match?('size=1')).to be(true)
expect(Progress.match?('time=1')).to be(true)
expect(Progress.match?('frame=1')).to be(true)
end
end

context 'when the line does not start with a frame number' do
context 'when the line does not start with size, time or frame' do
it 'returns false' do
expect(Progress.match?('size=1')).to be(false)
expect(Progress.match?('foo=1')).to be(false)
end
end
end
Expand Down
41 changes: 28 additions & 13 deletions spec/ffmpeg/status_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@

module FFMPEG
describe Status do
let(:upstream) { double('upstream') }
let(:output) { StringIO.new }
let(:upstream) { instance_double(Process::Status) }

subject { described_class.new }

before do
allow(StringIO).to receive(:new).and_return(output)
end

describe '#assert!' do
before do
subject.bind!(upstream)
subject.bind! { upstream }
end

context 'when the process was successful' do
Expand All @@ -25,12 +30,13 @@ module FFMPEG
before do
allow(upstream).to receive(:success?).and_return(false)
allow(upstream).to receive(:exitstatus).and_return(999)

subject.output.puts('Copyright (c) 2000-2024 the FFmpeg developers')
subject.output.puts('Press [q] to stop, [?] for help')
subject.output.puts('[vf#0:0 @ 0x000000000000] Error reinitializing filters!')
subject.output.puts('frame= 0 fps=0.0 q=0.0 Lsize= 0KiB time=N/A bitrate=N/A speed=N/A')
subject.output.puts('Conversion failed!')
allow(output).to receive(:string).and_return([
'Copyright (c) 2000-2024 the FFmpeg developers',
'Press [q] to stop, [?] for help',
'[vf#0:0 @ 0x000000000000] Error reinitializing filters!',
'frame= 0 fps=0.0 q=0.0 Lsize= 0KiB time=N/A bitrate=N/A speed=N/A',
'Conversion failed!'
].join("\n"))
end

it 'raises an error' do
Expand All @@ -40,20 +46,29 @@ module FFMPEG
end

describe '#bind!' do
before { subject.bind!(upstream) }
before do
subject.bind! do
sleep(0.1)
upstream
end
end

it 'freezes the object' do
expect(subject).to be_frozen
expect { subject.bind!('foo') }.to raise_error(FrozenError)
expect { subject.bind! { 'foo' } }.to raise_error(FrozenError)
end

it 'measures the duration of the block' do
expect(subject.duration).to be >= 0.1
end
end

describe '#method_missing' do
before { subject.bind!(upstream) }
before { subject.bind! { upstream } }

it 'delegates to the upstream object' do
expect(upstream).to receive(:foo).and_return('bar')
expect(subject.foo).to eq('bar')
expect(upstream).to receive(:exitstatus).and_return(999)
expect(subject.exitstatus).to eq(999)
end
end
end
Expand Down
22 changes: 18 additions & 4 deletions spec/ffmpeg_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,31 @@

context 'when ffmpeg hangs' do
before do
FFMPEG::IO.timeout = 0.5
FFMPEG.ffmpeg_binary = fixture_file('bin/ffmpeg-hanging')
end

after do
FFMPEG::IO.remove_instance_variable(:@timeout)
FFMPEG.ffmpeg_binary = nil
end

it 'raises IO::TimeoutError' do
expect { described_class.ffmpeg_execute(*args) }.to raise_error(IO::TimeoutError)
context 'with IO timeout set' do
before do
FFMPEG::IO.timeout = 0.5
end

after do
FFMPEG::IO.remove_instance_variable(:@timeout)
end

it 'raises IO::TimeoutError' do
expect { described_class.ffmpeg_execute(*args) }.to raise_error(IO::TimeoutError)
end
end

context 'with operation timeout set' do
it 'raises Timeout::Error' do
expect { described_class.ffmpeg_execute(*args, timeout: 0.5) }.to raise_error(Timeout::Error)
end
end
end
end
Expand Down