From f73b33f12d8db524eec197a35548f29a5426e2d7 Mon Sep 17 00:00:00 2001 From: bosunski Date: Sun, 26 Jul 2020 12:13:42 +0100 Subject: [PATCH 01/13] Begin Writable implementation --- src/Browser.php | 10 ++++- src/Client/Request.php | 28 ++++++++++++- src/Client/RequestData.php | 13 +++++++ src/Io/Sender.php | 80 ++++++++++++++++++++++++++++---------- src/Io/Transaction.php | 4 +- 5 files changed, 111 insertions(+), 24 deletions(-) diff --git a/src/Browser.php b/src/Browser.php index 46a5d115..ed5b3aa9 100644 --- a/src/Browser.php +++ b/src/Browser.php @@ -8,6 +8,7 @@ use React\Http\Io\Sender; use React\Http\Io\Transaction; use React\Promise\PromiseInterface; +use React\Socket\ConnectionInterface; use React\Socket\ConnectorInterface; use React\Stream\ReadableStreamInterface; use InvalidArgumentException; @@ -381,6 +382,11 @@ public function requestStreaming($method, $url, $headers = array(), $contents = return $this->withOptions(array('streaming' => true))->requestMayBeStreaming($method, $url, $headers, $contents); } + public function requestUpgrade($method, $url, $headers = array(), $contents = '') + { + return $this->withOptions(array('upgrade' => true))->requestMayBeStreaming($method, $url, $headers, $contents); + } + /** * Changes the maximum timeout used for waiting for pending requests. * @@ -708,7 +714,7 @@ public function withResponseBuffer($maximumSize) * @see self::withFollowRedirects() * @see self::withRejectErrorResponse() */ - private function withOptions(array $options) + public function withOptions(array $options) { $browser = clone $this; $browser->transaction = $this->transaction->withOptions($options); @@ -721,7 +727,7 @@ private function withOptions(array $options) * @param string $url * @param array $headers * @param string|ReadableStreamInterface $contents - * @return PromiseInterface + * @return PromiseInterface */ private function requestMayBeStreaming($method, $url, array $headers = array(), $contents = '') { diff --git a/src/Client/Request.php b/src/Client/Request.php index 7ebb627f..88c7e9a5 100644 --- a/src/Client/Request.php +++ b/src/Client/Request.php @@ -61,7 +61,28 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe $streamRef = $stream; $stream->on('drain', array($that, 'handleDrain')); - $stream->on('data', array($that, 'handleData')); + + $buffer = ''; + $headerParser = function ($data) use ($requestData, &$headerParser, $buffer, $streamRef, $that) { + $buffer .= $data; + + static $headerParsed = false; + + if (!$requestData->isUpgradeRequest() || $headerParsed || false == strpos($buffer, "\r\n\r\n")) { + return $that->handleData($data); + } + + $headerParsed = true; + + $response = gPsr\parse_response($buffer); + + $streamRef->removeListener('data', $headerParser); + + $that->emit('upgrade', array($streamRef, $response, $that)); + }; + + $stream->on('data', $headerParser); + $stream->on('end', array($that, 'handleEnd')); $stream->on('error', array($that, 'handleError')); $stream->on('close', array($that, 'handleClose')); @@ -292,4 +313,9 @@ public function getResponseFactory() return $factory; } + + public function getRequestData() + { + return $this->requestData; + } } diff --git a/src/Client/RequestData.php b/src/Client/RequestData.php index a5908a08..c37c0065 100644 --- a/src/Client/RequestData.php +++ b/src/Client/RequestData.php @@ -125,4 +125,17 @@ private function getAuthHeaders() return array(); } + + public function isUpgradeRequest() + { + return isset($this->headers['Connection']) && strtolower($this->headers['Connection']) === "upgrade"; + } + + /** + * @return array + */ + public function getHeaders() + { + return $this->headers; + } } diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 6f3367e5..27c15291 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -8,6 +8,7 @@ use React\Http\Client\Response as ResponseStream; use React\Promise\PromiseInterface; use React\Promise\Deferred; +use React\Socket\ConnectionInterface; use React\Socket\ConnectorInterface; use React\Stream\ReadableStreamInterface; @@ -78,26 +79,7 @@ public function send(RequestInterface $request) $body = $request->getBody(); $size = $body->getSize(); - if ($size !== null && $size !== 0) { - // automatically assign a "Content-Length" request header if the body size is known and non-empty - $request = $request->withHeader('Content-Length', (string)$size); - } elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) { - // only assign a "Content-Length: 0" request header if the body is expected for certain methods - $request = $request->withHeader('Content-Length', '0'); - } elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) { - // use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown - $request = $request->withHeader('Transfer-Encoding', 'chunked'); - } else { - // do not use chunked encoding if size is known or if this is an empty request body - $size = 0; - } - - $headers = array(); - foreach ($request->getHeaders() as $name => $values) { - $headers[$name] = implode(', ', $values); - } - - $requestStream = $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion()); + $requestStream = $this->createRequestStream($request); $deferred = new Deferred(function ($_, $reject) use ($requestStream) { // close request stream if request is cancelled @@ -122,6 +104,10 @@ public function send(RequestInterface $request) )); }); + $requestStream->on('upgrade', function (ConnectionInterface $socket) use ($deferred) { + $deferred->resolve($socket); + }); + if ($body instanceof ReadableStreamInterface) { if ($body->isReadable()) { // length unknown => apply chunked transfer-encoding @@ -157,4 +143,58 @@ public function send(RequestInterface $request) return $deferred->promise(); } + + /** + * + * @internal + * @param RequestInterface $request + * @return PromiseInterface Promise + */ + public function upgrade(RequestInterface $request) + { + $requestStream = $this->createRequestStream($request); + + $deferred = new Deferred(function ($_, $reject) use ($requestStream) { + // close request stream if request is cancelled + $reject(new \RuntimeException('Request cancelled')); + $requestStream->close(); + }); + + $requestStream->on('error', function($error) use ($deferred) { + $deferred->reject($error); + }); + + $requestStream->on('upgrade', function (ConnectorInterface $socket) use ($deferred) { + $deferred->resolve($socket); + }); + + return $deferred->promise(); + } + + protected function createRequestStream(RequestInterface $request) + { + $body = $request->getBody(); + $size = $body->getSize(); + + if ($size !== null && $size !== 0) { + // automatically assign a "Content-Length" request header if the body size is known and non-empty + $request = $request->withHeader('Content-Length', (string)$size); + } elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) { + // only assign a "Content-Length: 0" request header if the body is expected for certain methods + $request = $request->withHeader('Content-Length', '0'); + } elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) { + // use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown + $request = $request->withHeader('Transfer-Encoding', 'chunked'); + } else { + // do not use chunked encoding if size is known or if this is an empty request body + $size = 0; + } + + $headers = array(); + foreach ($request->getHeaders() as $name => $values) { + $headers[$name] = implode(', ', $values); + } + + return $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion()); + } } diff --git a/src/Io/Transaction.php b/src/Io/Transaction.php index a593e684..512be4f8 100644 --- a/src/Io/Transaction.php +++ b/src/Io/Transaction.php @@ -34,6 +34,8 @@ class Transaction private $streaming = false; + private $upgrade = false; + private $maximumSize = 16777216; // 16 MiB = 2^24 bytes public function __construct(Sender $sender, MessageFactory $messageFactory, LoopInterface $loop) @@ -81,7 +83,7 @@ public function send(RequestInterface $request) $loop = $this->loop; $this->next($request, $deferred)->then( - function (ResponseInterface $response) use ($deferred, $loop, &$timeout) { + function ($response) use ($deferred, $loop, &$timeout) { if (isset($deferred->timeout)) { $loop->cancelTimer($deferred->timeout); unset($deferred->timeout); From 7cc5c5a36046570668fe11d2a1db21edf907877d Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 20:21:57 +0100 Subject: [PATCH 02/13] Add upgrade listener and provide option for upgrade in Transaction --- src/Client/UpgradedResponse.php | 56 +++++++++++++++++++++++++++++++++ src/Io/Sender.php | 37 +++++----------------- src/Io/Transaction.php | 2 ++ 3 files changed, 66 insertions(+), 29 deletions(-) create mode 100644 src/Client/UpgradedResponse.php diff --git a/src/Client/UpgradedResponse.php b/src/Client/UpgradedResponse.php new file mode 100644 index 00000000..795869e8 --- /dev/null +++ b/src/Client/UpgradedResponse.php @@ -0,0 +1,56 @@ +connection = $connection; + $this->response = $response; + $this->request = $request; + } + + /** + * @return ConnectionInterface + */ + public function getConnection() + { + return $this->connection; + } + + /** + * @return ResponseInterface + */ + public function getResponse() + { + return $this->response; + } + + /** + * @return RequestInterface + */ + public function getRequest() + { + return $this->request; + } +} \ No newline at end of file diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 27c15291..9eaa0ed7 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -3,9 +3,11 @@ namespace React\Http\Io; use Psr\Http\Message\RequestInterface; +use Psr\Http\Message\ResponseInterface; use React\EventLoop\LoopInterface; use React\Http\Client\Client as HttpClient; use React\Http\Client\Response as ResponseStream; +use React\Http\Client\UpgradedResponse; use React\Promise\PromiseInterface; use React\Promise\Deferred; use React\Socket\ConnectionInterface; @@ -104,8 +106,12 @@ public function send(RequestInterface $request) )); }); - $requestStream->on('upgrade', function (ConnectionInterface $socket) use ($deferred) { - $deferred->resolve($socket); + /** + * We listen for an upgrade, if the request was upgraded, we will hijack it and resolve immediately + * This is useful for websocket connections that requires an HTTP Upgrade. + */ + $requestStream->on('upgrade', function (ConnectionInterface $socket, ResponseInterface $response) use ($request, $deferred) { + $deferred->resolve(new UpgradedResponse($socket, $response, $request)); }); if ($body instanceof ReadableStreamInterface) { @@ -144,33 +150,6 @@ public function send(RequestInterface $request) return $deferred->promise(); } - /** - * - * @internal - * @param RequestInterface $request - * @return PromiseInterface Promise - */ - public function upgrade(RequestInterface $request) - { - $requestStream = $this->createRequestStream($request); - - $deferred = new Deferred(function ($_, $reject) use ($requestStream) { - // close request stream if request is cancelled - $reject(new \RuntimeException('Request cancelled')); - $requestStream->close(); - }); - - $requestStream->on('error', function($error) use ($deferred) { - $deferred->reject($error); - }); - - $requestStream->on('upgrade', function (ConnectorInterface $socket) use ($deferred) { - $deferred->resolve($socket); - }); - - return $deferred->promise(); - } - protected function createRequestStream(RequestInterface $request) { $body = $request->getBody(); diff --git a/src/Io/Transaction.php b/src/Io/Transaction.php index 512be4f8..fd73155d 100644 --- a/src/Io/Transaction.php +++ b/src/Io/Transaction.php @@ -148,6 +148,8 @@ private function next(RequestInterface $request, Deferred $deferred) $promise = $this->sender->send($request); + if ($this->upgrade) return $promise; + if (!$this->streaming) { $promise = $promise->then(function ($response) use ($deferred, $that) { return $that->bufferResponse($response, $deferred); From 662df471621d016b3c7f160f1de6da0628061935 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 20:23:31 +0100 Subject: [PATCH 03/13] Add upgrade listener and provide option for upgrade in Transaction --- src/Io/Sender.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 9eaa0ed7..e289e402 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -74,7 +74,7 @@ public function __construct(HttpClient $http, MessageFactory $messageFactory) * * @internal * @param RequestInterface $request - * @return PromiseInterface Promise + * @return PromiseInterface Promise */ public function send(RequestInterface $request) { From ccca59c76d0a6ba10cf9653d2c96dd50f1ab7378 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 20:30:05 +0100 Subject: [PATCH 04/13] Reverted Scope --- src/Browser.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Browser.php b/src/Browser.php index ed5b3aa9..2cd1cccf 100644 --- a/src/Browser.php +++ b/src/Browser.php @@ -714,7 +714,7 @@ public function withResponseBuffer($maximumSize) * @see self::withFollowRedirects() * @see self::withRejectErrorResponse() */ - public function withOptions(array $options) + private function withOptions(array $options) { $browser = clone $this; $browser->transaction = $this->transaction->withOptions($options); From 3585dcaaff9e5c37fdb1df36988b639d93bbdcfe Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 20:48:17 +0100 Subject: [PATCH 05/13] Configures sender to upgrade or not --- src/Io/Sender.php | 11 +++++++---- src/Io/Transaction.php | 3 ++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 6cefae5d..570a3eeb 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -73,9 +73,10 @@ public function __construct(HttpClient $http) * * @internal * @param RequestInterface $request + * @param bool $upgrade Configures the sender to listen for upgrade * @return PromiseInterface Promise */ - public function send(RequestInterface $request) + public function send(RequestInterface $request, $upgrade = false) { $body = $request->getBody(); $size = $body->getSize(); @@ -115,9 +116,11 @@ public function send(RequestInterface $request) * We listen for an upgrade, if the request was upgraded, we will hijack it and resolve immediately * This is useful for websocket connections that requires an HTTP Upgrade. */ - $requestStream->on('upgrade', function (ConnectionInterface $socket, ResponseInterface $response) use ($request, $deferred) { - $deferred->resolve(new UpgradedResponse($socket, $response, $request)); - }); + if ($upgrade) { + $requestStream->on('upgrade', function (ConnectionInterface $socket, ResponseInterface $response) use ($request, $deferred) { + $deferred->resolve(new UpgradedResponse($socket, $response, $request)); + }); + } if ($body instanceof ReadableStreamInterface) { if ($body->isReadable()) { diff --git a/src/Io/Transaction.php b/src/Io/Transaction.php index f54b24fc..a100582a 100644 --- a/src/Io/Transaction.php +++ b/src/Io/Transaction.php @@ -35,6 +35,7 @@ class Transaction private $streaming = false; + // Determines whether to return the connection of an upgrade or not private $upgrade = false; private $maximumSize = 16777216; // 16 MiB = 2^24 bytes @@ -146,7 +147,7 @@ private function next(RequestInterface $request, Deferred $deferred) $that = $this; ++$deferred->numRequests; - $promise = $this->sender->send($request); + $promise = $this->sender->send($request, $this->upgrade); if ($this->upgrade) return $promise; From 9a23b03ca930ddd32eadc72f7d1e958207f08fd9 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 21:16:15 +0100 Subject: [PATCH 06/13] Add check for upgrade on response instead of request --- src/Client/Request.php | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/Client/Request.php b/src/Client/Request.php index 88c7e9a5..f27c644b 100644 --- a/src/Client/Request.php +++ b/src/Client/Request.php @@ -68,7 +68,7 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe static $headerParsed = false; - if (!$requestData->isUpgradeRequest() || $headerParsed || false == strpos($buffer, "\r\n\r\n")) { + if ($headerParsed || false == strpos($buffer, "\r\n\r\n")) { return $that->handleData($data); } @@ -76,6 +76,10 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe $response = gPsr\parse_response($buffer); + if (!$that->responseIsAnUpgradeResponse($response)) { + return $that->handleData($data); + } + $streamRef->removeListener('data', $headerParser); $that->emit('upgrade', array($streamRef, $response, $that)); @@ -110,6 +114,14 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe }); } + protected function responseIsAnUpgradeResponse($response) + { + return + $response->hasHeader('Connection') && + (in_array('upgrade', array_map('strtolower', $response->getHeader('Connection')))) && + (int) $response->getStatusCode() === 101; + } + public function write($data) { if (!$this->isWritable()) { From 9a024808c5e4ceb4105728a7f49c09ec0a24952e Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 21:16:51 +0100 Subject: [PATCH 07/13] Remove isUpgradeRequest --- src/Client/RequestData.php | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/Client/RequestData.php b/src/Client/RequestData.php index c37c0065..c3b0f685 100644 --- a/src/Client/RequestData.php +++ b/src/Client/RequestData.php @@ -126,11 +126,6 @@ private function getAuthHeaders() return array(); } - public function isUpgradeRequest() - { - return isset($this->headers['Connection']) && strtolower($this->headers['Connection']) === "upgrade"; - } - /** * @return array */ From 88d862f56f00e0fd67495dedc3202f08eb4f99e5 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 21:18:52 +0100 Subject: [PATCH 08/13] Remove requestUpgrade method --- src/Browser.php | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/Browser.php b/src/Browser.php index 06a7aba3..33a38361 100644 --- a/src/Browser.php +++ b/src/Browser.php @@ -381,11 +381,6 @@ public function requestStreaming($method, $url, $headers = array(), $contents = return $this->withOptions(array('streaming' => true))->requestMayBeStreaming($method, $url, $headers, $contents); } - public function requestUpgrade($method, $url, $headers = array(), $contents = '') - { - return $this->withOptions(array('upgrade' => true))->requestMayBeStreaming($method, $url, $headers, $contents); - } - /** * Changes the maximum timeout used for waiting for pending requests. * From 532755a17b6e59dd27be56c5304ecdf5dd1cceb7 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 22:08:48 +0100 Subject: [PATCH 09/13] Replaced ConnectionInteface With DuplexStreamInterface In upgraded response --- src/Client/UpgradedResponse.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Client/UpgradedResponse.php b/src/Client/UpgradedResponse.php index 795869e8..8659b304 100644 --- a/src/Client/UpgradedResponse.php +++ b/src/Client/UpgradedResponse.php @@ -4,12 +4,12 @@ use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseInterface; -use React\Socket\ConnectionInterface; +use React\Stream\DuplexStreamInterface; class UpgradedResponse { /** - * @var ConnectionInterface + * @var DuplexStreamInterface */ private $connection; @@ -23,7 +23,7 @@ class UpgradedResponse */ private $request; - public function __construct(ConnectionInterface $connection, ResponseInterface $response, RequestInterface $request) + public function __construct(DuplexStreamInterface $connection, ResponseInterface $response, RequestInterface $request) { $this->connection = $connection; $this->response = $response; @@ -31,7 +31,7 @@ public function __construct(ConnectionInterface $connection, ResponseInterface $ } /** - * @return ConnectionInterface + * @return DuplexStreamInterface */ public function getConnection() { From 65f563c55032c07d112dd8ac8d346c282a9e3939 Mon Sep 17 00:00:00 2001 From: bosunski Date: Mon, 27 Jul 2020 22:11:38 +0100 Subject: [PATCH 10/13] Replaced ConnectionInteface With DuplexStreamInterface In Sender --- src/Io/Sender.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Io/Sender.php b/src/Io/Sender.php index 570a3eeb..af2e2aaa 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -11,8 +11,8 @@ use React\Http\Client\UpgradedResponse; use React\Promise\PromiseInterface; use React\Promise\Deferred; -use React\Socket\ConnectionInterface; use React\Socket\ConnectorInterface; +use React\Stream\DuplexStreamInterface; use React\Stream\ReadableStreamInterface; /** @@ -117,7 +117,7 @@ public function send(RequestInterface $request, $upgrade = false) * This is useful for websocket connections that requires an HTTP Upgrade. */ if ($upgrade) { - $requestStream->on('upgrade', function (ConnectionInterface $socket, ResponseInterface $response) use ($request, $deferred) { + $requestStream->on('upgrade', function (DuplexStreamInterface $socket, ResponseInterface $response) use ($request, $deferred) { $deferred->resolve(new UpgradedResponse($socket, $response, $request)); }); } From b3264a58bc68830ab87fb10b404c3ae315e0c792 Mon Sep 17 00:00:00 2001 From: bosunski Date: Tue, 28 Jul 2020 01:10:26 +0100 Subject: [PATCH 11/13] Start fixing tests --- src/Client/Request.php | 38 ++++++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/src/Client/Request.php b/src/Client/Request.php index f27c644b..0a026d98 100644 --- a/src/Client/Request.php +++ b/src/Client/Request.php @@ -63,7 +63,7 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe $stream->on('drain', array($that, 'handleDrain')); $buffer = ''; - $headerParser = function ($data) use ($requestData, &$headerParser, $buffer, $streamRef, $that) { + $headerParser = function ($data) use (&$headerParser, $buffer, $streamRef, $that) { $buffer .= $data; static $headerParsed = false; @@ -76,17 +76,17 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe $response = gPsr\parse_response($buffer); - if (!$that->responseIsAnUpgradeResponse($response)) { - return $that->handleData($data); - } + if ($that->responseIsAnUpgradeResponse($response)) { + $that->stream->removeListener('data', $headerParser); - $streamRef->removeListener('data', $headerParser); + $that->emit('upgrade', array($that->stream, $response, $that)); + return; + } - $that->emit('upgrade', array($streamRef, $response, $that)); + return $that->handleData($data); }; - $stream->on('data', $headerParser); - + $stream->on('data', array($that, 'handleData')); $stream->on('end', array($that, 'handleEnd')); $stream->on('error', array($that, 'handleError')); $stream->on('close', array($that, 'handleClose')); @@ -168,7 +168,29 @@ public function handleData($data) { $this->buffer .= $data; + static $headerParsed = false; + + if ($headerParsed || false == strpos($this->buffer, "\r\n\r\n")) { + return $this->handleEx(); + } + + $headerParsed = true; + + $response = gPsr\parse_response($this->buffer); + + if ($this->responseIsAnUpgradeResponse($response)) { + $this->stream->removeListener('data', array($this, 'handleData')); + + $this->emit('upgrade', array($this->stream, $response, $this)); + return; + } + // buffer until double CRLF (or double LF for compatibility with legacy servers) + return $this->handleEx(); + } + + protected function handleEx() + { if (false !== strpos($this->buffer, "\r\n\r\n") || false !== strpos($this->buffer, "\n\n")) { try { list($response, $bodyChunk) = $this->parseResponse($this->buffer); From 7f3890fd61cb866c3fda4624de61b30ac96c60f9 Mon Sep 17 00:00:00 2001 From: bosunski Date: Tue, 28 Jul 2020 01:53:05 +0100 Subject: [PATCH 12/13] Refactored some parts so tests can pass --- src/Client/Request.php | 62 ++++++++---------------------------------- src/Io/Sender.php | 5 ++-- 2 files changed, 14 insertions(+), 53 deletions(-) diff --git a/src/Client/Request.php b/src/Client/Request.php index 0a026d98..1466ef66 100644 --- a/src/Client/Request.php +++ b/src/Client/Request.php @@ -61,31 +61,6 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe $streamRef = $stream; $stream->on('drain', array($that, 'handleDrain')); - - $buffer = ''; - $headerParser = function ($data) use (&$headerParser, $buffer, $streamRef, $that) { - $buffer .= $data; - - static $headerParsed = false; - - if ($headerParsed || false == strpos($buffer, "\r\n\r\n")) { - return $that->handleData($data); - } - - $headerParsed = true; - - $response = gPsr\parse_response($buffer); - - if ($that->responseIsAnUpgradeResponse($response)) { - $that->stream->removeListener('data', $headerParser); - - $that->emit('upgrade', array($that->stream, $response, $that)); - return; - } - - return $that->handleData($data); - }; - $stream->on('data', array($that, 'handleData')); $stream->on('end', array($that, 'handleEnd')); $stream->on('error', array($that, 'handleError')); @@ -168,32 +143,19 @@ public function handleData($data) { $this->buffer .= $data; - static $headerParsed = false; - - if ($headerParsed || false == strpos($this->buffer, "\r\n\r\n")) { - return $this->handleEx(); - } - - $headerParsed = true; - - $response = gPsr\parse_response($this->buffer); - - if ($this->responseIsAnUpgradeResponse($response)) { - $this->stream->removeListener('data', array($this, 'handleData')); - - $this->emit('upgrade', array($this->stream, $response, $this)); - return; - } - // buffer until double CRLF (or double LF for compatibility with legacy servers) - return $this->handleEx(); - } - - protected function handleEx() - { if (false !== strpos($this->buffer, "\r\n\r\n") || false !== strpos($this->buffer, "\n\n")) { try { - list($response, $bodyChunk) = $this->parseResponse($this->buffer); + $psrResponse = gPsr\parse_response($this->buffer); + + if ($this->responseIsAnUpgradeResponse($psrResponse)) { + $this->stream->removeListener('data', array($this, 'handleData')); + + $this->emit('upgrade', array($this->stream, $psrResponse, $this)); + return; + } + + list($response, $bodyChunk) = $this->parseResponse($psrResponse); } catch (\InvalidArgumentException $exception) { $this->emit('error', array($exception)); } @@ -277,9 +239,9 @@ public function close() $this->removeAllListeners(); } - protected function parseResponse($data) + protected function parseResponse($psrResponse) { - $psrResponse = gPsr\parse_response($data); +// $psrResponse = gPsr\parse_response($data); $headers = array_map(function($val) { if (1 === count($val)) { $val = $val[0]; diff --git a/src/Io/Sender.php b/src/Io/Sender.php index af2e2aaa..91c9324e 100644 --- a/src/Io/Sender.php +++ b/src/Io/Sender.php @@ -79,9 +79,8 @@ public function __construct(HttpClient $http) public function send(RequestInterface $request, $upgrade = false) { $body = $request->getBody(); - $size = $body->getSize(); - $requestStream = $this->createRequestStream($request); + list($size, $requestStream) = $this->createRequestStream($request); $deferred = new Deferred(function ($_, $reject) use ($requestStream) { // close request stream if request is cancelled @@ -182,6 +181,6 @@ protected function createRequestStream(RequestInterface $request) $headers[$name] = implode(', ', $values); } - return $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion()); + return array($size, $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion())); } } From 26b3b9aa45c0a8ab260a66ff815d3a3deb2812e7 Mon Sep 17 00:00:00 2001 From: bosunski Date: Tue, 28 Jul 2020 02:06:56 +0100 Subject: [PATCH 13/13] Remove comment --- src/Client/Request.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Client/Request.php b/src/Client/Request.php index 1466ef66..520386dd 100644 --- a/src/Client/Request.php +++ b/src/Client/Request.php @@ -241,7 +241,6 @@ public function close() protected function parseResponse($psrResponse) { -// $psrResponse = gPsr\parse_response($data); $headers = array_map(function($val) { if (1 === count($val)) { $val = $val[0];