Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/Browser.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use React\Http\Io\Sender;
use React\Http\Io\Transaction;
use React\Promise\PromiseInterface;
use React\Socket\ConnectionInterface;
use React\Socket\ConnectorInterface;
use React\Stream\ReadableStreamInterface;
use InvalidArgumentException;
Expand Down Expand Up @@ -757,7 +758,7 @@ public function withResponseBuffer($maximumSize)
* @see self::withFollowRedirects()
* @see self::withRejectErrorResponse()
*/
private function withOptions(array $options)
public function withOptions(array $options)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I like exposing the withOptions() method again. It has been deprecated with clue/reactphp-buzz#172 not too long ago.

Do we really need this method? It's my understanding we could just rely on the Connection: upgrade and/or Upgrade: … request header(s) being present?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@clue I didn't know the method was deprecated.

But my thought behind it was to let the consumer using Browser to be able to choose to watch out for the upgrade event or not. As by default, Browser will not. This means that, if an upgrade did happen, I need to configure browser to capture it like $browser->withOptions(['upgrade' => true]) (default is false in Transaction.php).

This, however, won't be necessary if the default mode should be to capture the upgrade. That was the reason I made the method public (also seeing it was public in clue/reactphp-buzz)

{
$browser = clone $this;
$browser->transaction = $this->transaction->withOptions($options);
Expand All @@ -770,7 +771,7 @@ private function withOptions(array $options)
* @param string $url
* @param array $headers
* @param string|ReadableStreamInterface $body
* @return PromiseInterface<ResponseInterface,\Exception>
* @return PromiseInterface<ResponseInterface,\Exception,ConnectionInterface>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this supposed to do? The PromiseInterface does not currently nor the foreseeable future have a third template type.

*/
private function requestMayBeStreaming($method, $url, array $headers = array(), $body = '')
{
Expand Down
15 changes: 15 additions & 0 deletions src/Client/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRe
});
}

protected function responseIsAnUpgradeResponse($response)
{
return
$response->hasHeader('Connection') &&
(in_array('upgrade', array_map('strtolower', $response->getHeader('Connection')))) &&
(int) $response->getStatusCode() === 101;
}

public function write($data)
{
if (!$this->isWritable()) {
Expand Down Expand Up @@ -140,6 +148,13 @@ public function handleData($data)
try {
$response = gPsr\parse_response($this->buffer);
$bodyChunk = (string) $response->getBody();

if ($this->responseIsAnUpgradeResponse($response)) {
$this->stream->removeListener('data', array($this, 'handleData'));

$this->emit('upgrade', array($this->stream, $response, $this));
return;
}
} catch (\InvalidArgumentException $exception) {
$this->emit('error', array($exception));
}
Expand Down
8 changes: 8 additions & 0 deletions src/Client/RequestData.php
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,12 @@ private function getAuthHeaders()

return array();
}

/**
* @return array
*/
public function getHeaders()
{
return $this->headers;
}
}
56 changes: 56 additions & 0 deletions src/Client/UpgradedResponse.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

namespace React\Http\Client;

use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\Stream\DuplexStreamInterface;

class UpgradedResponse
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds a new class as part of our public API in the internal React\Http\Client namespace.

{
/**
* @var DuplexStreamInterface
*/
private $connection;

/**
* @var ResponseInterface
*/
private $response;

/**
* @var RequestInterface
*/
private $request;

public function __construct(DuplexStreamInterface $connection, ResponseInterface $response, RequestInterface $request)
{
$this->connection = $connection;
$this->response = $response;
$this->request = $request;
}

/**
* @return DuplexStreamInterface
*/
public function getConnection()
{
return $this->connection;
}

/**
* @return ResponseInterface
*/
public function getResponse()
{
return $this->response;
}

/**
* @return RequestInterface
*/
public function getRequest()
{
return $this->request;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing EOL

66 changes: 43 additions & 23 deletions src/Io/Sender.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
use React\EventLoop\LoopInterface;
use React\Http\Client\Client as HttpClient;
use React\Http\Message\Response;
use React\Http\Client\UpgradedResponse;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\Socket\ConnectorInterface;
use React\Stream\DuplexStreamInterface;
use React\Stream\ReadableStreamInterface;

/**
Expand Down Expand Up @@ -70,33 +72,14 @@ public function __construct(HttpClient $http)
*
* @internal
* @param RequestInterface $request
* @return PromiseInterface Promise<ResponseInterface, Exception>
* @param bool $upgrade Configures the sender to listen for upgrade
* @return PromiseInterface Promise<ResponseInterface, ConnectionInterface, Exception>
*/
public function send(RequestInterface $request)
public function send(RequestInterface $request, $upgrade = false)
{
$body = $request->getBody();
$size = $body->getSize();

if ($size !== null && $size !== 0) {
// automatically assign a "Content-Length" request header if the body size is known and non-empty
$request = $request->withHeader('Content-Length', (string)$size);
} elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) {
// only assign a "Content-Length: 0" request header if the body is expected for certain methods
$request = $request->withHeader('Content-Length', '0');
} elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) {
// use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown
$request = $request->withHeader('Transfer-Encoding', 'chunked');
} else {
// do not use chunked encoding if size is known or if this is an empty request body
$size = 0;
}

$headers = array();
foreach ($request->getHeaders() as $name => $values) {
$headers[$name] = implode(', ', $values);
}

$requestStream = $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion());
list($size, $requestStream) = $this->createRequestStream($request);

$deferred = new Deferred(function ($_, $reject) use ($requestStream) {
// close request stream if request is cancelled
Expand All @@ -122,6 +105,16 @@ public function send(RequestInterface $request)
$deferred->resolve($response->withBody(new ReadableBodyStream($body, $length)));
});

/**
* We listen for an upgrade, if the request was upgraded, we will hijack it and resolve immediately
* This is useful for websocket connections that requires an HTTP Upgrade.
*/
if ($upgrade) {
$requestStream->on('upgrade', function (DuplexStreamInterface $socket, ResponseInterface $response) use ($request, $deferred) {
$deferred->resolve(new UpgradedResponse($socket, $response, $request));
});
}

if ($body instanceof ReadableStreamInterface) {
if ($body->isReadable()) {
// length unknown => apply chunked transfer-encoding
Expand Down Expand Up @@ -157,4 +150,31 @@ public function send(RequestInterface $request)

return $deferred->promise();
}

protected function createRequestStream(RequestInterface $request)
{
$body = $request->getBody();
$size = $body->getSize();

if ($size !== null && $size !== 0) {
// automatically assign a "Content-Length" request header if the body size is known and non-empty
$request = $request->withHeader('Content-Length', (string)$size);
} elseif ($size === 0 && \in_array($request->getMethod(), array('POST', 'PUT', 'PATCH'))) {
// only assign a "Content-Length: 0" request header if the body is expected for certain methods
$request = $request->withHeader('Content-Length', '0');
} elseif ($body instanceof ReadableStreamInterface && $body->isReadable() && !$request->hasHeader('Content-Length')) {
// use "Transfer-Encoding: chunked" when this is a streaming body and body size is unknown
$request = $request->withHeader('Transfer-Encoding', 'chunked');
} else {
// do not use chunked encoding if size is known or if this is an empty request body
$size = 0;
}

$headers = array();
foreach ($request->getHeaders() as $name => $values) {
$headers[$name] = implode(', ', $values);
}

return array($size, $this->http->request($request->getMethod(), (string)$request->getUri(), $headers, $request->getProtocolVersion()));
}
}
9 changes: 7 additions & 2 deletions src/Io/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class Transaction

private $streaming = false;

// Determines whether to return the connection of an upgrade or not
private $upgrade = false;

private $maximumSize = 16777216; // 16 MiB = 2^24 bytes

public function __construct(Sender $sender, LoopInterface $loop)
Expand Down Expand Up @@ -81,7 +84,7 @@ public function send(RequestInterface $request)

$loop = $this->loop;
$this->next($request, $deferred)->then(
function (ResponseInterface $response) use ($deferred, $loop, &$timeout) {
function ($response) use ($deferred, $loop, &$timeout) {
if (isset($deferred->timeout)) {
$loop->cancelTimer($deferred->timeout);
unset($deferred->timeout);
Expand Down Expand Up @@ -144,7 +147,9 @@ private function next(RequestInterface $request, Deferred $deferred)
$that = $this;
++$deferred->numRequests;

$promise = $this->sender->send($request);
$promise = $this->sender->send($request, $this->upgrade);

if ($this->upgrade) return $promise;

if (!$this->streaming) {
$promise = $promise->then(function ($response) use ($deferred, $that) {
Expand Down