Skip to content

fix: test fixes for EventHub #108

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Optional
import uamqp

from azure.eventhub import EventData
from azure.eventhub import EventData as EventDataSDK
from azurefunctions.extensions.base import Datum, SdkType


Expand All @@ -23,30 +23,30 @@ def __init__(self, *, data: Datum) -> None:
self._content_type = data.content_type
self._content = data.content
self.decoded_message = self._get_eventhub_content(self._content)

def _get_eventhub_content(self, content):
"""
When receiving the EventBindingData, the content field is in the form of bytes.
This content must be decoded in order to construct an EventData object from the azure.eventhub SDK.
The .NET worker uses the Azure.Core.Amqp library to do this:
This content must be decoded in order to construct an EventData object from the
azure.eventhub SDK. The .NET worker uses the Azure.Core.Amqp library to do this:
https://github.com/Azure/azure-functions-dotnet-worker/blob/main/extensions/Worker.Extensions.EventHubs/src/EventDataConverter.cs#L45
"""
if content:
try:
return uamqp.Message().decode_from_bytes(content)
except Exception as e:
raise ValueError(f"Failed to decode EventHub content: {e}") from e

return None

def get_sdk_type(self) -> Optional[EventData]:
def get_sdk_type(self) -> Optional[EventDataSDK]:
"""
When receiving an EventHub message, the content portion after being decoded
is used in the constructor to create an EventData object. This will contain
fields such as message, enqueued_time, and more.
"""
# https://github.com/Azure/azure-sdk-for-python/issues/39711
if self.decoded_message:
return EventData._from_message(self.decoded_message)
return EventDataSDK._from_message(self.decoded_message)

return None
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ class EventDataConverter(
def check_input_type_annotation(cls, pytype: type) -> bool:
if pytype is None:
return False

# The annotation is a class/type (not an object) - not iterable
if (isinstance(pytype, type)
and issubclass(pytype, EventData)):
return True

# An iterable who only has one inner type and is a subclass of SdkType
return cls._is_iterable_supported_type(pytype)

@classmethod
def _is_iterable_supported_type(cls, annotation: type) -> bool:
# Check base type from type hint. Ex: List from List[SdkType]
Expand All @@ -38,37 +38,41 @@ def _is_iterable_supported_type(cls, annotation: type) -> bool:
inner_types = get_args(annotation)
if inner_types is None or len(inner_types) != 1:
return False

inner_type = inner_types[0]

return (isinstance(inner_type, type)
return (isinstance(inner_type, type)
and issubclass(inner_type, EventData))

@classmethod
def decode(cls, data: Datum, *, trigger_metadata, pytype) -> Optional[Any]:
"""
EventHub allows for batches to be sent. This means the cardinality can be one or many
EventHub allows for batches. This means the cardinality can be one or many.
When the cardinality is one:
- The data is of type "model_binding_data" - each event is an independent function invocation
- The data is of type "model_binding_data" - each event is an independent
function invocation
When the cardinality is many:
- The data is of type "collection_model_binding_data" - all events are sent in a single function invocation
- The data is of type "collection_model_binding_data" - all events are sent
in a single function invocation
- collection_model_binding_data has 1 or more model_binding_data objects
"""
if data is None or data.type is None or pytype != EventData:
if data is None or data.type is None:
return None

# Process each model_binding_data in the collection
if data.type == "collection_model_binding_data":
try:
return [EventData(data=mbd).get_sdk_type() for mbd in data.value.model_binding_data]
return [EventData(data=mbd).get_sdk_type()
for mbd in data.value.model_binding_data]
except Exception as e:
raise ValueError("Failed to decode incoming EventHub batch: " + repr(e)) from e

raise ValueError("Failed to decode incoming EventHub batch: "
+ repr(e)) from e

# Get model_binding_data fields directly
if data.type == "model_binding_data":
return EventData(data=data.value).get_sdk_type()

raise ValueError(
"Unexpected type of data received for the 'eventhub' binding: "
+ repr(data.type)
)
)
7 changes: 5 additions & 2 deletions azurefunctions-extensions-bindings-eventhub/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ classifiers= [
]
dependencies = [
'azurefunctions-extensions-base',
'azure-eventhub~=5.13.0'
'azure-eventhub~=5.13.0',
'uamqp~=1.0'
]

[project.optional-dependencies]
Expand All @@ -35,7 +36,9 @@ dev = [
'pytest-cov',
'coverage',
'pytest-instafail',
'pre-commit'
'pre-commit',
'mypy',
'flake8'
]

[tool.setuptools.dynamic]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import pathlib
import subprocess
import sys
import unittest

ROOT_PATH = pathlib.Path(__file__).parent.parent.parent


class TestCodeQuality(unittest.TestCase):
def test_mypy(self):
try:
import mypy # NoQA
except ImportError as e:
raise unittest.SkipTest('mypy module is missing') from e

try:
subprocess.run(
[sys.executable, '-m', 'mypy', '-m',
'azurefunctions-extensions-bindings-eventhub'],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=str(ROOT_PATH))
except subprocess.CalledProcessError as ex:
output = ex.output.decode()
raise AssertionError(
f'mypy validation failed:\n{output}') from None

def test_flake8(self):
try:
import flake8 # NoQA
except ImportError as e:
raise unittest.SkipTest('flake8 module is missing') from e

config_path = ROOT_PATH / '.flake8'
if not config_path.exists():
raise unittest.SkipTest('could not locate the .flake8 file')

try:
subprocess.run(
[sys.executable, '-m', 'flake8',
'azurefunctions-extensions-bindings-eventhub',
'--config', str(config_path)],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=str(ROOT_PATH))
except subprocess.CalledProcessError as ex:
output = ex.output.decode()
raise AssertionError(
f'flake8 validation failed:\n{output}') from None
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import sys
import unittest
from typing import List, Optional

Expand All @@ -10,7 +9,8 @@

from azurefunctions.extensions.bindings.eventhub import EventData, EventDataConverter

EVENTHUB_SAMPLE_CONTENT = b"\x00Sr\xc1\x8e\x08\xa3\x1bx-opt-sequence-number-epochT\xff\xa3\x15x-opt-sequence-numberU\x04\xa3\x0cx-opt-offset\x81\x00\x00\x00\x01\x00\x00\x010\xa3\x13x-opt-enqueued-time\x00\xa3\x1dcom.microsoft:datetime-offset\x81\x08\xddW\x05\xc3Q\xcf\x10\x00St\xc1I\x02\xa1\rDiagnostic-Id\xa1700-bdc3fde4889b4e907e0c9dcb46ff8d92-21f637af293ef13b-00\x00Su\xa0\x08message1"
EVENTHUB_SAMPLE_CONTENT = b"\x00Sr\xc1\x8e\x08\xa3\x1bx-opt-sequence-number-epochT\xff\xa3\x15x-opt-sequence-numberU\x04\xa3\x0cx-opt-offset\x81\x00\x00\x00\x01\x00\x00\x010\xa3\x13x-opt-enqueued-time\x00\xa3\x1dcom.microsoft:datetime-offset\x81\x08\xddW\x05\xc3Q\xcf\x10\x00St\xc1I\x02\xa1\rDiagnostic-Id\xa1700-bdc3fde4889b4e907e0c9dcb46ff8d92-21f637af293ef13b-00\x00Su\xa0\x08message1" # noqa: E501


# Mock classes for testing
class MockMBD:
Expand All @@ -27,8 +27,8 @@ def data_type(self) -> Optional[int]:
@property
def direction(self) -> int:
return self._direction.value


class MockCMBD:
def __init__(self, model_binding_data_list: List[MockMBD]):
self.model_binding_data = model_binding_data_list
Expand Down Expand Up @@ -83,18 +83,19 @@ def test_input_empty_mbd(self):
self.assertIsNone(result)

def test_input_empty_cmbd(self):
datum: Datum = Datum(value={}, type="collection_model_binding_data")
datum: Datum = Datum(value=MockCMBD([None]),
type="collection_model_binding_data")
result: EventData = EventDataConverter.decode(
data=datum, trigger_metadata=None, pytype=EventData
)
self.assertIsNone(result)
self.assertEqual(result, [None])

def test_input_populated_mbd(self):
sample_mbd = MockMBD(
version="1.0",
source="AzureEventHubsEventData",
content_type="application/octet-stream",
content = EVENTHUB_SAMPLE_CONTENT
content=EVENTHUB_SAMPLE_CONTENT
)

datum: Datum = Datum(value=sample_mbd, type="model_binding_data")
Expand All @@ -115,33 +116,34 @@ def test_input_populated_cmbd(self):
version="1.0",
source="AzureEventHubsEventData",
content_type="application/octet-stream",
content = EVENTHUB_SAMPLE_CONTENT
content=EVENTHUB_SAMPLE_CONTENT
)

datum: Datum = Datum(value=MockCMBD([sample_mbd, sample_mbd]), type="collection_model_binding_data")
datum: Datum = Datum(value=MockCMBD([sample_mbd, sample_mbd]),
type="collection_model_binding_data")
result: EventData = EventDataConverter.decode(
data=datum, trigger_metadata=None, pytype=EventData
)

self.assertIsNotNone(result)
self.assertIsInstance(result, EventDataSdk)

sdk_result = EventData(data=datum.value).get_sdk_type()

self.assertIsNotNone(sdk_result)
self.assertIsInstance(sdk_result, EventDataSdk)

def test_input_invalid_pytype(self):
sample_mbd = MockMBD(
version="1.0",
source="AzureEventHubsEventData",
content_type="application/octet-stream",
content = EVENTHUB_SAMPLE_CONTENT
)

datum: Datum = Datum(value=sample_mbd, type="model_binding_data")
result: EventData = EventDataConverter.decode(
data=datum, trigger_metadata=None, pytype="str"
for event_data in result:
self.assertIsInstance(event_data, EventDataSdk)

sdk_results = []
for mbd in datum.value.model_binding_data:
sdk_results.append(EventData(data=mbd).get_sdk_type())

self.assertNotEqual(sdk_results, [None, None])
for event_data in sdk_results:
self.assertIsInstance(event_data, EventDataSdk)

def test_input_invalid_datum_type(self):
with self.assertRaises(ValueError) as e:
datum: Datum = Datum(value="hello", type="str")
_: EventData = EventDataConverter.decode(
data=datum, trigger_metadata=None, pytype=""
)
self.assertEqual(
e.exception.args[0],
"Unexpected type of data received for the 'eventhub' binding: 'str'",
)

self.assertIsNone(result)
3 changes: 3 additions & 0 deletions eng/templates/jobs/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ jobs:
blob_extension:
EXTENSION_DIRECTORY: 'azurefunctions-extensions-bindings-blob'
EXTENSION_NAME: 'Blob'
eventhub_extension:
EXTENSION_DIRECTORY: 'azurefunctions-extensions-bindings-eventhub'
EXTENSION_NAME: 'EventHub'
fastapi_extension:
EXTENSION_DIRECTORY: 'azurefunctions-extensions-http-fastapi'
EXTENSION_NAME: 'Http'
Expand Down
3 changes: 3 additions & 0 deletions eng/templates/official/jobs/build-artifacts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ jobs:
blob_extension:
EXTENSION_DIRECTORY: 'azurefunctions-extensions-bindings-blob'
EXTENSION_NAME: 'Blob'
eventhub_extension:
EXTENSION_DIRECTORY: 'azurefunctions-extensions-bindings-eventhub'
EXTENSION_NAME: 'EventHub'
fastapi_extension:
EXTENSION_DIRECTORY: 'azurefunctions-extensions-http-fastapi'
EXTENSION_NAME: 'FastAPI'
Expand Down
2 changes: 1 addition & 1 deletion eng/templates/official/jobs/eventhub-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
python -m pip install -U -e .[dev]
displayName: 'Install dependencies'
- bash: |
python -m pytest -q --instafail azurefunctions-extensions-bindings-eventhub/tests/ --ignore='azurefunctions-extensions-base', --ignore='azurefunctions-extensions-http-fastapi', --ignore='azurefunctions-extensions-bindings-blob'
python -m pytest -q --instafail azurefunctions-extensions-bindings-eventhub/tests/
env:
AzureWebJobsStorage: $(AzureWebJobsStorage)
displayName: "Running EventHub $(PYTHON_VERSION) Python Extension Tests"