Skip to content

Adds a type-erased response adapter to the public API #201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions include/boost/redis/adapter/any_adapter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/* Copyright (c) 2018-2023 Marcelo Zimbres Silva ([email protected])
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/

#ifndef BOOST_REDIS_ANY_ADAPTER_HPP
#define BOOST_REDIS_ANY_ADAPTER_HPP


#include <boost/redis/resp3/node.hpp>
#include <boost/redis/adapter/adapt.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <functional>
#include <string_view>
#include <type_traits>

namespace boost::redis {

namespace detail {

// Forward decl
template <class Executor>
class connection_base;

}

/** @brief A type-erased reference to a response.
* @ingroup high-level-api
*
* A type-erased response adapter. It can be executed using @ref connection::async_exec.
* Using this type instead of raw response references enables separate compilation.
*
* Given a response object `resp` that can be passed to `async_exec`, the following two
* statements have the same effect:
* ```
* co_await conn.async_exec(req, resp);
* co_await conn.async_exec(req, any_response(resp));
* ```
*/
class any_adapter
{
using fn_type = std::function<void(std::size_t, resp3::basic_node<std::string_view> const&, system::error_code&)>;

struct impl_t {
fn_type adapt_fn;
std::size_t supported_response_size;
} impl_;

template <class T>
static auto create_impl(T& resp) -> impl_t
{
using namespace boost::redis::adapter;
auto adapter = boost_redis_adapt(resp);
std::size_t size = adapter.get_supported_response_size();
return { std::move(adapter), size };
}

template <class Executor>
friend class detail::connection_base;

public:
/**
* @brief Constructor.
*
* Creates a type-erased response adapter from `resp` by calling
* `boost_redis_adapt`. `T` must be a valid Redis response type.
* Any type passed to @ref connection::async_exec qualifies.
*
* This object stores a reference to `resp`, which must be kept alive
* while `*this` is being used.
*/
template <class T, class = std::enable_if_t<!std::is_same_v<T, any_adapter>>>
explicit any_adapter(T& resp) : impl_(create_impl(resp)) {}
};

}

#endif
42 changes: 38 additions & 4 deletions include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef BOOST_REDIS_CONNECTION_HPP
#define BOOST_REDIS_CONNECTION_HPP

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/connection_base.hpp>
#include <boost/redis/logger.hpp>
#include <boost/redis/config.hpp>
Expand All @@ -17,7 +18,7 @@
#include <boost/asio/any_completion_handler.hpp>

#include <chrono>
#include <memory>
#include <cstddef>
#include <limits>

namespace boost::redis {
Expand Down Expand Up @@ -256,7 +257,22 @@ class basic_connection {
Response& resp = ignore,
CompletionToken&& token = CompletionToken{})
{
return impl_.async_exec(req, resp, std::forward<CompletionToken>(token));
return impl_.async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
}

/** @copydoc async_exec
*
* @details This function uses the type-erased @ref any_adapter class, which
* encapsulates a reference to a response object.
*/
template <class CompletionToken = asio::default_completion_token_t<executor_type>>
auto
async_exec(
request const& req,
any_adapter adapter,
CompletionToken&& token = CompletionToken{})
{
return impl_.async_exec(req, std::move(adapter), std::forward<CompletionToken>(token));
}

/** @brief Cancel operations.
Expand Down Expand Up @@ -392,9 +408,21 @@ class connection {

/// Calls `boost::redis::basic_connection::async_exec`.
template <class Response, class CompletionToken>
auto async_exec(request const& req, Response& resp, CompletionToken token)
auto async_exec(request const& req, Response& resp, CompletionToken&& token)
{
return impl_.async_exec(req, resp, std::move(token));
return async_exec(req, any_adapter(resp), std::forward<CompletionToken>(token));
}

/// Calls `boost::redis::basic_connection::async_exec`.
template <class CompletionToken>
auto async_exec(request const& req, any_adapter adapter, CompletionToken&& token)
{
return asio::async_initiate<
CompletionToken, void(boost::system::error_code, std::size_t)>(
[](auto handler, connection* self, request const* req, any_adapter&& adapter)
{
self->async_exec_impl(*req, std::move(adapter), std::move(handler));
}, token, this, &req, std::move(adapter));
}

/// Calls `boost::redis::basic_connection::cancel`.
Expand Down Expand Up @@ -435,6 +463,12 @@ class connection {
config const& cfg,
logger l,
asio::any_completion_handler<void(boost::system::error_code)> token);

void
async_exec_impl(
request const& req,
any_adapter&& adapter,
asio::any_completion_handler<void(boost::system::error_code, std::size_t)> token);

basic_connection<executor_type> impl_;
};
Expand Down
19 changes: 11 additions & 8 deletions include/boost/redis/detail/connection_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef BOOST_REDIS_CONNECTION_BASE_HPP
#define BOOST_REDIS_CONNECTION_BASE_HPP

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/adapter/adapt.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/error.hpp>
Expand All @@ -30,15 +31,16 @@
#include <boost/asio/read_until.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/associated_immediate_executor.hpp>

#include <algorithm>
#include <array>
#include <chrono>
#include <deque>
#include <memory>
#include <string_view>
#include <type_traits>
#include <functional>
#include <utility>

namespace boost::redis::detail
{
Expand Down Expand Up @@ -121,7 +123,9 @@ struct exec_op {
// be stablished.
if (info_->req_->get_config().cancel_if_not_connected && !conn_->is_open()) {
BOOST_ASIO_CORO_YIELD
asio::post(std::move(self));
asio::dispatch(
asio::get_associated_immediate_executor(self, self.get_io_executor()),
std::move(self));
return self.complete(error::not_connected, 0);
}

Expand Down Expand Up @@ -440,14 +444,13 @@ class connection_base {
cancel_impl(op);
}

template <class Response, class CompletionToken>
auto async_exec(request const& req, Response& resp, CompletionToken token)
template <class CompletionToken>
auto async_exec(request const& req, any_adapter&& adapter, CompletionToken&& token)
{
using namespace boost::redis::adapter;
auto f = boost_redis_adapt(resp);
BOOST_ASSERT_MSG(req.get_expected_responses() <= f.get_supported_response_size(), "Request and response have incompatible sizes.");
auto& adapter_impl = adapter.impl_;
BOOST_ASSERT_MSG(req.get_expected_responses() <= adapter_impl.supported_response_size, "Request and response have incompatible sizes.");

auto info = std::make_shared<req_info>(req, f, get_executor());
auto info = std::make_shared<req_info>(req, std::move(adapter_impl.adapt_fn), get_executor());

return asio::async_compose
< CompletionToken
Expand Down
3 changes: 2 additions & 1 deletion include/boost/redis/detail/health_checker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/operation.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/helper.hpp>
#include <boost/redis/config.hpp>
#include <boost/asio/steady_timer.hpp>
Expand Down Expand Up @@ -44,7 +45,7 @@ class ping_op {
}

BOOST_ASIO_CORO_YIELD
conn_->async_exec(checker_->req_, checker_->resp_, std::move(self));
conn_->async_exec(checker_->req_, any_adapter(checker_->resp_), std::move(self));
if (ec || is_cancelled(self)) {
logger_.trace("ping_op: error/cancelled (1).");
checker_->wait_timer_.cancel();
Expand Down
3 changes: 2 additions & 1 deletion include/boost/redis/detail/runner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#ifndef BOOST_REDIS_RUNNER_HPP
#define BOOST_REDIS_RUNNER_HPP

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/detail/health_checker.hpp>
#include <boost/redis/config.hpp>
#include <boost/redis/response.hpp>
Expand Down Expand Up @@ -47,7 +48,7 @@ struct hello_op {
runner_->add_hello();

BOOST_ASIO_CORO_YIELD
conn_->async_exec(runner_->hello_req_, runner_->hello_resp_, std::move(self));
conn_->async_exec(runner_->hello_req_, any_adapter(runner_->hello_resp_), std::move(self));
logger_.on_hello(ec, runner_->hello_resp_);

if (ec || runner_->has_error_in_response() || is_cancelled(self)) {
Expand Down
10 changes: 10 additions & 0 deletions include/boost/redis/impl/connection.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include <boost/redis/connection.hpp>
#include <cstddef>

namespace boost::redis {

Expand All @@ -31,6 +32,15 @@ connection::async_run_impl(
impl_.async_run(cfg, l, std::move(token));
}

void
connection::async_exec_impl(
request const& req,
any_adapter&& adapter,
asio::any_completion_handler<void(boost::system::error_code, std::size_t)> token)
{
impl_.async_exec(req, std::move(adapter), std::move(token));
}

void connection::cancel(operation op)
{
impl_.cancel(op);
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ make_test(test_conn_exec_cancel 20)
make_test(test_conn_exec_cancel2 20)
make_test(test_conn_echo_stress 20)
make_test(test_conn_run_cancel 20)
make_test(test_any_adapter 17)
make_test(test_issue_50 20)
make_test(test_issue_181 17)

Expand Down
1 change: 1 addition & 0 deletions test/Jamfile
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ local tests =
test_low_level
test_request
test_run
test_any_adapter
;

# Build and run the tests
Expand Down
49 changes: 49 additions & 0 deletions test/test_any_adapter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva ([email protected])
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/

#include <boost/redis/ignore.hpp>
#include <boost/redis/response.hpp>
#include <boost/redis/adapter/any_adapter.hpp>
#include <string>
#define BOOST_TEST_MODULE any_adapter
#include <boost/test/included/unit_test.hpp>

using boost::redis::generic_response;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::any_adapter;

BOOST_AUTO_TEST_CASE(any_adapter_response_types)
{
// any_adapter can be used with any supported responses
response<int> r1;
response<int, std::string> r2;
generic_response r3;

BOOST_CHECK_NO_THROW(any_adapter{r1});
BOOST_CHECK_NO_THROW(any_adapter{r2});
BOOST_CHECK_NO_THROW(any_adapter{r3});
BOOST_CHECK_NO_THROW(any_adapter{ignore});
}

BOOST_AUTO_TEST_CASE(any_adapter_copy_move)
{
// any_adapter can be copied/moved
response<int, std::string> r;
any_adapter ad1 {r};

// copy constructor
any_adapter ad2 {ad1};

// move constructor
any_adapter ad3 {std::move(ad2)};

// copy assignment
BOOST_CHECK_NO_THROW(ad2 = ad1);

// move assignment
BOOST_CHECK_NO_THROW(ad2 = std::move(ad1));
}
24 changes: 24 additions & 0 deletions test/test_conn_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
* accompanying file LICENSE.txt)
*/

#include <boost/redis/adapter/any_adapter.hpp>
#include <boost/redis/connection.hpp>
#include <boost/system/errc.hpp>
#include <boost/asio/detached.hpp>
#include <string>
#define BOOST_TEST_MODULE conn-exec
#include <boost/test/included/unit_test.hpp>
#include <iostream>
Expand Down Expand Up @@ -191,3 +193,25 @@ BOOST_AUTO_TEST_CASE(large_number_of_concurrent_requests_issue_170)
BOOST_CHECK_EQUAL(counter, repeat);
}

BOOST_AUTO_TEST_CASE(exec_any_adapter)
{
// Executing an any_adapter object works
request req;
req.push("PING", "PONG");
response<std::string> res;

net::io_context ioc;

auto conn = std::make_shared<connection>(ioc);

conn->async_exec(req, boost::redis::any_adapter(res), [&](auto ec, auto){
BOOST_TEST(!ec);
conn->cancel();
});

run(conn);
ioc.run();

BOOST_TEST(std::get<0>(res).value() == "PONG");
}