From 3d0cda69b382d2989762a70eef03df2a57fbd934 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Thu, 25 Apr 2019 10:23:32 +0200 Subject: [PATCH] Only start consuming STDOUT data once connection is ready This fixes a race condition that happened when the target server sent data right after the connection (common for MySQL server connection) --- src/SshProcessConnector.php | 27 +++++- tests/IntegrationSshProcessConnectorTest.php | 90 ++++++++++++++++++++ 2 files changed, 114 insertions(+), 3 deletions(-) create mode 100644 tests/IntegrationSshProcessConnectorTest.php diff --git a/src/SshProcessConnector.php b/src/SshProcessConnector.php index e9b212e..0e9661b 100644 --- a/src/SshProcessConnector.php +++ b/src/SshProcessConnector.php @@ -69,6 +69,10 @@ public function connect($uri) $process = Io\processWithoutFds($command); $process->start($this->loop); + if ($this->debug) { + echo 'Launched "' . $command . '" with PID ' . $process->getPid() . PHP_EOL; // @codeCoverageIgnore + } + $deferred = new Deferred(function () use ($process, $uri) { $process->stdin->close(); $process->terminate(); @@ -76,11 +80,13 @@ public function connect($uri) throw new \RuntimeException('Connection to ' . $uri . ' cancelled while waiting for SSH client'); }); - // process STDERR one line at a time + // pause STDOUT and process STDERR one line at a time until connection is ready + $process->stdout->pause(); $last = null; + $connected = false; $debug = $this->debug; $stderr = new LineSeparatedReader($process->stderr); - $stderr->on('data', function ($line) use ($deferred, $process, $uri, &$last, $debug) { + $stderr->on('data', function ($line) use ($deferred, $process, $uri, &$last, $debug, &$connected) { // remember last line for error output in case process exits $last = $line; @@ -109,11 +115,26 @@ public function connect($uri) return; } + // channel is ready, so resume STDOUT stream and resolve connection + $process->stdout->resume(); $connection = new CompositeConnection($process->stdout, $process->stdin); $deferred->resolve($connection); + $connected = true; }); - $process->on('exit', function ($code) use ($deferred, $uri, &$last) { + // If STDERR closes before connection was established, explicitly close STDOUT stream. + // The STDOUT stream starts in a paused state and as such will prevent the process exit + // logic from triggering when it is not resumed. + $stderr->on('close', function () use ($process, &$connected) { + if (!$connected) { + $process->stdout->close(); + } + }); + + $process->on('exit', function ($code) use ($deferred, $uri, &$last, $debug) { + if ($debug) { + echo 'Process exit with code ' . $code . PHP_EOL; // @codeCoverageIgnore + } $deferred->reject(new \RuntimeException( 'Connection to ' . $uri . ' failed because SSH client died (' . $last . ')', $code diff --git a/tests/IntegrationSshProcessConnectorTest.php b/tests/IntegrationSshProcessConnectorTest.php new file mode 100644 index 0000000..a9eda51 --- /dev/null +++ b/tests/IntegrationSshProcessConnectorTest.php @@ -0,0 +1,90 @@ +setAccessible(true); + $ref->setValue($connector, 'echo "debug2: channel 0: open confirm rwindow 2097152 rmax 32768" >&2; #'); + + $promise = $connector->connect('example.com:80'); + $promise->then($this->expectCallableOnceWith($this->isInstanceOf('React\Socket\ConnectionInterface'))); + + $loop->run(); + } + + public function testConnectWillRejectWithExceptionWhenProcessOutputsChannelOpenFailedMessage() + { + $loop = Factory::create(); + $connector = new SshProcessConnector('host', $loop); + + $ref = new ReflectionProperty($connector, 'cmd'); + $ref->setAccessible(true); + $ref->setValue($connector, 'echo "channel 0: open failed: administratively prohibited: open failed" >&2; #'); + + $promise = $connector->connect('example.com:80'); + $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); + + $loop->run(); + } + + public function testConnectWillRejectWithExceptionWhenProcessOutputsEndsWithoutChannelMessage() + { + $loop = Factory::create(); + $connector = new SshProcessConnector('host', $loop); + + $ref = new ReflectionProperty($connector, 'cmd'); + $ref->setAccessible(true); + $ref->setValue($connector, 'echo foo >&2; #'); + + $promise = $connector->connect('example.com:80'); + $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); + + $loop->run(); + } + + public function testConnectWillResolveWithConnectionThatWillEmitImmediateDataFromProcessStdoutAfterChannelOpenConfirmMessage() + { + $loop = Factory::create(); + $connector = new SshProcessConnector('host', $loop); + + $ref = new ReflectionProperty($connector, 'cmd'); + $ref->setAccessible(true); + $ref->setValue($connector, 'echo "debug2: channel 0: open confirm rwindow 2097152 rmax 32768" >&2; echo foo #'); + + $promise = $connector->connect('example.com:80'); + + $data = $this->expectCallableOnceWith("foo\n"); + $promise->then(function (ConnectionInterface $connection) use ($data) { + $connection->on('data', $data); + }); + + $loop->run(); + } + + protected function expectCallableOnceWith($value) + { + $mock = $this->createCallableMock(); + + $mock + ->expects($this->once()) + ->method('__invoke') + ->with($value); + + return $mock; + } + + protected function createCallableMock() + { + return $this->getMockBuilder('stdClass')->setMethods(array('__invoke'))->getMock(); + } +}