diff --git a/CHANGELOG.md b/CHANGELOG.md index e8dcf55..0035de1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,4 +2,6 @@ - Initial commit of... - `FutureOr String isExecutable(path)`. - - `ExitCode`. + - `ExitCode` + - `ProcessManager` and `Spawn` + - `sharedStdIn` and `SharedStdIn` diff --git a/README.md b/README.md index ac5833f..088d31d 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ Contains utilities for the Dart VM's `dart:io`. +[![Build Status](https://travis-ci.org/dart-lang/io.svg?branch=master)](https://travis-ci.org/dart-lang/io) + ## Usage ### Files @@ -15,4 +17,58 @@ operating system. #### `ExitCode` -An enum-type class that contains known exit codes. \ No newline at end of file +An enum-type class that contains known exit codes. + +#### `ProcessManager` + +A higher-level service for spawning and communicating with processes. + +##### Use `spawn` to create a process with std[in|out|err] forwarded by default + +```dart +/// Runs `dartfmt` commands and `pub publish`. +Future main() async { + final manager = new ProcessManager(); + + // Runs dartfmt --version and outputs the result via stdout. + print('Running dartfmt --version'); + var spawn = await manager.spawn('dartfmt', arguments: ['--version']); + await spawn.exitCode; + + // Runs dartfmt -n . and outputs the result via stdout. + print('Running dartfmt -n .'); + spawn = await manager.spawn('dartfmt', arguments: ['-n', '.']); + await spawn.exitCode; + + // Runs pub publish. Upon hitting a blocking stdin state, you may directly + // output to the processes's stdin via your own, similar to how a bash or + // shell script would spawn a process. + print('Running pub publish'); + spawn = await manager.spawn('pub', arguments: ['publish']); + await spawn.exitCode; + + // Closes stdin for the entire program. + await sharedStdIn.terminate(); +} +``` + +#### `sharedStdIn` + +A safer version of the default `stdin` stream from `dart:io` that allows a +subscriber to cancel their subscription, and then allows a _new_ subscriber to +start listening. This differs from the default behavior where only a single +listener is ever allowed in the application lifecycle: + +```dart +test('should allow multiple subscribers', () async { + final logs = []; + final asUtf8 = sharedStdIn.transform(UTF8.decoder); + // Wait for input for the user. + logs.add(await asUtf8.first); + // Wait for more input for the user. + logs.add(await asUtf8.first); + expect(logs, ['Hello World', 'Goodbye World']); +}); +``` + +For testing, an instance of `SharedStdIn` may be created directly. diff --git a/example/spawn_process.dart b/example/spawn_process.dart new file mode 100644 index 0000000..d865138 --- /dev/null +++ b/example/spawn_process.dart @@ -0,0 +1,32 @@ +// Copyright 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:io/io.dart'; + +/// Runs `dartfmt` commands and `pub publish`. +Future main() async { + final manager = new ProcessManager(); + + // Runs dartfmt --version and outputs the result via stdout. + print('Running dartfmt --version'); + var spawn = await manager.spawn('dartfmt', arguments: ['--version']); + await spawn.exitCode; + + // Runs dartfmt -n . and outputs the result via stdout. + print('Running dartfmt -n .'); + spawn = await manager.spawn('dartfmt', arguments: ['-n', '.']); + await spawn.exitCode; + + // Runs pub publish. Upon hitting a blocking stdin state, you may directly + // output to the processes's stdin via your own, similar to how a bash or + // shell script would spawn a process. + print('Running pub publish'); + spawn = await manager.spawn('pub', arguments: ['publish']); + await spawn.exitCode; + + // Closes stdin for the entire program. + await sharedStdIn.terminate(); +} diff --git a/lib/io.dart b/lib/io.dart index 552217c..3af40ae 100644 --- a/lib/io.dart +++ b/lib/io.dart @@ -4,3 +4,5 @@ export 'src/exit_codes.dart' show ExitCode; export 'src/permissions.dart' show isExecutable; +export 'src/process_manager.dart' show ProcessManager, Spawn; +export 'src/shared_stdin.dart' show SharedStdIn, sharedStdIn; diff --git a/lib/src/process_manager.dart b/lib/src/process_manager.dart new file mode 100644 index 0000000..a126335 --- /dev/null +++ b/lib/src/process_manager.dart @@ -0,0 +1,162 @@ +// Copyright 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:io' as io; + +import 'package:meta/meta.dart'; + +import 'shared_stdin.dart'; + +/// A high-level abstraction around using and managing processes on the system. +abstract class ProcessManager { + /// Terminates the global `stdin` listener, making future listens impossible. + /// + /// This method should be invoked only at the _end_ of a program's execution. + static Future terminateStdIn() async { + await sharedStdIn.terminate(); + } + + /// Create a new instance of [ProcessManager] for the current platform. + /// + /// May manually specify whether the current platform [isWindows], otherwise + /// this is derived from the Dart runtime (i.e. [io.Platform.isWindows]). + factory ProcessManager({ + Stream> stdin, + io.IOSink stdout, + io.IOSink stderr, + bool isWindows, + }) { + stdin ??= sharedStdIn; + stdout ??= io.stdout; + stderr ??= io.stderr; + isWindows ??= io.Platform.isWindows; + if (isWindows) { + return new _WindowsProcessManager(stdin, stdout, stderr); + } + return new _UnixProcessManager(stdin, stdout, stderr); + } + + final Stream> _stdin; + final io.IOSink _stdout; + final io.IOSink _stderr; + + const ProcessManager._(this._stdin, this._stdout, this._stderr); + + /// Spawns a process by invoking [executable] with [arguments]. + /// + /// This is _similar_ to [io.Process.start], but all standard input and output + /// is forwarded/routed between the process and the host, similar to how a + /// shell script works. + /// + /// Returns a future that completes with a handle to the spawned process. + Future spawn( + String executable, { + Iterable arguments: const [], + }) async { + final process = io.Process.start(executable, arguments.toList()); + return new _ForwardingSpawn(await process, _stdin, _stdout, _stderr); + } +} + +/// A process instance created and managed through [ProcessManager]. +/// +/// Unlike one created directly by [io.Process.start] or [io.Process.run], a +/// spawned process works more like executing a command in a shell script. +class Spawn implements io.Process { + final io.Process _delegate; + + Spawn._(this._delegate) { + _delegate.exitCode.then((_) => _onClosed()); + } + + @mustCallSuper + @visibleForOverriding + void _onClosed() {} + + @override + bool kill([io.ProcessSignal signal = io.ProcessSignal.SIGTERM]) => + _delegate.kill(signal); + + @override + Future get exitCode => _delegate.exitCode; + + @override + int get pid => _delegate.pid; + + @override + Stream> get stderr => _delegate.stderr; + + @override + io.IOSink get stdin => _delegate.stdin; + + @override + Stream> get stdout => _delegate.stdout; +} + +/// Forwards `stdin`/`stdout`/`stderr` to/from the host. +class _ForwardingSpawn extends Spawn { + final StreamSubscription _stdInSub; + final StreamSubscription _stdOutSub; + final StreamSubscription _stdErrSub; + + factory _ForwardingSpawn( + io.Process delegate, + Stream> stdin, + io.IOSink stdout, + io.IOSink stderr, + ) { + final stdInSub = stdin.listen(delegate.stdin.add); + final stdOutSub = delegate.stdout.listen(stdout.add); + final stdErrSub = delegate.stderr.listen(stderr.add); + return new _ForwardingSpawn._delegate( + delegate, + stdInSub, + stdOutSub, + stdErrSub, + ); + } + + _ForwardingSpawn._delegate( + io.Process delegate, + this._stdInSub, + this._stdOutSub, + this._stdErrSub, + ) + : super._(delegate); + + @override + void _onClosed() { + _stdInSub.cancel(); + _stdOutSub.cancel(); + _stdErrSub.cancel(); + super._onClosed(); + } +} + +class _UnixProcessManager extends ProcessManager { + const _UnixProcessManager( + Stream> stdin, + io.IOSink stdout, + io.IOSink stderr, + ) + : super._( + stdin, + stdout, + stderr, + ); +} + +class _WindowsProcessManager extends ProcessManager { + const _WindowsProcessManager( + Stream> stdin, + io.IOSink stdout, + io.IOSink stderr, + ) + : super._( + stdin, + stdout, + stderr, + ); +} diff --git a/lib/src/shared_stdin.dart b/lib/src/shared_stdin.dart new file mode 100644 index 0000000..79ff97d --- /dev/null +++ b/lib/src/shared_stdin.dart @@ -0,0 +1,81 @@ +// Copyright 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:io'; + +import 'package:meta/meta.dart'; + +/// A shared singleton instance of `dart:io`'s [stdin] stream. +/// +/// _Unlike_ the normal [stdin] stream, [sharedStdIn] may switch subscribers +/// as long as the previous subscriber cancels before the new subscriber starts +/// listening. +/// +/// [SharedStdIn.terminate] *must* be invoked in order to close the underlying +/// connection to [stdin], allowing your program to close automatically without +/// hanging. +final SharedStdIn sharedStdIn = new SharedStdIn(stdin); + +/// A singleton wrapper around `stdin` that allows new subscribers. +/// +/// This class is visible in order to be used as a test harness for mock +/// implementations of `stdin`. In normal programs, [sharedStdIn] should be +/// used directly. +@visibleForTesting +class SharedStdIn extends Stream> { + StreamController> _current; + StreamSubscription> _sub; + + SharedStdIn([Stream> stream]) { + _sub = (stream ??= stdin).listen(_onInput); + } + + void _onInput(List event) => _getCurrent().add(event); + + StreamController> _getCurrent() { + if (_current == null) { + _current = new StreamController>( + onCancel: () { + _current = null; + }, + sync: true); + } + return _current; + } + + @override + StreamSubscription> listen( + void onData(List event), { + Function onError, + void onDone(), + bool cancelOnError, + }) { + if (_sub == null) { + throw new StateError('Stdin has already been terminated.'); + } + final controller = _getCurrent(); + if (controller.hasListener) { + throw new StateError('' + 'Subscriber already listening. The existing subscriber must cancel ' + 'before another may be added.'); + } + return controller.stream.listen( + onData, + onDone: onDone, + onError: onError, + cancelOnError: cancelOnError, + ); + } + + /// Terminates the connection to `stdin`, closing all subscription. + Future terminate() async { + if (_sub == null) { + throw new StateError('Stdin has already been terminated.'); + } + await _sub.cancel(); + await _current?.close(); + _sub = null; + } +} diff --git a/pubspec.yaml b/pubspec.yaml index 999e752..92293fb 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -9,4 +9,5 @@ environment: sdk: ">=1.22.0 <2.0.0" dev_dependencies: + path: ^1.0.0 test: ^0.12.0 diff --git a/test/_files/stderr_hello.dart b/test/_files/stderr_hello.dart new file mode 100644 index 0000000..d29f213 --- /dev/null +++ b/test/_files/stderr_hello.dart @@ -0,0 +1,7 @@ +// Copyright 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:io'; + +void main() => stderr.write('Hello'); diff --git a/test/_files/stdin_echo.dart b/test/_files/stdin_echo.dart new file mode 100644 index 0000000..96913e4 --- /dev/null +++ b/test/_files/stdin_echo.dart @@ -0,0 +1,7 @@ +// Copyright 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:io'; + +void main() => stdout.writeln('You said: ${stdin.readLineSync()}'); diff --git a/test/_files/stdout_hello.dart b/test/_files/stdout_hello.dart new file mode 100644 index 0000000..492a190 --- /dev/null +++ b/test/_files/stdout_hello.dart @@ -0,0 +1,7 @@ +// Copyright 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:io'; + +void main() => stdout.write('Hello'); diff --git a/test/permissions_test.dart b/test/permissions_test.dart index 51fbf6b..2874732 100644 --- a/test/permissions_test.dart +++ b/test/permissions_test.dart @@ -2,6 +2,7 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +@TestOn('vm') import 'package:io/io.dart'; import 'package:test/test.dart'; diff --git a/test/process_manager_test.dart b/test/process_manager_test.dart new file mode 100644 index 0000000..4ad49d7 --- /dev/null +++ b/test/process_manager_test.dart @@ -0,0 +1,72 @@ +// Copyright 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; + +// TODO: Change to io.dart once these features are published. +import 'package:io/io.dart' hide sharedStdIn; +import 'package:path/path.dart' as p; +import 'package:test/test.dart'; + +void main() { + StreamController fakeStdIn; + ProcessManager processManager; + SharedStdIn sharedStdIn; + List stdoutLog; + List stderrLog; + + group('spawn', () { + setUp(() async { + fakeStdIn = new StreamController(sync: true); + sharedStdIn = new SharedStdIn(fakeStdIn.stream.map((s) => s.codeUnits)); + stdoutLog = []; + stderrLog = []; + + final stdoutController = new StreamController>(sync: true); + stdoutController.stream.map(UTF8.decode).listen(stdoutLog.add); + final stdout = new IOSink(stdoutController); + final stderrController = new StreamController>(sync: true); + stderrController.stream.map(UTF8.decode).listen(stderrLog.add); + final stderr = new IOSink(stderrController); + + processManager = new ProcessManager( + stdin: sharedStdIn, + stdout: stdout, + stderr: stderr, + ); + }); + + test('should output Hello from another process [via stdout]', () async { + final spawn = await processManager.spawn( + 'dart', + arguments: [p.join('test', '_files', 'stdout_hello.dart')], + ); + await spawn.exitCode; + expect(stdoutLog, ['Hello']); + }); + + test('should output Hello from another process [via stderr]', () async { + final spawn = await processManager.spawn( + 'dart', + arguments: [p.join('test', '_files', 'stderr_hello.dart')], + ); + await spawn.exitCode; + expect(stderrLog, ['Hello']); + }); + + test('should forward stdin to another process', () async { + final spawn = await processManager.spawn( + 'dart', + arguments: [p.join('test', '_files', 'stdin_echo.dart')], + ); + spawn.stdin.writeln('Ping'); + await spawn.exitCode; + // TODO: https://github.com/dart-lang/sdk/issues/30119. + // expect(stdoutLog, ['You said: Ping', '\n']); + expect(stdoutLog.join(''), contains('You said: Ping')); + }); + }); +} diff --git a/test/shared_stdin_test.dart b/test/shared_stdin_test.dart new file mode 100644 index 0000000..722ec1f --- /dev/null +++ b/test/shared_stdin_test.dart @@ -0,0 +1,46 @@ +// Copyright 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:convert'; + +import 'package:io/io.dart' hide sharedStdIn; +import 'package:test/test.dart'; + +void main() { + StreamController fakeStdIn; + SharedStdIn sharedStdIn; + + setUp(() async { + fakeStdIn = new StreamController(sync: true); + sharedStdIn = new SharedStdIn(fakeStdIn.stream.map((s) => s.codeUnits)); + }); + + test('should allow a single subscriber', () async { + final logs = []; + final sub = sharedStdIn.transform(UTF8.decoder).listen(logs.add); + fakeStdIn.add('Hello World'); + await sub.cancel(); + expect(logs, ['Hello World']); + }); + + test('should allow multiple subscribers', () async { + final logs = []; + final asUtf8 = sharedStdIn.transform(UTF8.decoder); + var sub = asUtf8.listen(logs.add); + fakeStdIn.add('Hello World'); + await sub.cancel(); + sub = asUtf8.listen(logs.add); + fakeStdIn.add('Goodbye World'); + await sub.cancel(); + expect(logs, ['Hello World', 'Goodbye World']); + }); + + test('should throw if a subscriber is still active', () async { + final active = sharedStdIn.listen((_) {}); + expect(() => sharedStdIn.listen((_) {}), throwsStateError); + await active.cancel(); + expect(() => sharedStdIn.listen((_) {}), returnsNormally); + }); +}