Skip to content

Commit e41ae3d

Browse files
committed
Add Greengrass IPC sample
1 parent 29878fc commit e41ae3d

File tree

11 files changed

+1019
-148
lines changed

11 files changed

+1019
-148
lines changed

eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,11 @@ namespace Aws
134134
MessageAmendment(const Crt::ByteBuf &payload, Crt::Allocator *allocator = Crt::g_allocator) noexcept;
135135
void AddHeader(EventStreamHeader &&header) noexcept;
136136
void SetPayload(const Crt::Optional<Crt::ByteBuf> &payload) noexcept;
137-
Crt::List<EventStreamHeader> &GetHeaders() noexcept;
137+
Crt::List<EventStreamHeader> &GetHeaders() const noexcept;
138138
const Crt::Optional<Crt::ByteBuf> &GetPayload() const noexcept;
139139

140140
private:
141-
Crt::List<EventStreamHeader> m_headers;
141+
mutable Crt::List<EventStreamHeader> m_headers;
142142
Crt::Optional<Crt::ByteBuf> m_payload;
143143
Crt::Allocator *m_allocator;
144144
};
@@ -198,6 +198,31 @@ namespace Aws
198198
OnMessageFlushCallback m_connectRequestCallback;
199199
};
200200

201+
enum EventStreamRpcStatusCode
202+
{
203+
EVENT_STREAM_RPC_SUCCESS = 0,
204+
EVENT_STREAM_RPC_NULL_PARAMETER,
205+
EVENT_STREAM_RPC_UNINITIALIZED,
206+
EVENT_STREAM_RPC_ALLOCATION_ERROR,
207+
EVENT_STREAM_RPC_CONNECTION_SETUP_FAILED,
208+
EVENT_STREAM_RPC_CONNECTION_ACCESS_DENIED,
209+
EVENT_STREAM_RPC_CONNECTION_ALREADY_ESTABLISHED,
210+
EVENT_STREAM_RPC_CONNECTION_CLOSED,
211+
EVENT_STREAM_RPC_CONTINUATION_CLOSED,
212+
EVENT_STREAM_RPC_UNKNOWN_PROTOCOL_MESSAGE,
213+
EVENT_STREAM_RPC_UNMAPPED_DATA,
214+
EVENT_STREAM_RPC_UNSUPPORTED_CONTENT_TYPE,
215+
EVENT_STREAM_RPC_CRT_ERROR
216+
};
217+
218+
struct RpcError
219+
{
220+
EventStreamRpcStatusCode baseStatus;
221+
int crtError;
222+
operator bool() const noexcept { return baseStatus == EVENT_STREAM_RPC_SUCCESS; }
223+
Crt::String StatusToString();
224+
};
225+
201226
class AWS_EVENTSTREAMRPC_API ConnectionLifecycleHandler
202227
{
203228
public:
@@ -208,17 +233,19 @@ namespace Aws
208233
*/
209234
virtual void OnConnectCallback();
210235
/**
211-
* Invoked upon connection shutdown. `errorCode` will specify
212-
* shutdown reason. A graceful connection close will set `errorCode` to
213-
* `AWS_ERROR_SUCCESS` or 0.
236+
* Invoked upon connection shutdown.
237+
* @param status The status upon disconnection. It can be treated as a bool
238+
* with true implying a successful disconnection.
214239
*/
215-
virtual void OnDisconnectCallback(int errorCode);
240+
virtual void OnDisconnectCallback(RpcError status);
216241
/**
217242
* Invoked upon receiving an error. Use the return value to determine
218243
* whether or not to force the connection to close. Keep in mind that once
219244
* closed, the `ClientConnection` can no longer send messages.
245+
* @param status The status upon disconnection. It can be treated as a bool
246+
* with true implying a successful disconnection.
220247
*/
221-
virtual bool OnErrorCallback(int errorCode);
248+
virtual bool OnErrorCallback(RpcError status);
222249
/**
223250
* Invoked upon receiving a ping from the server. The `headers` and `payload`
224251
* refer to what is contained in the ping message.
@@ -272,31 +299,6 @@ namespace Aws
272299
ContinuationCallbackData *m_callbackData;
273300
};
274301

275-
enum EventStreamRpcStatusCode
276-
{
277-
EVENT_STREAM_RPC_SUCCESS = 0,
278-
EVENT_STREAM_RPC_NULL_PARAMETER,
279-
EVENT_STREAM_RPC_UNINITIALIZED,
280-
EVENT_STREAM_RPC_ALLOCATION_ERROR,
281-
EVENT_STREAM_RPC_CONNECTION_SETUP_FAILED,
282-
EVENT_STREAM_RPC_CONNECTION_ACCESS_DENIED,
283-
EVENT_STREAM_RPC_CONNECTION_ALREADY_ESTABLISHED,
284-
EVENT_STREAM_RPC_CONNECTION_CLOSED,
285-
EVENT_STREAM_RPC_CONTINUATION_CLOSED,
286-
EVENT_STREAM_RPC_UNKNOWN_PROTOCOL_MESSAGE,
287-
EVENT_STREAM_RPC_UNMAPPED_DATA,
288-
EVENT_STREAM_RPC_UNSUPPORTED_CONTENT_TYPE,
289-
EVENT_STREAM_RPC_CRT_ERROR
290-
};
291-
292-
struct RpcError
293-
{
294-
EventStreamRpcStatusCode baseStatus;
295-
int crtError;
296-
operator bool() const noexcept { return baseStatus == EVENT_STREAM_RPC_SUCCESS; }
297-
Crt::String ErrorToString();
298-
};
299-
300302
class AWS_EVENTSTREAMRPC_API ClientContinuation final
301303
{
302304
public:

eventstream_rpc/source/EventStreamClient.cpp

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ namespace Aws
9797
rhs.m_payload = Crt::Optional<Crt::ByteBuf>();
9898
}
9999

100-
Crt::List<EventStreamHeader> &MessageAmendment::GetHeaders() noexcept { return m_headers; }
100+
Crt::List<EventStreamHeader> &MessageAmendment::GetHeaders() const noexcept { return m_headers; }
101101

102102
const Crt::Optional<Crt::ByteBuf> &MessageAmendment::GetPayload() const noexcept { return m_payload; }
103103

@@ -213,10 +213,10 @@ namespace Aws
213213
}
214214
}
215215

216-
bool ConnectionLifecycleHandler::OnErrorCallback(int errorCode)
216+
bool ConnectionLifecycleHandler::OnErrorCallback(RpcError error)
217217
{
218-
(void)errorCode;
219-
/* Returning true implies that the connection will close upon receiving an error. */
218+
(void)error;
219+
/* Returning true implies that the connection should close as a result of encountering this error. */
220220
return true;
221221
}
222222

@@ -230,7 +230,43 @@ namespace Aws
230230

231231
void ConnectionLifecycleHandler::OnConnectCallback() {}
232232

233-
void ConnectionLifecycleHandler::OnDisconnectCallback(int errorCode) { (void)errorCode; }
233+
void ConnectionLifecycleHandler::OnDisconnectCallback(RpcError error) { (void)error; }
234+
235+
Crt::String RpcError::StatusToString()
236+
{
237+
switch (baseStatus)
238+
{
239+
case EVENT_STREAM_RPC_SUCCESS:
240+
return "EVENT_STREAM_RPC_SUCCESS";
241+
case EVENT_STREAM_RPC_NULL_PARAMETER:
242+
return "EVENT_STREAM_RPC_NULL_PARAMETER";
243+
case EVENT_STREAM_RPC_UNINITIALIZED:
244+
return "EVENT_STREAM_RPC_UNINITIALIZED";
245+
case EVENT_STREAM_RPC_ALLOCATION_ERROR:
246+
return "EVENT_STREAM_RPC_ALLOCATION_ERROR";
247+
case EVENT_STREAM_RPC_CONNECTION_SETUP_FAILED:
248+
return "EVENT_STREAM_RPC_CONNECTION_SETUP_FAILED";
249+
case EVENT_STREAM_RPC_CONNECTION_ACCESS_DENIED:
250+
return "EVENT_STREAM_RPC_CONNECTION_ACCESS_DENIED";
251+
case EVENT_STREAM_RPC_CONNECTION_ALREADY_ESTABLISHED:
252+
return "EVENT_STREAM_RPC_CONNECTION_ALREADY_ESTABLISHED";
253+
case EVENT_STREAM_RPC_CONNECTION_CLOSED:
254+
return "EVENT_STREAM_RPC_CONNECTION_CLOSED";
255+
case EVENT_STREAM_RPC_CONTINUATION_CLOSED:
256+
return "EVENT_STREAM_RPC_CONTINUATION_CLOSED";
257+
case EVENT_STREAM_RPC_UNKNOWN_PROTOCOL_MESSAGE:
258+
return "EVENT_STREAM_RPC_UNKNOWN_PROTOCOL_MESSAGE";
259+
case EVENT_STREAM_RPC_UNMAPPED_DATA:
260+
return "EVENT_STREAM_RPC_UNMAPPED_DATA";
261+
case EVENT_STREAM_RPC_UNSUPPORTED_CONTENT_TYPE:
262+
return "EVENT_STREAM_RPC_UNSUPPORTED_CONTENT_TYPE";
263+
case EVENT_STREAM_RPC_CRT_ERROR:
264+
Crt::String ret = "Failed with EVENT_STREAM_RPC_CRT_ERROR, the CRT error was ";
265+
ret += Crt::ErrorDebugString(crtError);
266+
return ret;
267+
}
268+
return "Unknown status code";
269+
}
234270

235271
std::future<RpcError> ClientConnection::Connect(
236272
const ConnectionConfig &connectionConfig,
@@ -320,7 +356,7 @@ namespace Aws
320356
AWS_LOGF_ERROR(
321357
AWS_LS_EVENT_STREAM_RPC_CLIENT,
322358
"A CRT error occurred while attempting to establish the connection: %s",
323-
aws_error_debug_str(crtError));
359+
Crt::ErrorDebugString(crtError));
324360
errorPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, crtError});
325361
return errorPromise.get_future();
326362
}
@@ -393,7 +429,7 @@ namespace Aws
393429
AWS_LOGF_ERROR(
394430
AWS_LS_EVENT_STREAM_RPC_CLIENT,
395431
"A CRT error occurred while attempting to send a message: %s",
396-
aws_error_debug_str(errorCode));
432+
Crt::ErrorDebugString(errorCode));
397433
callbackData->onFlushPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
398434
}
399435
else
@@ -463,7 +499,7 @@ namespace Aws
463499
AWS_LOGF_ERROR(
464500
AWS_LS_EVENT_STREAM_RPC_CLIENT,
465501
"A CRT error occurred while queueing a message to be sent on the connection: %s",
466-
aws_error_debug_str(errorCode));
502+
Crt::ErrorDebugString(errorCode));
467503
onFlushPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
468504
Crt::Delete(callbackContainer, connection->m_allocator);
469505
}
@@ -607,12 +643,12 @@ namespace Aws
607643
AWS_LOGF_ERROR(
608644
AWS_LS_EVENT_STREAM_RPC_CLIENT,
609645
"A CRT error occurred while setting up the connection: %s",
610-
aws_error_debug_str(errorCode));
646+
Crt::ErrorDebugString(errorCode));
611647
thisConnection->m_connectAckedPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
612648
aws_event_stream_rpc_client_connection_release(connection);
613649
thisConnection->m_underlyingConnection = nullptr;
614650
/* No connection to close on error, so no need to check return value of the callback. */
615-
(void)thisConnection->m_lifecycleHandler->OnErrorCallback(errorCode);
651+
(void)thisConnection->m_lifecycleHandler->OnErrorCallback({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
616652
}
617653
else if (thisConnection->m_clientState == DISCONNECTING || thisConnection->m_clientState == DISCONNECTED)
618654
{
@@ -687,7 +723,14 @@ namespace Aws
687723

688724
if (thisConnection->m_onConnectCalled)
689725
{
690-
thisConnection->m_lifecycleHandler->OnDisconnectCallback(errorCode);
726+
if (errorCode)
727+
{
728+
thisConnection->m_lifecycleHandler->OnDisconnectCallback({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
729+
}
730+
else
731+
{
732+
thisConnection->m_lifecycleHandler->OnDisconnectCallback({EVENT_STREAM_RPC_SUCCESS, 0});
733+
}
691734
thisConnection->m_onConnectCalled = false;
692735
}
693736

@@ -696,7 +739,7 @@ namespace Aws
696739
AWS_LOGF_ERROR(
697740
AWS_LS_EVENT_STREAM_RPC_CLIENT,
698741
"A CRT error occurred while shutting down the connection: %s",
699-
aws_error_debug_str(errorCode));
742+
Crt::ErrorDebugString(errorCode));
700743
thisConnection->m_closedPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
701744
}
702745
else
@@ -770,7 +813,8 @@ namespace Aws
770813
case AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR:
771814
case AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR:
772815

773-
if (thisConnection->m_lifecycleHandler->OnErrorCallback(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR))
816+
if (thisConnection->m_lifecycleHandler->OnErrorCallback(
817+
{EVENT_STREAM_RPC_CRT_ERROR, AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR}))
774818
{
775819
thisConnection->Close();
776820
}
@@ -780,7 +824,8 @@ namespace Aws
780824
default:
781825
return;
782826

783-
if (thisConnection->m_lifecycleHandler->OnErrorCallback(EVENT_STREAM_RPC_UNKNOWN_PROTOCOL_MESSAGE))
827+
if (thisConnection->m_lifecycleHandler->OnErrorCallback(
828+
{EVENT_STREAM_RPC_UNKNOWN_PROTOCOL_MESSAGE, 0}))
784829
{
785830
thisConnection->Close();
786831
}
@@ -1022,7 +1067,7 @@ namespace Aws
10221067
AWS_LOGF_ERROR(
10231068
AWS_LS_EVENT_STREAM_RPC_CLIENT,
10241069
"A CRT error occurred while queueing a message to be sent on a stream: %s",
1025-
aws_error_debug_str(errorCode));
1070+
Crt::ErrorDebugString(errorCode));
10261071
onFlushPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
10271072
Crt::Delete(callbackContainer, m_allocator);
10281073
}
@@ -1307,7 +1352,8 @@ namespace Aws
13071352
{
13081353
(void)operationError;
13091354
(void)rpcError;
1310-
/* Note: Always returning true forces the stream to close when an error occurs. */
1355+
/* Note: Always returning true implies that the stream should close
1356+
* as a result of encountering this error. */
13111357
return true;
13121358
}
13131359

@@ -1523,7 +1569,7 @@ namespace Aws
15231569
AWS_LOGF_ERROR(
15241570
AWS_LS_EVENT_STREAM_RPC_CLIENT,
15251571
"A CRT error occurred while closing the stream: %s",
1526-
aws_error_debug_str(errorCode));
1572+
Crt::ErrorDebugString(errorCode));
15271573
onTerminatePromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode});
15281574
Crt::Delete(callbackContainer, m_allocator);
15291575
}

0 commit comments

Comments
 (0)