Skip to content

Only start consuming STDOUT data once connection is ready #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions src/SshProcessConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,24 @@ public function connect($uri)
$process = Io\processWithoutFds($command);
$process->start($this->loop);

if ($this->debug) {
echo 'Launched "' . $command . '" with PID ' . $process->getPid() . PHP_EOL; // @codeCoverageIgnore
}

$deferred = new Deferred(function () use ($process, $uri) {
$process->stdin->close();
$process->terminate();

throw new \RuntimeException('Connection to ' . $uri . ' cancelled while waiting for SSH client');
});

// process STDERR one line at a time
// pause STDOUT and process STDERR one line at a time until connection is ready
$process->stdout->pause();
$last = null;
$connected = false;
$debug = $this->debug;
$stderr = new LineSeparatedReader($process->stderr);
$stderr->on('data', function ($line) use ($deferred, $process, $uri, &$last, $debug) {
$stderr->on('data', function ($line) use ($deferred, $process, $uri, &$last, $debug, &$connected) {
// remember last line for error output in case process exits
$last = $line;

Expand Down Expand Up @@ -109,11 +115,26 @@ public function connect($uri)
return;
}

// channel is ready, so resume STDOUT stream and resolve connection
$process->stdout->resume();
$connection = new CompositeConnection($process->stdout, $process->stdin);
$deferred->resolve($connection);
$connected = true;
});

$process->on('exit', function ($code) use ($deferred, $uri, &$last) {
// If STDERR closes before connection was established, explicitly close STDOUT stream.
// The STDOUT stream starts in a paused state and as such will prevent the process exit
// logic from triggering when it is not resumed.
$stderr->on('close', function () use ($process, &$connected) {
if (!$connected) {
$process->stdout->close();
}
});

$process->on('exit', function ($code) use ($deferred, $uri, &$last, $debug) {
if ($debug) {
echo 'Process exit with code ' . $code . PHP_EOL; // @codeCoverageIgnore
}
$deferred->reject(new \RuntimeException(
'Connection to ' . $uri . ' failed because SSH client died (' . $last . ')',
$code
Expand Down
90 changes: 90 additions & 0 deletions tests/IntegrationSshProcessConnectorTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?php

use Clue\React\SshProxy\SshProcessConnector;
use PHPUnit\Framework\TestCase;
use React\EventLoop\Factory;
use React\Socket\ConnectionInterface;

class IntegrationSshProcessConnectorTest extends TestCase
{
public function testConnectWillResolveWithConnectionInterfaceWhenProcessOutputsChannelOpenConfirmMessage()
{
$loop = Factory::create();
$connector = new SshProcessConnector('host', $loop);

$ref = new ReflectionProperty($connector, 'cmd');
$ref->setAccessible(true);
$ref->setValue($connector, 'echo "debug2: channel 0: open confirm rwindow 2097152 rmax 32768" >&2; #');

$promise = $connector->connect('example.com:80');
$promise->then($this->expectCallableOnceWith($this->isInstanceOf('React\Socket\ConnectionInterface')));

$loop->run();
}

public function testConnectWillRejectWithExceptionWhenProcessOutputsChannelOpenFailedMessage()
{
$loop = Factory::create();
$connector = new SshProcessConnector('host', $loop);

$ref = new ReflectionProperty($connector, 'cmd');
$ref->setAccessible(true);
$ref->setValue($connector, 'echo "channel 0: open failed: administratively prohibited: open failed" >&2; #');

$promise = $connector->connect('example.com:80');
$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));

$loop->run();
}

public function testConnectWillRejectWithExceptionWhenProcessOutputsEndsWithoutChannelMessage()
{
$loop = Factory::create();
$connector = new SshProcessConnector('host', $loop);

$ref = new ReflectionProperty($connector, 'cmd');
$ref->setAccessible(true);
$ref->setValue($connector, 'echo foo >&2; #');

$promise = $connector->connect('example.com:80');
$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));

$loop->run();
}

public function testConnectWillResolveWithConnectionThatWillEmitImmediateDataFromProcessStdoutAfterChannelOpenConfirmMessage()
{
$loop = Factory::create();
$connector = new SshProcessConnector('host', $loop);

$ref = new ReflectionProperty($connector, 'cmd');
$ref->setAccessible(true);
$ref->setValue($connector, 'echo "debug2: channel 0: open confirm rwindow 2097152 rmax 32768" >&2; echo foo #');

$promise = $connector->connect('example.com:80');

$data = $this->expectCallableOnceWith("foo\n");
$promise->then(function (ConnectionInterface $connection) use ($data) {
$connection->on('data', $data);
});

$loop->run();
}

protected function expectCallableOnceWith($value)
{
$mock = $this->createCallableMock();

$mock
->expects($this->once())
->method('__invoke')
->with($value);

return $mock;
}

protected function createCallableMock()
{
return $this->getMockBuilder('stdClass')->setMethods(array('__invoke'))->getMock();
}
}