From 65e6bc5c3fc8343278acc09896f5d98a36c8a970 Mon Sep 17 00:00:00 2001 From: hallvictoria Date: Mon, 13 May 2024 16:32:10 -0500 Subject: [PATCH 1/2] support for aio sdk types --- azure_functions_worker/bindings/meta.py | 8 ++-- azure_functions_worker/dispatcher.py | 2 +- .../function_app.py | 38 +++++++++++++++++++ .../test_deferred_bindings_blob_functions.py | 14 +++++++ 4 files changed, 57 insertions(+), 5 deletions(-) diff --git a/azure_functions_worker/bindings/meta.py b/azure_functions_worker/bindings/meta.py index c6148344d..272ce412e 100644 --- a/azure_functions_worker/bindings/meta.py +++ b/azure_functions_worker/bindings/meta.py @@ -150,7 +150,7 @@ def has_implicit_output(bind_name: str) -> bool: return getattr(binding, 'has_implicit_output', lambda: False)() -def from_incoming_proto( +async def from_incoming_proto( binding: str, pb: protos.ParameterBinding, *, pytype: typing.Optional[type], @@ -180,7 +180,7 @@ def from_incoming_proto( try: # if the binding is an sdk type binding if is_deferred_binding: - return deferred_bindings_decode(binding=binding, + return await deferred_bindings_decode(binding=binding, pb=pb, pytype=pytype, datum=datum, @@ -277,7 +277,7 @@ def to_outgoing_param_binding(binding: str, obj: typing.Any, *, data=rpc_val) -def deferred_bindings_decode(binding: typing.Any, +async def deferred_bindings_decode(binding: typing.Any, pb: protos.ParameterBinding, *, pytype: typing.Optional[type], datum: typing.Any, @@ -298,7 +298,7 @@ def deferred_bindings_decode(binding: typing.Any, pytype, datum.value.content)) else: - deferred_binding_type = binding.decode(datum, + deferred_binding_type = await binding.decode(datum, trigger_metadata=metadata, pytype=pytype) deferred_bindings_cache[(pb.name, diff --git a/azure_functions_worker/dispatcher.py b/azure_functions_worker/dispatcher.py index 902e9d86e..e2192ac8c 100644 --- a/azure_functions_worker/dispatcher.py +++ b/azure_functions_worker/dispatcher.py @@ -562,7 +562,7 @@ async def _handle__invocation_request(self, request): else: trigger_metadata = None - args[pb.name] = bindings.from_incoming_proto( + args[pb.name] = await bindings.from_incoming_proto( pb_type_info.binding_name, pb, trigger_metadata=trigger_metadata, diff --git a/tests/extension_tests/deferred_bindings_tests/deferred_bindings_blob_functions/function_app.py b/tests/extension_tests/deferred_bindings_tests/deferred_bindings_blob_functions/function_app.py index 4df62c480..165bbcdc7 100644 --- a/tests/extension_tests/deferred_bindings_tests/deferred_bindings_blob_functions/function_app.py +++ b/tests/extension_tests/deferred_bindings_tests/deferred_bindings_blob_functions/function_app.py @@ -4,6 +4,7 @@ import azure.functions as func import azurefunctions.extensions.bindings.blob as blob +import azurefunctions.extensions.bindings.blob.aio as aioblob app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS) @@ -257,3 +258,40 @@ def put_blob_bytes(req: func.HttpRequest, file: func.Out[bytes]) -> str: def blob_cache(req: func.HttpRequest, client: blob.BlobClient) -> str: return client.download_blob(encoding='utf-8').readall() + + +@app.function_name(name="aio_blob_client") +@app.blob_input(arg_name="client", + path="python-worker-tests/test-blobclient-triggered.txt", + connection="AzureWebJobsStorage") +@app.route(route="aio_blob_client") +async def aio_blob_client(req: func.HttpRequest, + client: aioblob.BlobClient) -> str: + stream = await client.download_blob() + data = await stream.readall() + return str(data) + + +@app.function_name(name="aio_container_client") +@app.blob_input(arg_name="client", + path="python-worker-tests/test-containerclient-triggered.txt", + connection="AzureWebJobsStorage") +@app.route(route="aio_container_client") +async def aio_container_client(req: func.HttpRequest, + client: aioblob.ContainerClient) -> str: + stream = await client.download_blob("test-containerclient-triggered.txt", + encoding='utf-8') + data = await stream.readall() + return str(data) + + +@app.function_name(name="aio_ssd") +@app.blob_input(arg_name="stream", + path="python-worker-tests/test-ssd-triggered.txt", + connection="AzureWebJobsStorage") +@app.route(route="aio_ssd") +async def aio_ssd(req: func.HttpRequest, + stream: aioblob.StorageStreamDownloader) -> str: + file = await stream.readall() + decoded = file.decode('utf-8') + return str(decoded) diff --git a/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings_blob_functions.py b/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings_blob_functions.py index 54e36f9f1..6403d6acc 100644 --- a/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings_blob_functions.py +++ b/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings_blob_functions.py @@ -179,3 +179,17 @@ def test_caching(self): r = self.webhost.request('GET', 'blob_cache') self.assertEqual(r.status_code, 200) + + def test_aio_clients(self): + r = self.webhost.request('GET', 'aio_blob_client') + self.assertEqual(r.status_code, 200) + self.assertEqual(r.text, ('b\'{"name": "test-blobclient-trigger.txt", "length": 9, "content": ' + '"DummyData"}\'')) + + r = self.webhost.request('GET', 'aio_container_client') + self.assertEqual(r.status_code, 200) + self.assertEqual(r.text, '{\r\n "name": "python-worker-tests",\r\n "content": "DummyData"\r\n}') + + r = self.webhost.request('GET', 'aio_ssd') + self.assertEqual(r.status_code, 200) + self.assertEqual(r.text, '{\r\n "content": "DummyData"\r\n}') From 26395f6b20c6edb3245522a2d5218fe5c79663c4 Mon Sep 17 00:00:00 2001 From: Victoria Hall Date: Wed, 29 May 2024 10:37:40 -0500 Subject: [PATCH 2/2] lint --- azure_functions_worker/bindings/meta.py | 20 +++++++++---------- .../function_app.py | 2 +- .../test_deferred_bindings_blob_functions.py | 10 +++++++--- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/azure_functions_worker/bindings/meta.py b/azure_functions_worker/bindings/meta.py index 272ce412e..a0e86f390 100644 --- a/azure_functions_worker/bindings/meta.py +++ b/azure_functions_worker/bindings/meta.py @@ -181,10 +181,10 @@ async def from_incoming_proto( # if the binding is an sdk type binding if is_deferred_binding: return await deferred_bindings_decode(binding=binding, - pb=pb, - pytype=pytype, - datum=datum, - metadata=metadata) + pb=pb, + pytype=pytype, + datum=datum, + metadata=metadata) return binding.decode(datum, trigger_metadata=metadata) except NotImplementedError: # Binding does not support the data. @@ -278,10 +278,10 @@ def to_outgoing_param_binding(binding: str, obj: typing.Any, *, async def deferred_bindings_decode(binding: typing.Any, - pb: protos.ParameterBinding, *, - pytype: typing.Optional[type], - datum: typing.Any, - metadata: typing.Any): + pb: protos.ParameterBinding, *, + pytype: typing.Optional[type], + datum: typing.Any, + metadata: typing.Any): """ This cache holds deferred binding types (ie. BlobClient, ContainerClient) That have already been created, so that the worker can reuse the @@ -299,8 +299,8 @@ async def deferred_bindings_decode(binding: typing.Any, datum.value.content)) else: deferred_binding_type = await binding.decode(datum, - trigger_metadata=metadata, - pytype=pytype) + trigger_metadata=metadata, + pytype=pytype) deferred_bindings_cache[(pb.name, pytype, datum.value.content)] = deferred_binding_type diff --git a/tests/extension_tests/deferred_bindings_tests/deferred_bindings_blob_functions/function_app.py b/tests/extension_tests/deferred_bindings_tests/deferred_bindings_blob_functions/function_app.py index b34178e50..05ab70cd1 100644 --- a/tests/extension_tests/deferred_bindings_tests/deferred_bindings_blob_functions/function_app.py +++ b/tests/extension_tests/deferred_bindings_tests/deferred_bindings_blob_functions/function_app.py @@ -296,7 +296,7 @@ async def aio_ssd(req: func.HttpRequest, decoded = file.decode('utf-8') return str(decoded) - + @app.function_name(name="invalid_connection_info") @app.blob_input(arg_name="client", path="python-worker-tests/test-blobclient-triggered.txt", diff --git a/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings_blob_functions.py b/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings_blob_functions.py index 035491b31..7fd750be8 100644 --- a/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings_blob_functions.py +++ b/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings_blob_functions.py @@ -183,12 +183,16 @@ def test_caching(self): def test_aio_clients(self): r = self.webhost.request('GET', 'aio_blob_client') self.assertEqual(r.status_code, 200) - self.assertEqual(r.text, ('b\'{"name": "test-blobclient-trigger.txt", "length": 9, "content": ' - '"DummyData"}\'')) + self.assertEqual(r.text, + ('b\'{"name": "test-blobclient-trigger.txt",' + ' "length": 9, "content": ' + '"DummyData"}\'')) r = self.webhost.request('GET', 'aio_container_client') self.assertEqual(r.status_code, 200) - self.assertEqual(r.text, '{\r\n "name": "python-worker-tests",\r\n "content": "DummyData"\r\n}') + self.assertEqual(r.text, + '{\r\n "name": "python-worker-tests",\r\n ' + '"content": "DummyData"\r\n}') r = self.webhost.request('GET', 'aio_ssd') self.assertEqual(r.status_code, 200)