Skip to content

GG model update: add client config argument to set async launch mode #525

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Feb 20, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ namespace Aws

std::future<RpcError> Close(OnMessageFlushCallback onMessageFlushCallback = nullptr) noexcept;
std::future<TaggedResult> GetOperationResult() noexcept;
void WithLaunchMode(std::launch mode) noexcept;

protected:
std::future<RpcError> Activate(
Expand All @@ -515,6 +516,7 @@ namespace Aws
OnMessageFlushCallback onMessageFlushCallback) noexcept;
virtual Crt::String GetModelName() const noexcept = 0;
const OperationModelContext &m_operationModelContext;
std::launch m_asyncLaunchMode;

private:
EventStreamRpcStatusCode HandleData(const Crt::Optional<Crt::ByteBuf> &payload);
Expand Down
8 changes: 5 additions & 3 deletions eventstream_rpc/source/EventStreamClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1107,9 +1107,9 @@ namespace Aws
std::shared_ptr<StreamResponseHandler> streamHandler,
const OperationModelContext &operationModelContext,
Crt::Allocator *allocator) noexcept
: m_operationModelContext(operationModelContext), m_messageCount(0), m_allocator(allocator),
m_streamHandler(streamHandler), m_clientContinuation(connection.NewStream(*this)), m_expectedCloses(0),
m_streamClosedCalled(false)
: m_operationModelContext(operationModelContext), m_asyncLaunchMode(std::launch::deferred),
m_messageCount(0), m_allocator(allocator), m_streamHandler(streamHandler),
m_clientContinuation(connection.NewStream(*this)), m_expectedCloses(0), m_streamClosedCalled(false)
{
}

Expand Down Expand Up @@ -1515,6 +1515,8 @@ namespace Aws
}
}

void ClientOperation::WithLaunchMode(std::launch mode) noexcept { m_asyncLaunchMode = mode; }

std::future<RpcError> ClientOperation::Close(OnMessageFlushCallback onMessageFlushCallback) noexcept
{
const std::lock_guard<std::mutex> lock(m_continuationMutex);
Expand Down
28 changes: 23 additions & 5 deletions eventstream_rpc/tests/EchoTestRpcClient.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

/* This file is generated. */

#include <aws/crt/Types.h>
#include <aws/crt/io/Bootstrap.h>
#include <awstest/EchoTestRpcClient.h>
Expand All @@ -7,7 +14,8 @@ namespace Awstest
EchoTestRpcClient::EchoTestRpcClient(
Aws::Crt::Io::ClientBootstrap &clientBootstrap,
Aws::Crt::Allocator *allocator) noexcept
: m_connection(allocator), m_clientBootstrap(clientBootstrap), m_allocator(allocator)
: m_connection(allocator), m_clientBootstrap(clientBootstrap), m_allocator(allocator),
m_asyncLaunchMode(std::launch::deferred)
{
m_echoTestRpcServiceModel.AssignModelNameToErrorResponse(
Aws::Crt::String("awstest#ServiceError"), ServiceError::s_allocateFromPayload);
Expand All @@ -22,18 +30,24 @@ namespace Awstest

void EchoTestRpcClient::Close() noexcept { m_connection.Close(); }

void EchoTestRpcClient::WithLaunchMode(std::launch mode) noexcept { m_asyncLaunchMode = mode; }

EchoTestRpcClient::~EchoTestRpcClient() noexcept { Close(); }

std::shared_ptr<GetAllProductsOperation> EchoTestRpcClient::NewGetAllProducts() noexcept
{
return Aws::Crt::MakeShared<GetAllProductsOperation>(
auto operation = Aws::Crt::MakeShared<GetAllProductsOperation>(
m_allocator, m_connection, m_echoTestRpcServiceModel.m_getAllProductsOperationContext, m_allocator);
operation->WithLaunchMode(m_asyncLaunchMode);
return operation;
}

std::shared_ptr<CauseServiceErrorOperation> EchoTestRpcClient::NewCauseServiceError() noexcept
{
return Aws::Crt::MakeShared<CauseServiceErrorOperation>(
auto operation = Aws::Crt::MakeShared<CauseServiceErrorOperation>(
m_allocator, m_connection, m_echoTestRpcServiceModel.m_causeServiceErrorOperationContext, m_allocator);
operation->WithLaunchMode(m_asyncLaunchMode);
return operation;
}

std::shared_ptr<CauseStreamServiceToErrorOperation> EchoTestRpcClient::NewCauseStreamServiceToError(
Expand All @@ -60,14 +74,18 @@ namespace Awstest

std::shared_ptr<EchoMessageOperation> EchoTestRpcClient::NewEchoMessage() noexcept
{
return Aws::Crt::MakeShared<EchoMessageOperation>(
auto operation = Aws::Crt::MakeShared<EchoMessageOperation>(
m_allocator, m_connection, m_echoTestRpcServiceModel.m_echoMessageOperationContext, m_allocator);
operation->WithLaunchMode(m_asyncLaunchMode);
return operation;
}

std::shared_ptr<GetAllCustomersOperation> EchoTestRpcClient::NewGetAllCustomers() noexcept
{
return Aws::Crt::MakeShared<GetAllCustomersOperation>(
auto operation = Aws::Crt::MakeShared<GetAllCustomersOperation>(
m_allocator, m_connection, m_echoTestRpcServiceModel.m_getAllCustomersOperationContext, m_allocator);
operation->WithLaunchMode(m_asyncLaunchMode);
return operation;
}

} // namespace Awstest
22 changes: 13 additions & 9 deletions eventstream_rpc/tests/EchoTestRpcModel.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

/* This file is generated. */

#include <aws/crt/Api.h>
#include <awstest/EchoTestRpcModel.h>

Expand Down Expand Up @@ -978,7 +985,7 @@ namespace Awstest

std::future<GetAllProductsResult> GetAllProductsOperation::GetResult() noexcept
{
return std::async(std::launch::deferred, [this]() { return GetAllProductsResult(GetOperationResult().get()); });
return std::async(m_asyncLaunchMode, [this]() { return GetAllProductsResult(GetOperationResult().get()); });
}

GetAllProductsOperation::GetAllProductsOperation(
Expand Down Expand Up @@ -1046,8 +1053,7 @@ namespace Awstest

std::future<CauseServiceErrorResult> CauseServiceErrorOperation::GetResult() noexcept
{
return std::async(
std::launch::deferred, [this]() { return CauseServiceErrorResult(GetOperationResult().get()); });
return std::async(m_asyncLaunchMode, [this]() { return CauseServiceErrorResult(GetOperationResult().get()); });
}

CauseServiceErrorOperation::CauseServiceErrorOperation(
Expand Down Expand Up @@ -1138,7 +1144,7 @@ namespace Awstest
std::future<CauseStreamServiceToErrorResult> CauseStreamServiceToErrorOperation::GetResult() noexcept
{
return std::async(
std::launch::deferred, [this]() { return CauseStreamServiceToErrorResult(GetOperationResult().get()); });
m_asyncLaunchMode, [this]() { return CauseStreamServiceToErrorResult(GetOperationResult().get()); });
}

CauseStreamServiceToErrorOperation::CauseStreamServiceToErrorOperation(
Expand Down Expand Up @@ -1224,8 +1230,7 @@ namespace Awstest

std::future<EchoStreamMessagesResult> EchoStreamMessagesOperation::GetResult() noexcept
{
return std::async(
std::launch::deferred, [this]() { return EchoStreamMessagesResult(GetOperationResult().get()); });
return std::async(m_asyncLaunchMode, [this]() { return EchoStreamMessagesResult(GetOperationResult().get()); });
}

EchoStreamMessagesOperation::EchoStreamMessagesOperation(
Expand Down Expand Up @@ -1292,7 +1297,7 @@ namespace Awstest

std::future<EchoMessageResult> EchoMessageOperation::GetResult() noexcept
{
return std::async(std::launch::deferred, [this]() { return EchoMessageResult(GetOperationResult().get()); });
return std::async(m_asyncLaunchMode, [this]() { return EchoMessageResult(GetOperationResult().get()); });
}

EchoMessageOperation::EchoMessageOperation(
Expand Down Expand Up @@ -1359,8 +1364,7 @@ namespace Awstest

std::future<GetAllCustomersResult> GetAllCustomersOperation::GetResult() noexcept
{
return std::async(
std::launch::deferred, [this]() { return GetAllCustomersResult(GetOperationResult().get()); });
return std::async(m_asyncLaunchMode, [this]() { return GetAllCustomersResult(GetOperationResult().get()); });
}

GetAllCustomersOperation::GetAllCustomersOperation(
Expand Down
13 changes: 9 additions & 4 deletions eventstream_rpc/tests/include/awstest/EchoTestRpcClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,20 @@ namespace Awstest
Aws::Crt::Allocator *allocator = Aws::Crt::g_allocator) noexcept;
/**
* Connect the client to the server
* @param lifecycleHandler An interface that is called upon when lifecycle events relating to the connection
* occur.
* @param connectionConfig The configuration parameters used for establishing the connection.
* @return An `RpcError` that can be used to check whether the connection was established.
* @param lifecycleHandler An interface that is called upon when lifecycle
* events relating to the connection occur.
* @param connectionConfig The configuration parameters used for establishing
* the connection.
* @return An `RpcError` that can be used to check whether the connection was
* established.
*/
std::future<RpcError> Connect(
ConnectionLifecycleHandler &lifecycleHandler,
const ConnectionConfig &connectionConfig = DefaultConnectionConfig()) noexcept;
bool IsConnected() const noexcept { return m_connection.IsOpen(); }
void Close() noexcept;
void WithLaunchMode(std::launch mode) noexcept;

std::shared_ptr<GetAllProductsOperation> NewGetAllProducts() noexcept;

std::shared_ptr<CauseServiceErrorOperation> NewCauseServiceError() noexcept;
Expand All @@ -60,5 +64,6 @@ namespace Awstest
Aws::Crt::Io::ClientBootstrap &m_clientBootstrap;
Aws::Crt::Allocator *m_allocator;
MessageAmendment m_connectAmendment;
std::launch m_asyncLaunchMode;
};
} // namespace Awstest
Loading