From 206c339c3e7d74afe7f6fc62ce331aa9903b23a9 Mon Sep 17 00:00:00 2001 From: Pushkar Chawda Date: Fri, 27 Oct 2023 16:22:23 +0000 Subject: [PATCH 1/6] Tidy up changes added with PR#115 andlimit it to only work with Python3.12 in Lambda. --- awslambdaric/bootstrap.py | 4 +++- awslambdaric/lambda_runtime_client.py | 25 +++++++++++++++++++----- awslambdaric/lambda_runtime_exception.py | 1 + tests/test_lambda_runtime_client.py | 16 ++++++++++++++- 4 files changed, 39 insertions(+), 7 deletions(-) diff --git a/awslambdaric/bootstrap.py b/awslambdaric/bootstrap.py index a3da58c..a562016 100644 --- a/awslambdaric/bootstrap.py +++ b/awslambdaric/bootstrap.py @@ -462,6 +462,8 @@ def run(app_root, handler, lambda_runtime_api_addr): sys.stdout = Unbuffered(sys.stdout) sys.stderr = Unbuffered(sys.stderr) + aws_exec_env = os.environ.get("AWS_EXECUTION_ENV") + with create_log_sink() as log_sink: lambda_runtime_client = LambdaRuntimeClient(lambda_runtime_api_addr) @@ -479,7 +481,7 @@ def run(app_root, handler, lambda_runtime_api_addr): sys.exit(1) while True: - event_request = lambda_runtime_client.wait_next_invocation() + event_request = lambda_runtime_client.wait_next_invocation(aws_exec_env) _GLOBAL_AWS_REQUEST_ID = event_request.invoke_id diff --git a/awslambdaric/lambda_runtime_client.py b/awslambdaric/lambda_runtime_client.py index b05918b..c08c371 100644 --- a/awslambdaric/lambda_runtime_client.py +++ b/awslambdaric/lambda_runtime_client.py @@ -3,8 +3,9 @@ """ import sys -from concurrent.futures import ThreadPoolExecutor from awslambdaric import __version__ +from concurrent.futures import ThreadPoolExecutor +from .lambda_runtime_exception import FaultException def _user_agent(): @@ -68,10 +69,24 @@ def post_init_error(self, error_response_data): if response.code != http.HTTPStatus.ACCEPTED: raise LambdaRuntimeClientError(endpoint, response.code, response_body) - def wait_next_invocation(self): - with ThreadPoolExecutor() as e: - fut = e.submit(runtime_client.next) - response_body, headers = fut.result() + def wait_next_invocation(self, aws_exec_env): + # Calling runtime_client.next() from a separate thread unblocks the main thread, + # which can then process signals. + if not aws_exec_env or aws_exec_env == "AWS_Lambda_python3.12": + try: + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(runtime_client.next) + response_body, headers = future.result() + except Exception as exception: + raise FaultException( + FaultException.LAMBDA_RUNTIME_CLIENT_ERROR, + "LAMBDA_RUNTIME Failed to get next invocation: {}".format( + str(exception) + ), + None, + ) + else: + response_body, headers = runtime_client.next() return InvocationRequest( invoke_id=headers.get("Lambda-Runtime-Aws-Request-Id"), x_amzn_trace_id=headers.get("Lambda-Runtime-Trace-Id"), diff --git a/awslambdaric/lambda_runtime_exception.py b/awslambdaric/lambda_runtime_exception.py index 416327e..e09af70 100644 --- a/awslambdaric/lambda_runtime_exception.py +++ b/awslambdaric/lambda_runtime_exception.py @@ -12,6 +12,7 @@ class FaultException(Exception): BUILT_IN_MODULE_CONFLICT = "Runtime.BuiltInModuleConflict" MALFORMED_HANDLER_NAME = "Runtime.MalformedHandlerName" LAMBDA_CONTEXT_UNMARSHAL_ERROR = "Runtime.LambdaContextUnmarshalError" + LAMBDA_RUNTIME_CLIENT_ERROR = "Runtime.LambdaRuntimeClientError" def __init__(self, exception_type, msg, trace=None): self.msg = msg diff --git a/tests/test_lambda_runtime_client.py b/tests/test_lambda_runtime_client.py index 47d95cf..e114513 100644 --- a/tests/test_lambda_runtime_client.py +++ b/tests/test_lambda_runtime_client.py @@ -72,7 +72,21 @@ def test_wait_next_invocation(self, mock_runtime_client): mock_runtime_client.next.return_value = response_body, headears runtime_client = LambdaRuntimeClient("localhost:1234") - event_request = runtime_client.wait_next_invocation() + aws_exec_env = "AWS_Lambda_python3.12" + event_request = runtime_client.wait_next_invocation(aws_exec_env) + + self.assertIsNotNone(event_request) + self.assertEqual(event_request.invoke_id, "RID1234") + self.assertEqual(event_request.x_amzn_trace_id, "TID1234") + self.assertEqual(event_request.invoked_function_arn, "FARN1234") + self.assertEqual(event_request.deadline_time_in_ms, 12) + self.assertEqual(event_request.client_context, "client_context") + self.assertEqual(event_request.cognito_identity, "cognito_identity") + self.assertEqual(event_request.content_type, "application/json") + self.assertEqual(event_request.event_body, response_body) + + aws_exec_env = "AWS_Lambda_python3.11" + event_request = runtime_client.wait_next_invocation(aws_exec_env) self.assertIsNotNone(event_request) self.assertEqual(event_request.invoke_id, "RID1234") From 2c7ee0083a4cfe59057db7616f54447067acd381 Mon Sep 17 00:00:00 2001 From: Pushkar Chawda Date: Fri, 27 Oct 2023 18:46:10 +0000 Subject: [PATCH 2/6] Implement code review comments --- awslambdaric/bootstrap.py | 4 ++-- awslambdaric/lambda_runtime_client.py | 13 ++++++------- tests/test_lambda_runtime_client.py | 9 +++++---- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/awslambdaric/bootstrap.py b/awslambdaric/bootstrap.py index a562016..99dc1a9 100644 --- a/awslambdaric/bootstrap.py +++ b/awslambdaric/bootstrap.py @@ -462,7 +462,7 @@ def run(app_root, handler, lambda_runtime_api_addr): sys.stdout = Unbuffered(sys.stdout) sys.stderr = Unbuffered(sys.stderr) - aws_exec_env = os.environ.get("AWS_EXECUTION_ENV") + use_thread_for_polling_next = os.environ.get("AWS_EXECUTION_ENV") == "AWS_Lambda_python3.12" with create_log_sink() as log_sink: lambda_runtime_client = LambdaRuntimeClient(lambda_runtime_api_addr) @@ -481,7 +481,7 @@ def run(app_root, handler, lambda_runtime_api_addr): sys.exit(1) while True: - event_request = lambda_runtime_client.wait_next_invocation(aws_exec_env) + event_request = lambda_runtime_client.wait_next_invocation(use_thread_for_polling_next) _GLOBAL_AWS_REQUEST_ID = event_request.invoke_id diff --git a/awslambdaric/lambda_runtime_client.py b/awslambdaric/lambda_runtime_client.py index c08c371..8714556 100644 --- a/awslambdaric/lambda_runtime_client.py +++ b/awslambdaric/lambda_runtime_client.py @@ -4,9 +4,6 @@ import sys from awslambdaric import __version__ -from concurrent.futures import ThreadPoolExecutor -from .lambda_runtime_exception import FaultException - def _user_agent(): py_version = ( @@ -69,19 +66,21 @@ def post_init_error(self, error_response_data): if response.code != http.HTTPStatus.ACCEPTED: raise LambdaRuntimeClientError(endpoint, response.code, response_body) - def wait_next_invocation(self, aws_exec_env): + def wait_next_invocation(self, use_thread_for_polling_next=False): # Calling runtime_client.next() from a separate thread unblocks the main thread, # which can then process signals. - if not aws_exec_env or aws_exec_env == "AWS_Lambda_python3.12": + if use_thread_for_polling_next: + from concurrent.futures import ThreadPoolExecutor + from .lambda_runtime_exception import FaultException try: with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(runtime_client.next) response_body, headers = future.result() - except Exception as exception: + except Exception as e: raise FaultException( FaultException.LAMBDA_RUNTIME_CLIENT_ERROR, "LAMBDA_RUNTIME Failed to get next invocation: {}".format( - str(exception) + str(e) ), None, ) diff --git a/tests/test_lambda_runtime_client.py b/tests/test_lambda_runtime_client.py index e114513..860e52f 100644 --- a/tests/test_lambda_runtime_client.py +++ b/tests/test_lambda_runtime_client.py @@ -72,8 +72,8 @@ def test_wait_next_invocation(self, mock_runtime_client): mock_runtime_client.next.return_value = response_body, headears runtime_client = LambdaRuntimeClient("localhost:1234") - aws_exec_env = "AWS_Lambda_python3.12" - event_request = runtime_client.wait_next_invocation(aws_exec_env) + use_thread_for_polling_next = True + event_request = runtime_client.wait_next_invocation(use_thread_for_polling_next) self.assertIsNotNone(event_request) self.assertEqual(event_request.invoke_id, "RID1234") @@ -85,8 +85,9 @@ def test_wait_next_invocation(self, mock_runtime_client): self.assertEqual(event_request.content_type, "application/json") self.assertEqual(event_request.event_body, response_body) - aws_exec_env = "AWS_Lambda_python3.11" - event_request = runtime_client.wait_next_invocation(aws_exec_env) + #Using ThreadPoolExecutor to polling next() + use_thread_for_polling_next = False + event_request = runtime_client.wait_next_invocation(use_thread_for_polling_next) self.assertIsNotNone(event_request) self.assertEqual(event_request.invoke_id, "RID1234") From 63cf68b26e01542784b044f05f49ef185f32d9cb Mon Sep 17 00:00:00 2001 From: Pushkar Chawda Date: Fri, 27 Oct 2023 19:00:45 +0000 Subject: [PATCH 3/6] Format changes to fix build failure. --- awslambdaric/bootstrap.py | 8 ++++++-- awslambdaric/lambda_runtime_client.py | 6 +++--- tests/test_lambda_runtime_client.py | 2 +- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/awslambdaric/bootstrap.py b/awslambdaric/bootstrap.py index 99dc1a9..0a6eb59 100644 --- a/awslambdaric/bootstrap.py +++ b/awslambdaric/bootstrap.py @@ -462,7 +462,9 @@ def run(app_root, handler, lambda_runtime_api_addr): sys.stdout = Unbuffered(sys.stdout) sys.stderr = Unbuffered(sys.stderr) - use_thread_for_polling_next = os.environ.get("AWS_EXECUTION_ENV") == "AWS_Lambda_python3.12" + use_thread_for_polling_next = ( + os.environ.get("AWS_EXECUTION_ENV") == "AWS_Lambda_python3.12" + ) with create_log_sink() as log_sink: lambda_runtime_client = LambdaRuntimeClient(lambda_runtime_api_addr) @@ -481,7 +483,9 @@ def run(app_root, handler, lambda_runtime_api_addr): sys.exit(1) while True: - event_request = lambda_runtime_client.wait_next_invocation(use_thread_for_polling_next) + event_request = lambda_runtime_client.wait_next_invocation( + use_thread_for_polling_next + ) _GLOBAL_AWS_REQUEST_ID = event_request.invoke_id diff --git a/awslambdaric/lambda_runtime_client.py b/awslambdaric/lambda_runtime_client.py index 8714556..db26db0 100644 --- a/awslambdaric/lambda_runtime_client.py +++ b/awslambdaric/lambda_runtime_client.py @@ -5,6 +5,7 @@ import sys from awslambdaric import __version__ + def _user_agent(): py_version = ( f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" @@ -72,6 +73,7 @@ def wait_next_invocation(self, use_thread_for_polling_next=False): if use_thread_for_polling_next: from concurrent.futures import ThreadPoolExecutor from .lambda_runtime_exception import FaultException + try: with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(runtime_client.next) @@ -79,9 +81,7 @@ def wait_next_invocation(self, use_thread_for_polling_next=False): except Exception as e: raise FaultException( FaultException.LAMBDA_RUNTIME_CLIENT_ERROR, - "LAMBDA_RUNTIME Failed to get next invocation: {}".format( - str(e) - ), + "LAMBDA_RUNTIME Failed to get next invocation: {}".format(str(e)), None, ) else: diff --git a/tests/test_lambda_runtime_client.py b/tests/test_lambda_runtime_client.py index 860e52f..39c4203 100644 --- a/tests/test_lambda_runtime_client.py +++ b/tests/test_lambda_runtime_client.py @@ -85,7 +85,7 @@ def test_wait_next_invocation(self, mock_runtime_client): self.assertEqual(event_request.content_type, "application/json") self.assertEqual(event_request.event_body, response_body) - #Using ThreadPoolExecutor to polling next() + # Using ThreadPoolExecutor to polling next() use_thread_for_polling_next = False event_request = runtime_client.wait_next_invocation(use_thread_for_polling_next) From ab3c30d15f6caa07937870b6dabec8d761236590 Mon Sep 17 00:00:00 2001 From: Pushkar Chawda Date: Fri, 27 Oct 2023 21:22:41 +0000 Subject: [PATCH 4/6] Implement code review comments. --- awslambdaric/bootstrap.py | 8 ++++---- awslambdaric/lambda_runtime_client.py | 18 ++++++++++-------- tests/test_lambda_runtime_client.py | 8 ++++---- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/awslambdaric/bootstrap.py b/awslambdaric/bootstrap.py index 0a6eb59..f87ee1b 100644 --- a/awslambdaric/bootstrap.py +++ b/awslambdaric/bootstrap.py @@ -467,7 +467,9 @@ def run(app_root, handler, lambda_runtime_api_addr): ) with create_log_sink() as log_sink: - lambda_runtime_client = LambdaRuntimeClient(lambda_runtime_api_addr) + lambda_runtime_client = LambdaRuntimeClient( + lambda_runtime_api_addr, use_thread_for_polling_next + ) try: _setup_logging(_AWS_LAMBDA_LOG_FORMAT, _AWS_LAMBDA_LOG_LEVEL, log_sink) @@ -483,9 +485,7 @@ def run(app_root, handler, lambda_runtime_api_addr): sys.exit(1) while True: - event_request = lambda_runtime_client.wait_next_invocation( - use_thread_for_polling_next - ) + event_request = lambda_runtime_client.wait_next_invocation() _GLOBAL_AWS_REQUEST_ID = event_request.invoke_id diff --git a/awslambdaric/lambda_runtime_client.py b/awslambdaric/lambda_runtime_client.py index db26db0..3aad921 100644 --- a/awslambdaric/lambda_runtime_client.py +++ b/awslambdaric/lambda_runtime_client.py @@ -48,8 +48,11 @@ class LambdaRuntimeClient(object): and response. It allows for function authors to override the the default implementation, LambdaMarshaller which unmarshals and marshals JSON, to an instance of a class that implements the same interface.""" - def __init__(self, lambda_runtime_address): + def __init__(self, lambda_runtime_address, use_thread_for_polling_next=False): self.lambda_runtime_address = lambda_runtime_address + self.use_thread_for_polling_next = use_thread_for_polling_next + if self.use_thread_for_polling_next: + from concurrent.futures import ThreadPoolExecutor def post_init_error(self, error_response_data): # These imports are heavy-weight. They implicitly trigger `import ssl, hashlib`. @@ -67,20 +70,19 @@ def post_init_error(self, error_response_data): if response.code != http.HTTPStatus.ACCEPTED: raise LambdaRuntimeClientError(endpoint, response.code, response_body) - def wait_next_invocation(self, use_thread_for_polling_next=False): + def wait_next_invocation(self): # Calling runtime_client.next() from a separate thread unblocks the main thread, # which can then process signals. - if use_thread_for_polling_next: - from concurrent.futures import ThreadPoolExecutor - from .lambda_runtime_exception import FaultException - + if self.use_thread_for_polling_next: try: with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(runtime_client.next) response_body, headers = future.result() except Exception as e: - raise FaultException( - FaultException.LAMBDA_RUNTIME_CLIENT_ERROR, + from .lambda_runtime_exception import FaultExceptions + + raise FaultExceptions( + FaultExceptions.LAMBDA_RUNTIME_CLIENT_ERROR, "LAMBDA_RUNTIME Failed to get next invocation: {}".format(str(e)), None, ) diff --git a/tests/test_lambda_runtime_client.py b/tests/test_lambda_runtime_client.py index 39c4203..b0eae4a 100644 --- a/tests/test_lambda_runtime_client.py +++ b/tests/test_lambda_runtime_client.py @@ -72,8 +72,7 @@ def test_wait_next_invocation(self, mock_runtime_client): mock_runtime_client.next.return_value = response_body, headears runtime_client = LambdaRuntimeClient("localhost:1234") - use_thread_for_polling_next = True - event_request = runtime_client.wait_next_invocation(use_thread_for_polling_next) + event_request = runtime_client.wait_next_invocation() self.assertIsNotNone(event_request) self.assertEqual(event_request.invoke_id, "RID1234") @@ -86,8 +85,9 @@ def test_wait_next_invocation(self, mock_runtime_client): self.assertEqual(event_request.event_body, response_body) # Using ThreadPoolExecutor to polling next() - use_thread_for_polling_next = False - event_request = runtime_client.wait_next_invocation(use_thread_for_polling_next) + runtime_client = LambdaRuntimeClient("localhost:1234", True) + + event_request = runtime_client.wait_next_invocation() self.assertIsNotNone(event_request) self.assertEqual(event_request.invoke_id, "RID1234") From 2841042466fd8736e00d561ba23a69483be3d876 Mon Sep 17 00:00:00 2001 From: Pushkar Chawda Date: Fri, 27 Oct 2023 21:41:53 +0000 Subject: [PATCH 5/6] Move import back under if as importing on init doesn't work. --- awslambdaric/lambda_runtime_client.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/awslambdaric/lambda_runtime_client.py b/awslambdaric/lambda_runtime_client.py index 3aad921..b97e7ec 100644 --- a/awslambdaric/lambda_runtime_client.py +++ b/awslambdaric/lambda_runtime_client.py @@ -51,8 +51,6 @@ class LambdaRuntimeClient(object): def __init__(self, lambda_runtime_address, use_thread_for_polling_next=False): self.lambda_runtime_address = lambda_runtime_address self.use_thread_for_polling_next = use_thread_for_polling_next - if self.use_thread_for_polling_next: - from concurrent.futures import ThreadPoolExecutor def post_init_error(self, error_response_data): # These imports are heavy-weight. They implicitly trigger `import ssl, hashlib`. @@ -74,13 +72,14 @@ def wait_next_invocation(self): # Calling runtime_client.next() from a separate thread unblocks the main thread, # which can then process signals. if self.use_thread_for_polling_next: + from concurrent.futures import ThreadPoolExecutor + from .lambda_runtime_exception import FaultException + try: with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(runtime_client.next) response_body, headers = future.result() except Exception as e: - from .lambda_runtime_exception import FaultExceptions - raise FaultExceptions( FaultExceptions.LAMBDA_RUNTIME_CLIENT_ERROR, "LAMBDA_RUNTIME Failed to get next invocation: {}".format(str(e)), From 9bdbc9e480432603e9a88ae9ea26d4ccc9fc4fc5 Mon Sep 17 00:00:00 2001 From: Pushkar Chawda Date: Fri, 27 Oct 2023 22:09:46 +0000 Subject: [PATCH 6/6] Moved FaultException import to top. --- awslambdaric/lambda_runtime_client.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/awslambdaric/lambda_runtime_client.py b/awslambdaric/lambda_runtime_client.py index b97e7ec..91ebd4c 100644 --- a/awslambdaric/lambda_runtime_client.py +++ b/awslambdaric/lambda_runtime_client.py @@ -4,6 +4,7 @@ import sys from awslambdaric import __version__ +from .lambda_runtime_exception import FaultException def _user_agent(): @@ -72,16 +73,15 @@ def wait_next_invocation(self): # Calling runtime_client.next() from a separate thread unblocks the main thread, # which can then process signals. if self.use_thread_for_polling_next: - from concurrent.futures import ThreadPoolExecutor - from .lambda_runtime_exception import FaultException - try: + from concurrent.futures import ThreadPoolExecutor + with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(runtime_client.next) response_body, headers = future.result() except Exception as e: - raise FaultExceptions( - FaultExceptions.LAMBDA_RUNTIME_CLIENT_ERROR, + raise FaultException( + FaultException.LAMBDA_RUNTIME_CLIENT_ERROR, "LAMBDA_RUNTIME Failed to get next invocation: {}".format(str(e)), None, )