diff --git a/.flake8 b/.flake8 index 94a2f1926..142c1bea0 100644 --- a/.flake8 +++ b/.flake8 @@ -6,7 +6,7 @@ ignore = W503,E402,E731 exclude = .git, __pycache__, build, dist, .eggs, .github, .local, docs/, - Samples, azure_functions_worker/protos/, + Samples, azure_functions_worker/protos/, proxy_worker/protos/, azure_functions_worker/_thirdparty/typing_inspect.py, tests/unittests/test_typing_inspect.py, tests/unittests/broken_functions/syntax_error/main.py, diff --git a/azure_functions_worker/protos/.gitignore b/azure_functions_worker/protos/.gitignore index f43e6c214..49d7060ef 100644 --- a/azure_functions_worker/protos/.gitignore +++ b/azure_functions_worker/protos/.gitignore @@ -1,3 +1,2 @@ -/_src *_pb2.py *_pb2_grpc.py diff --git a/eng/templates/jobs/ci-emulator-tests.yml b/eng/templates/jobs/ci-emulator-tests.yml index abc84f394..968585017 100644 --- a/eng/templates/jobs/ci-emulator-tests.yml +++ b/eng/templates/jobs/ci-emulator-tests.yml @@ -21,6 +21,8 @@ jobs: PYTHON_VERSION: '3.11' Python312: PYTHON_VERSION: '3.12' + Python313: + PYTHON_VERSION: '3.13' steps: - task: UsePythonVersion@0 inputs: diff --git a/eng/templates/jobs/ci-unit-tests.yml b/eng/templates/jobs/ci-unit-tests.yml index 5ff54888c..11acf05cd 100644 --- a/eng/templates/jobs/ci-unit-tests.yml +++ b/eng/templates/jobs/ci-unit-tests.yml @@ -16,7 +16,8 @@ jobs: PYTHON_VERSION: '3.11' Python312: PYTHON_VERSION: '3.12' - + Python313: + PYTHON_VERSION: '3.13' steps: - task: UsePythonVersion@0 inputs: @@ -34,8 +35,24 @@ jobs: displayName: 'Install dependencies' condition: and(eq(variables.isSdkRelease, false), eq(variables.isExtensionsRelease, false), eq(variables['USETESTPYTHONSDK'], false), eq(variables['USETESTPYTHONEXTENSIONS'], false)) - bash: | - python -m pytest -q -n auto --dist loadfile --reruns 4 --instafail --cov=./azure_functions_worker --cov-report xml --cov-branch tests/unittests + PY_VER="$(PYTHON_VERSION)" + echo "Python version: $PY_VER" + + # Extract minor version + PY_MINOR="${PY_VER#*.}" + + if [ "$PY_MINOR" -ge 13 ]; then + echo "Running proxy_worker tests (Python >= 3.13)..." + python -m pytest -q -n auto --dist loadfile --reruns 4 --instafail \ + --cov=./proxy_worker --cov-report xml --cov-branch tests/unittest_proxy + else + echo "Running unittests (Python < 3.13)..." + python -m pytest -q -n auto --dist loadfile --reruns 4 --instafail \ + --cov=./azure_functions_worker --cov-report xml --cov-branch tests/unittests + fi displayName: "Running $(PYTHON_VERSION) Unit Tests" # Skip running tests for SDK and Extensions release branches. Public pipeline doesn't have permissions to download artifact. 
condition: and(eq(variables.isSdkRelease, false), eq(variables.isExtensionsRelease, false), eq(variables['USETESTPYTHONSDK'], false), eq(variables['USETESTPYTHONEXTENSIONS'], false)) + env: + PYTHON_VERSION: $(PYTHON_VERSION) \ No newline at end of file diff --git a/eng/templates/official/jobs/build-artifacts.yml b/eng/templates/official/jobs/build-artifacts.yml index 631115b4c..bc0c5de1a 100644 --- a/eng/templates/official/jobs/build-artifacts.yml +++ b/eng/templates/official/jobs/build-artifacts.yml @@ -24,6 +24,9 @@ jobs: Python312V4: pythonVersion: '3.12' workerPath: 'python/prodV4/worker.py' + Python313V4: + pythonVersion: '3.13' + workerPath: 'python/proxyV4/worker.py' templateContext: outputParentDirectory: $(Build.ArtifactStagingDirectory) outputs: @@ -62,6 +65,9 @@ jobs: Python312V4: pythonVersion: '3.12' workerPath: 'python/prodV4/worker.py' + Python313V4: + pythonVersion: '3.13' + workerPath: 'python/proxyV4/worker.py' templateContext: outputParentDirectory: $(Build.ArtifactStagingDirectory) outputs: @@ -100,6 +106,9 @@ jobs: Python312V4: pythonVersion: '3.12' workerPath: 'python/prodV4/worker.py' + Python313V4: + pythonVersion: '3.13' + workerPath: 'python/proxyV4/worker.py' templateContext: outputParentDirectory: $(Build.ArtifactStagingDirectory) outputs: @@ -137,6 +146,9 @@ jobs: Python312V4: pythonVersion: '3.12' workerPath: 'python/prodV4/worker.py' + Python313V4: + pythonVersion: '3.13' + workerPath: 'python/proxyV4/worker.py' templateContext: outputParentDirectory: $(Build.ArtifactStagingDirectory) outputs: @@ -168,6 +180,9 @@ jobs: Python312V4: pythonVersion: '3.12' workerPath: 'python/prodV4/worker.py' + Python313V4: + pythonVersion: '3.13' + workerPath: 'python/proxyV4/worker.py' templateContext: outputParentDirectory: $(Build.ArtifactStagingDirectory) outputs: diff --git a/eng/templates/official/jobs/ci-e2e-tests.yml b/eng/templates/official/jobs/ci-e2e-tests.yml index edd898f65..1a45e06ce 100644 --- a/eng/templates/official/jobs/ci-e2e-tests.yml +++ b/eng/templates/official/jobs/ci-e2e-tests.yml @@ -63,6 +63,15 @@ jobs: SQL_CONNECTION: $(LinuxSqlConnectionString312) EVENTGRID_URI: $(LinuxEventGridTopicUriString312) EVENTGRID_CONNECTION: $(LinuxEventGridConnectionKeyString312) + Python313: + PYTHON_VERSION: '3.13' + STORAGE_CONNECTION: $(LinuxStorageConnectionString312) + COSMOSDB_CONNECTION: $(LinuxCosmosDBConnectionString312) + EVENTHUB_CONNECTION: $(LinuxEventHubConnectionString312) + SERVICEBUS_CONNECTION: $(LinuxServiceBusConnectionString312) + SQL_CONNECTION: $(LinuxSqlConnectionString312) + EVENTGRID_URI: $(LinuxEventGridTopicUriString312) + EVENTGRID_CONNECTION: $(LinuxEventGridConnectionKeyString312) steps: - task: UsePythonVersion@0 inputs: @@ -120,7 +129,14 @@ jobs: Write-Host "pipelineVarSet: $pipelineVarSet" $branch = "$(Build.SourceBranch)" Write-Host "Branch: $branch" - if($branch.StartsWith("refs/heads/sdk/") -or $pipelineVarSet -eq "true") + + $PY_VER = "$(PYTHON_VERSION)" + Write-Host "Python version: $PY_VER" + # Extract the minor version as an integer + $PY_MINOR = [int]($PY_VER -split '\.')[1] + Write-Host "Python minor version: $PY_MINOR" + + if($branch.StartsWith("refs/heads/sdk/") -or $pipelineVarSet -eq "true" -or $PY_MINOR -ge 13) { Write-Host "##vso[task.setvariable variable=skipTest;]true" } @@ -129,7 +145,6 @@ jobs: Write-Host "##vso[task.setvariable variable=skipTest;]false" } displayName: 'Set skipTest variable' - condition: or(eq(variables.isSdkRelease, true), eq(variables['USETESTPYTHONSDK'], true)) - powershell: | Write-Host "skipTest: $(skipTest)" displayName: 'Display
skipTest variable' @@ -145,4 +160,5 @@ jobs: AzureWebJobsEventGridTopicUri: $(EVENTGRID_URI) AzureWebJobsEventGridConnectionKey: $(EVENTGRID_CONNECTION) skipTest: $(skipTest) + PYAZURE_WEBHOST_DEBUG: true displayName: "Running $(PYTHON_VERSION) Python E2E Tests" diff --git a/pack/Microsoft.Azure.Functions.V4.PythonWorker.nuspec b/pack/Microsoft.Azure.Functions.V4.PythonWorker.nuspec index b3ce47d0c..98b0c832a 100644 --- a/pack/Microsoft.Azure.Functions.V4.PythonWorker.nuspec +++ b/pack/Microsoft.Azure.Functions.V4.PythonWorker.nuspec @@ -38,6 +38,11 @@ + + + + + diff --git a/pack/scripts/mac_arm64_deps.sh b/pack/scripts/mac_arm64_deps.sh index 2d70bafad..9c08cce46 100644 --- a/pack/scripts/mac_arm64_deps.sh +++ b/pack/scripts/mac_arm64_deps.sh @@ -13,4 +13,10 @@ python -m invoke -c test_setup build-protos cd .. cp .artifactignore "$BUILD_SOURCESDIRECTORY/deps" -cp -r azure_functions_worker/protos "$BUILD_SOURCESDIRECTORY/deps/azure_functions_worker" \ No newline at end of file + +version_minor=$(echo $1 | cut -d '.' -f 2) +if [[ $version_minor -lt 13 ]]; then + cp -r azure_functions_worker/protos "$BUILD_SOURCESDIRECTORY/deps/azure_functions_worker" +else + cp -r proxy_worker/protos "$BUILD_SOURCESDIRECTORY/deps/proxy_worker" +fi \ No newline at end of file diff --git a/pack/scripts/nix_deps.sh b/pack/scripts/nix_deps.sh index 2d70bafad..9c08cce46 100644 --- a/pack/scripts/nix_deps.sh +++ b/pack/scripts/nix_deps.sh @@ -13,4 +13,10 @@ python -m invoke -c test_setup build-protos cd .. cp .artifactignore "$BUILD_SOURCESDIRECTORY/deps" -cp -r azure_functions_worker/protos "$BUILD_SOURCESDIRECTORY/deps/azure_functions_worker" \ No newline at end of file + +version_minor=$(echo $1 | cut -d '.' -f 2) +if [[ $version_minor -lt 13 ]]; then + cp -r azure_functions_worker/protos "$BUILD_SOURCESDIRECTORY/deps/azure_functions_worker" +else + cp -r proxy_worker/protos "$BUILD_SOURCESDIRECTORY/deps/proxy_worker" +fi \ No newline at end of file diff --git a/pack/scripts/win_deps.ps1 b/pack/scripts/win_deps.ps1 index a7be372e7..b4c95203d 100644 --- a/pack/scripts/win_deps.ps1 +++ b/pack/scripts/win_deps.ps1 @@ -1,3 +1,9 @@ +param ( + [string]$pythonVersion +) +$versionParts = $pythonVersion -split '\.' # Splitting by dot +$versionMinor = [int]$versionParts[1] + python -m venv .env .env\Scripts\Activate.ps1 python -m pip install --upgrade pip @@ -5,7 +11,6 @@ python -m pip install --upgrade pip python -m pip install . $depsPath = Join-Path -Path $env:BUILD_SOURCESDIRECTORY -ChildPath "deps" -$protosPath = Join-Path -Path $depsPath -ChildPath "azure_functions_worker/protos" python -m pip install . azure-functions --no-compile --target $depsPath.ToString() @@ -15,4 +20,11 @@ python -m invoke -c test_setup build-protos cd .. 
Copy-Item -Path ".artifactignore" -Destination $depsPath.ToString() -Copy-Item -Path "azure_functions_worker/protos/*" -Destination $protosPath.ToString() -Recurse -Force + +if ($versionMinor -lt 13) { + $protosPath = Join-Path -Path $depsPath -ChildPath "azure_functions_worker/protos" + Copy-Item -Path "azure_functions_worker/protos/*" -Destination $protosPath.ToString() -Recurse -Force +} else { + $protosPath = Join-Path -Path $depsPath -ChildPath "proxy_worker/protos" + Copy-Item -Path "proxy_worker/protos/*" -Destination $protosPath.ToString() -Recurse -Force +} diff --git a/pack/templates/macos_64_env_gen.yml b/pack/templates/macos_64_env_gen.yml index 90a3578d7..75f33bc5f 100644 --- a/pack/templates/macos_64_env_gen.yml +++ b/pack/templates/macos_64_env_gen.yml @@ -8,10 +8,19 @@ steps: inputs: versionSpec: ${{ parameters.pythonVersion }} addToPath: true +- bash: | + major=$(echo $(pythonVersion) | cut -d. -f1) + minor=$(echo $(pythonVersion) | cut -d. -f2) + echo "##vso[task.setvariable variable=pythonMajor]$major" + echo "##vso[task.setvariable variable=pythonMinor]$minor" + echo $pythonMinor + displayName: 'Parse pythonVersion' - task: ShellScript@2 inputs: disableAutoCwd: true scriptPath: 'pack/scripts/mac_arm64_deps.sh' + args: '${{ parameters.pythonVersion }}' + displayName: 'Install Dependencies' - bash: | pip install pip-audit pip-audit -r requirements.txt @@ -41,4 +50,30 @@ steps: !pkg_resources/** !*.dist-info/** !werkzeug/debug/shared/debugger.js + !proxy_worker/** + targetFolder: '$(Build.ArtifactStagingDirectory)' + condition: in(variables['pythonMinor'], '7', '8', '9', '10', '11', '12') + displayName: 'Copy azure_functions_worker files' +- task: CopyFiles@2 + inputs: + sourceFolder: '$(Build.SourcesDirectory)/deps' + contents: | + ** + !grpc_tools/**/* + !grpcio_tools*/* + !build/** + !docs/** + !pack/** + !python/** + !tests/** + !setuptools*/** + !_distutils_hack/** + !distutils-precedence.pth + !pkg_resources/** + !*.dist-info/** + !werkzeug/debug/shared/debugger.js + !azure_functions_worker/** + !dateutil/** targetFolder: '$(Build.ArtifactStagingDirectory)' + condition: in(variables['pythonMinor'], '13') + displayName: 'Copy proxy_worker files' diff --git a/pack/templates/nix_env_gen.yml b/pack/templates/nix_env_gen.yml index ae3cf4330..db3820153 100644 --- a/pack/templates/nix_env_gen.yml +++ b/pack/templates/nix_env_gen.yml @@ -8,10 +8,19 @@ steps: inputs: versionSpec: ${{ parameters.pythonVersion }} addToPath: true +- bash: | + major=$(echo $(pythonVersion) | cut -d. -f1) + minor=$(echo $(pythonVersion) | cut -d. 
-f2) + echo "##vso[task.setvariable variable=pythonMajor]$major" + echo "##vso[task.setvariable variable=pythonMinor]$minor" + echo $pythonMinor + displayName: 'Parse pythonVersion' - task: ShellScript@2 inputs: disableAutoCwd: true scriptPath: 'pack/scripts/nix_deps.sh' + args: '${{ parameters.pythonVersion }}' + displayName: 'Install Dependencies' - bash: | pip install pip-audit pip-audit -r requirements.txt @@ -41,4 +50,30 @@ steps: !pkg_resources/** !*.dist-info/** !werkzeug/debug/shared/debugger.js + !proxy_worker/** + targetFolder: '$(Build.ArtifactStagingDirectory)' + condition: in(variables['pythonMinor'], '7', '8', '9', '10', '11', '12') + displayName: 'Copy azure_functions_worker files' +- task: CopyFiles@2 + inputs: + sourceFolder: '$(Build.SourcesDirectory)/deps' + contents: | + ** + !grpc_tools/**/* + !grpcio_tools*/* + !build/** + !docs/** + !pack/** + !python/** + !tests/** + !setuptools*/** + !_distutils_hack/** + !distutils-precedence.pth + !pkg_resources/** + !*.dist-info/** + !werkzeug/debug/shared/debugger.js + !dateutil/** + !azure_functions_worker/** targetFolder: '$(Build.ArtifactStagingDirectory)' + condition: in(variables['pythonMinor'], '13') + displayName: 'Copy proxy_worker files' diff --git a/pack/templates/win_env_gen.yml b/pack/templates/win_env_gen.yml index 2eee3411a..b85bf9f89 100644 --- a/pack/templates/win_env_gen.yml +++ b/pack/templates/win_env_gen.yml @@ -9,9 +9,17 @@ steps: versionSpec: ${{ parameters.pythonVersion }} architecture: ${{ parameters.architecture }} addToPath: true +- bash: | + major=$(echo $(pythonVersion) | cut -d. -f1) + minor=$(echo $(pythonVersion) | cut -d. -f2) + echo "##vso[task.setvariable variable=pythonMajor]$major" + echo "##vso[task.setvariable variable=pythonMinor]$minor" + echo $pythonMinor + displayName: 'Parse pythonVersion' - task: PowerShell@2 inputs: filePath: 'pack\scripts\win_deps.ps1' + arguments: '${{ parameters.pythonVersion }}' - bash: | pip install pip-audit pip-audit -r requirements.txt @@ -41,4 +49,30 @@ steps: !pkg_resources\** !*.dist-info\** !werkzeug\debug\shared\debugger.js + !proxy_worker\** + targetFolder: '$(Build.ArtifactStagingDirectory)' + condition: in(variables['pythonMinor'], '7', '8', '9', '10', '11', '12') + displayName: 'Copy azure_functions_worker files' +- task: CopyFiles@2 + inputs: + sourceFolder: '$(Build.SourcesDirectory)\deps' + contents: | + ** + !grpc_tools\**\* + !grpcio_tools*\* + !build\** + !docs\** + !pack\** + !python\** + !tests\** + !setuptools*\** + !_distutils_hack\** + !distutils-precedence.pth + !pkg_resources\** + !*.dist-info\** + !werkzeug\debug\shared\debugger.js + !dateutil\** + !azure_functions_worker\** targetFolder: '$(Build.ArtifactStagingDirectory)' + condition: in(variables['pythonMinor'], '13') + displayName: 'Copy proxy_worker files' diff --git a/proxy_worker/__init__.py b/proxy_worker/__init__.py new file mode 100644 index 000000000..5b7f7a925 --- /dev/null +++ b/proxy_worker/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. diff --git a/proxy_worker/__main__.py b/proxy_worker/__main__.py new file mode 100644 index 000000000..5141dd60a --- /dev/null +++ b/proxy_worker/__main__.py @@ -0,0 +1,6 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
+from proxy_worker import start_worker + +if __name__ == '__main__': + start_worker.start() diff --git a/proxy_worker/dispatcher.py b/proxy_worker/dispatcher.py new file mode 100644 index 000000000..257b68b14 --- /dev/null +++ b/proxy_worker/dispatcher.py @@ -0,0 +1,521 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +import concurrent.futures +import logging +import os +import queue +import sys +import threading +import traceback +import typing +from asyncio import AbstractEventLoop +from dataclasses import dataclass +from typing import Any, Optional + +import grpc +from proxy_worker import protos +from proxy_worker.logging import ( + CONSOLE_LOG_PREFIX, + disable_console_logging, + enable_console_logging, + error_logger, + is_system_log_category, + logger, +) +from proxy_worker.utils.common import ( + get_app_setting, + get_script_file_name, + is_envvar_true, +) +from proxy_worker.utils.constants import ( + PYTHON_ENABLE_DEBUG_LOGGING, + PYTHON_THREADPOOL_THREAD_COUNT, +) +from proxy_worker.version import VERSION + +from .utils.dependency import DependencyManager + +# Library worker import reloaded in init and reload request +_library_worker = None + + +class ContextEnabledTask(asyncio.Task): + AZURE_INVOCATION_ID = '__azure_function_invocation_id__' + + def __init__(self, coro, loop, context=None, **kwargs): + super().__init__(coro, loop=loop, context=context, **kwargs) + + current_task = asyncio.current_task(loop) + if current_task is not None: + invocation_id = getattr( + current_task, self.AZURE_INVOCATION_ID, None) + if invocation_id is not None: + self.set_azure_invocation_id(invocation_id) + + def set_azure_invocation_id(self, invocation_id: str) -> None: + setattr(self, self.AZURE_INVOCATION_ID, invocation_id) + + +_invocation_id_local = threading.local() + + +def get_current_invocation_id() -> Optional[Any]: + loop = asyncio._get_running_loop() + if loop is not None: + current_task = asyncio.current_task(loop) + if current_task is not None: + task_invocation_id = getattr(current_task, + ContextEnabledTask.AZURE_INVOCATION_ID, + None) + if task_invocation_id is not None: + return task_invocation_id + + return getattr(_invocation_id_local, 'invocation_id', None) + + +class AsyncLoggingHandler(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + # Since we disable console log after gRPC channel is initiated, + # we should redirect all the messages into dispatcher. + + # When dispatcher receives an exception, it should switch back + # to console logging. However, it is possible that + # __current_dispatcher__ is set to None as there are still messages + # buffered in this handler, not calling the emit yet. + msg = self.format(record) + try: + Dispatcher.current.on_logging(record, msg) + except RuntimeError as runtime_error: + # This will cause 'Dispatcher not found' failure. + # Logging such of an issue will cause infinite loop of gRPC logging + # To mitigate, we should suppress the 2nd level error logging here + # and use print function to report exception instead. 
+ print(f'{CONSOLE_LOG_PREFIX} ERROR: {str(runtime_error)}', + file=sys.stderr, flush=True) + + +@dataclass +class WorkerRequest: + name: str + request: str + properties: Optional[dict[str, typing.Any]] = None + + +class DispatcherMeta(type): + __current_dispatcher__: Optional["Dispatcher"] = None + + @property + def current(cls): + disp = cls.__current_dispatcher__ + if disp is None: + raise RuntimeError('no currently running Dispatcher is found') + return disp + + +class Dispatcher(metaclass=DispatcherMeta): + _GRPC_STOP_RESPONSE = object() + + def __init__(self, loop: AbstractEventLoop, host: str, port: int, + worker_id: str, request_id: str, + grpc_connect_timeout: float, + grpc_max_msg_len: int = -1) -> None: + self._loop = loop + self._host = host + self._port = port + self._request_id = request_id + self._worker_id = worker_id + self._grpc_connect_timeout: float = grpc_connect_timeout + self._grpc_max_msg_len: int = grpc_max_msg_len + self._old_task_factory: Optional[Any] = None + + self._grpc_resp_queue: queue.Queue = queue.Queue() + self._grpc_connected_fut = loop.create_future() + self._grpc_thread: Optional[threading.Thread] = threading.Thread( + name='grpc_local-thread', target=self.__poll_grpc) + + self._sync_call_tp: Optional[concurrent.futures.Executor] = ( + self._create_sync_call_tp(self._get_sync_tp_max_workers())) + + def on_logging(self, record: logging.LogRecord, + formatted_msg: str) -> None: + if record.levelno >= logging.CRITICAL: + log_level = protos.RpcLog.Critical + elif record.levelno >= logging.ERROR: + log_level = protos.RpcLog.Error + elif record.levelno >= logging.WARNING: + log_level = protos.RpcLog.Warning + elif record.levelno >= logging.INFO: + log_level = protos.RpcLog.Information + elif record.levelno >= logging.DEBUG: + log_level = protos.RpcLog.Debug + else: + log_level = getattr(protos.RpcLog, 'None') + + if is_system_log_category(record.name): + log_category = protos.RpcLog.RpcLogCategory.Value('System') + else: # customers using logging will yield 'root' in record.name + log_category = protos.RpcLog.RpcLogCategory.Value('User') + + log = dict( + level=log_level, + message=formatted_msg, + category=record.name, + log_category=log_category + ) + + invocation_id = get_current_invocation_id() + if invocation_id is not None: + log['invocation_id'] = invocation_id + + self._grpc_resp_queue.put_nowait( + protos.StreamingMessage( + request_id=self.request_id, + rpc_log=protos.RpcLog(**log))) + + @property + def request_id(self) -> str: + return self._request_id + + @property + def worker_id(self) -> str: + return self._worker_id + + @classmethod + async def connect(cls, host: str, port: int, worker_id: str, + request_id: str, connect_timeout: float): + loop = asyncio.events.get_event_loop() + disp = cls(loop, host, port, worker_id, request_id, connect_timeout) + # Safety check for mypy + if disp._grpc_thread is not None: + disp._grpc_thread.start() + await disp._grpc_connected_fut + logger.info('Successfully opened gRPC channel to %s:%s ', host, port) + return disp + + def __poll_grpc(self): + options = [] + if self._grpc_max_msg_len: + options.append(('grpc_local.max_receive_message_length', + self._grpc_max_msg_len)) + options.append(('grpc_local.max_send_message_length', + self._grpc_max_msg_len)) + + channel = grpc.insecure_channel( + f'{self._host}:{self._port}', options) + + try: + grpc.channel_ready_future(channel).result( + timeout=self._grpc_connect_timeout) + except Exception as ex: + self._loop.call_soon_threadsafe( + 
self._grpc_connected_fut.set_exception, ex) + return + else: + self._loop.call_soon_threadsafe( + self._grpc_connected_fut.set_result, True) + + stub = protos.FunctionRpcStub(channel) + + def gen(resp_queue): + while True: + msg = resp_queue.get() + if msg is self._GRPC_STOP_RESPONSE: + grpc_req_stream.cancel() + return + yield msg + + grpc_req_stream = stub.EventStream(gen(self._grpc_resp_queue)) + try: + for req in grpc_req_stream: + self._loop.call_soon_threadsafe( + self._loop.create_task, self._dispatch_grpc_request(req)) + except Exception as ex: + if ex is grpc_req_stream: + # Yes, this is how grpc_req_stream iterator exits. + return + error_logger.exception( + 'unhandled error in gRPC thread. Exception: {0}'.format( + ''.join(traceback.format_exception(ex)))) + raise + + async def _dispatch_grpc_request(self, request): + content_type = request.WhichOneof("content") + + match content_type: + case "worker_init_request": + request_handler = self._handle__worker_init_request + case "function_environment_reload_request": + request_handler = self._handle__function_environment_reload_request + case "functions_metadata_request": + request_handler = self._handle__functions_metadata_request + case "function_load_request": + request_handler = self._handle__function_load_request + case "worker_status_request": + request_handler = self._handle__worker_status_request + case "invocation_request": + request_handler = self._handle__invocation_request + case _: + # Don't crash on unknown messages. Log the error and return. + logger.error("Unknown StreamingMessage content type: %s", content_type) + return + + resp = await request_handler(request) + self._grpc_resp_queue.put_nowait(resp) + + async def dispatch_forever(self): # sourcery skip: swap-if-expression + if DispatcherMeta.__current_dispatcher__ is not None: + raise RuntimeError('there can be only one running dispatcher per ' + 'process') + + self._old_task_factory = self._loop.get_task_factory() + + DispatcherMeta.__current_dispatcher__ = self + try: + forever = self._loop.create_future() + + self._grpc_resp_queue.put_nowait( + protos.StreamingMessage( + request_id=self.request_id, + start_stream=protos.StartStream( + worker_id=self.worker_id))) + + # In Python 3.11+, constructing a task has an optional context + # parameter. Allow for this param to be passed to ContextEnabledTask + self._loop.set_task_factory( + lambda loop, coro, context=None, **kwargs: ContextEnabledTask( + coro, loop=loop, context=context, **kwargs)) + + # Detach console logging before enabling GRPC channel logging + logger.info('Detaching console logging.') + disable_console_logging() + + # Attach gRPC logging to the root logger. 
Since gRPC channel is + # established, should use it for system and user logs + logging_handler = AsyncLoggingHandler() + root_logger = logging.getLogger() + + log_level = logging.INFO if not is_envvar_true( + PYTHON_ENABLE_DEBUG_LOGGING) else logging.DEBUG + + root_logger.setLevel(log_level) + root_logger.addHandler(logging_handler) + logger.info('Switched to gRPC logging.') + logging_handler.flush() + + try: + await forever + finally: + logger.warning('Detaching gRPC logging due to exception.') + logging_handler.flush() + root_logger.removeHandler(logging_handler) + + # Reenable console logging when there's an exception + enable_console_logging() + logger.warning('Switched to console logging due to exception.') + finally: + DispatcherMeta.__current_dispatcher__ = None + + self._loop.set_task_factory(self._old_task_factory) + self.stop() + + def stop(self) -> None: + if self._grpc_thread is not None: + self._grpc_resp_queue.put_nowait(self._GRPC_STOP_RESPONSE) + self._grpc_thread.join() + self._grpc_thread = None + + self._stop_sync_call_tp() + + def _stop_sync_call_tp(self): + """Deallocate the current synchronous thread pool and assign + self._sync_call_tp to None. If the thread pool does not exist, + this will be a no op. + """ + if getattr(self, '_sync_call_tp', None): + assert self._sync_call_tp is not None # mypy fix + self._sync_call_tp.shutdown() + self._sync_call_tp = None + + @staticmethod + def _create_sync_call_tp(max_worker: Optional[int]) -> concurrent.futures.Executor: + """Create a thread pool executor with max_worker. This is a wrapper + over ThreadPoolExecutor constructor. Consider calling this method after + _stop_sync_call_tp() to ensure only 1 synchronous thread pool is + running. + """ + return concurrent.futures.ThreadPoolExecutor( + max_workers=max_worker + ) + + @staticmethod + def _get_sync_tp_max_workers() -> typing.Optional[int]: + def tp_max_workers_validator(value: str) -> bool: + try: + int_value = int(value) + except ValueError: + logger.warning('%s must be an integer', + PYTHON_THREADPOOL_THREAD_COUNT) + return False + + if int_value < 1: + logger.warning( + '%s must be set to a value between 1 and sys.maxint. ' + 'Reverting to default value for max_workers', + PYTHON_THREADPOOL_THREAD_COUNT, + 1) + return False + return True + + max_workers = get_app_setting(setting=PYTHON_THREADPOOL_THREAD_COUNT, + validator=tp_max_workers_validator) + + # We can box the app setting as int for earlier python versions. + return int(max_workers) if max_workers else None + + @staticmethod + def reload_library_worker(directory: str): + global _library_worker + v2_scriptfile = os.path.join(directory, get_script_file_name()) + if os.path.exists(v2_scriptfile): + try: + import azure_functions_worker_v2 # NoQA + _library_worker = azure_functions_worker_v2 + logger.debug("azure_functions_worker_v2 import succeeded: %s", + _library_worker.__file__) + except ImportError: + logger.debug("azure_functions_worker_v2 library not found: : %s", + traceback.format_exc()) + else: + try: + import azure_functions_worker_v1 # NoQA + _library_worker = azure_functions_worker_v1 + logger.debug("azure_functions_worker_v1 import succeeded: %s", + _library_worker.__file__) # type: ignore[union-attr] + except ImportError: + logger.debug("azure_functions_worker_v1 library not found: %s", + traceback.format_exc()) + + async def _handle__worker_init_request(self, request): + logger.info('Received WorkerInitRequest, ' + 'python version %s, ' + 'worker version %s, ' + 'request ID %s. 
' + 'To enable debug level logging, please refer to ' + 'https://aka.ms/python-enable-debug-logging', + sys.version, + VERSION, + self.request_id) + + if DependencyManager.is_in_linux_consumption(): + import azure_functions_worker_v2 + + if DependencyManager.should_load_cx_dependencies(): + DependencyManager.prioritize_customer_dependencies() + + directory = request.worker_init_request.function_app_directory + self.reload_library_worker(directory) + + init_request = WorkerRequest(name="WorkerInitRequest", + request=request, + properties={"protos": protos, + "host": self._host}) + init_response = await ( + _library_worker.worker_init_request( # type: ignore[union-attr] + init_request)) + + return protos.StreamingMessage( + request_id=self.request_id, + worker_init_response=init_response) + + async def _handle__function_environment_reload_request(self, request): + logger.info('Received FunctionEnvironmentReloadRequest, ' + 'request ID: %s, ' + 'To enable debug level logging, please refer to ' + 'https://aka.ms/python-enable-debug-logging', + self.request_id) + + func_env_reload_request = \ + request.function_environment_reload_request + directory = func_env_reload_request.function_app_directory + + DependencyManager.prioritize_customer_dependencies(directory) + self.reload_library_worker(directory) + + env_reload_request = WorkerRequest(name="FunctionEnvironmentReloadRequest", + request=request, + properties={"protos": protos, + "host": self._host}) + env_reload_response = await ( + _library_worker.function_environment_reload_request( # type: ignore[union-attr] # noqa + env_reload_request)) + + return protos.StreamingMessage( + request_id=self.request_id, + function_environment_reload_response=env_reload_response) + + async def _handle__worker_status_request(self, request): + # Logging is not necessary in this request since the response is used + # for host to judge scale decisions of out-of-proc languages. + # Having log here will reduce the responsiveness of the worker. 
+ return protos.StreamingMessage( + request_id=request.request_id, + worker_status_response=protos.WorkerStatusResponse()) + + async def _handle__functions_metadata_request(self, request): + logger.info( + 'Received WorkerMetadataRequest, request ID %s, ' + 'worker id: %s', + self.request_id, self.worker_id) + + metadata_request = WorkerRequest(name="WorkerMetadataRequest", request=request) + metadata_response = await ( + _library_worker.functions_metadata_request( # type: ignore[union-attr] + metadata_request)) + + return protos.StreamingMessage( + request_id=request.request_id, + function_metadata_response=metadata_response) + + async def _handle__function_load_request(self, request): + func_request = request.function_load_request + function_id = func_request.function_id + function_metadata = func_request.metadata + function_name = function_metadata.name + + logger.info( + 'Received WorkerLoadRequest, request ID %s, function_id: %s,' + 'function_name: %s, worker_id: %s', + self.request_id, function_id, function_name, self.worker_id) + + load_request = WorkerRequest(name="FunctionLoadRequest ", request=request) + load_response = await ( + _library_worker.function_load_request( # type: ignore[union-attr] + load_request)) + + return protos.StreamingMessage( + request_id=self.request_id, + function_load_response=load_response) + + async def _handle__invocation_request(self, request): + invoc_request = request.invocation_request + invocation_id = invoc_request.invocation_id + function_id = invoc_request.function_id + + logger.info( + 'Received FunctionInvocationRequest, request ID %s, function_id: %s,' + 'invocation_id: %s, worker_id: %s', + self.request_id, function_id, invocation_id, self.worker_id) + + invocation_request = WorkerRequest(name="FunctionInvocationRequest", + request=request, + properties={ + "threadpool": self._sync_call_tp}) + invocation_response = await ( + _library_worker.invocation_request( # type: ignore[union-attr] + invocation_request)) + + return protos.StreamingMessage( + request_id=self.request_id, + invocation_response=invocation_response) diff --git a/proxy_worker/logging.py b/proxy_worker/logging.py new file mode 100644 index 000000000..8f765640b --- /dev/null +++ b/proxy_worker/logging.py @@ -0,0 +1,92 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import logging +import logging.handlers +import sys +from typing import Optional + +# Logging Prefixes +SYSTEM_LOG_PREFIX = "proxy_worker" +SDK_LOG_PREFIX = "azure.functions" +SYSTEM_ERROR_LOG_PREFIX = "proxy_worker_errors" +CONSOLE_LOG_PREFIX = "LanguageWorkerConsoleLog" + + +logger: logging.Logger = logging.getLogger(SYSTEM_LOG_PREFIX) +error_logger: logging.Logger = ( + logging.getLogger(SYSTEM_ERROR_LOG_PREFIX)) + +handler: Optional[logging.Handler] = None +error_handler: Optional[logging.Handler] = None + + +def setup(log_level, log_destination): + # Since handler and error_handler are moved to the global scope, + # before assigning to these handlers, we should define 'global' keyword + global handler + global error_handler + + if log_level == 'TRACE': + log_level = 'DEBUG' + + formatter = logging.Formatter(f'{CONSOLE_LOG_PREFIX}' + ' %(levelname)s: %(message)s') + + if log_destination is None: + # With no explicit log destination we do split logging, + # errors go into stderr, everything else -- to stdout. 
+ error_handler = logging.StreamHandler(sys.stderr) + error_handler.setFormatter(formatter) + error_handler.setLevel(getattr(logging, log_level)) + + handler = logging.StreamHandler(sys.stdout) + + elif log_destination in ('stdout', 'stderr'): + handler = logging.StreamHandler(getattr(sys, log_destination)) + + elif log_destination == 'syslog': + handler = logging.handlers.SysLogHandler() + + else: + handler = logging.FileHandler(log_destination) + + if error_handler is None: + error_handler = handler + + handler.setFormatter(formatter) + handler.setLevel(getattr(logging, log_level)) + + logger.addHandler(handler) + logger.setLevel(getattr(logging, log_level)) + + error_logger.addHandler(error_handler) + error_logger.setLevel(getattr(logging, log_level)) + + +def disable_console_logging() -> None: + # We should only remove the sys.stdout stream, as error_logger is used for + # unexpected critical error logs handling. + if logger and handler: + handler.flush() + logger.removeHandler(handler) + + +def enable_console_logging() -> None: + if logger and handler: + logger.addHandler(handler) + + +def is_system_log_category(ctg: str) -> bool: + """Check if the logging namespace belongs to system logs. Categories starting + with the following names are treated as system logs: + 1. 'proxy_worker' (Worker Info) + 2. 'proxy_worker_errors' (Worker Error) + 3. 'azure.functions' (SDK) + + Expected behaviors for system logs and customer logs are listed below: + local_console customer_app_insight functions_kusto_table + system_log false false true + customer_log true true false + """ + return ctg.startswith(SYSTEM_LOG_PREFIX) or ctg.startswith(SDK_LOG_PREFIX) diff --git a/proxy_worker/protos/.gitignore b/proxy_worker/protos/.gitignore new file mode 100644 index 000000000..49d7060ef --- /dev/null +++ b/proxy_worker/protos/.gitignore @@ -0,0 +1,2 @@ +*_pb2.py +*_pb2_grpc.py diff --git a/proxy_worker/protos/__init__.py b/proxy_worker/protos/__init__.py new file mode 100644 index 000000000..e9c4f2397 --- /dev/null +++ b/proxy_worker/protos/__init__.py @@ -0,0 +1,43 @@ +from .FunctionRpc_pb2_grpc import ( # NoQA + FunctionRpcStub, + FunctionRpcServicer, + add_FunctionRpcServicer_to_server) + +from .FunctionRpc_pb2 import ( # NoQA + StreamingMessage, + StartStream, + WorkerInitRequest, + WorkerInitResponse, + RpcFunctionMetadata, + FunctionLoadRequest, + FunctionLoadResponse, + FunctionEnvironmentReloadRequest, + FunctionEnvironmentReloadResponse, + InvocationRequest, + InvocationResponse, + WorkerHeartbeat, + WorkerStatusRequest, + WorkerStatusResponse, + BindingInfo, + StatusResult, + RpcException, + ParameterBinding, + TypedData, + RpcHttp, + RpcHttpCookie, + RpcLog, + RpcSharedMemory, + RpcDataType, + CloseSharedMemoryResourcesRequest, + CloseSharedMemoryResourcesResponse, + FunctionsMetadataRequest, + FunctionMetadataResponse, + WorkerMetadata, + RpcRetryOptions) + +from .shared.NullableTypes_pb2 import ( + NullableString, + NullableBool, + NullableDouble, + NullableTimestamp +) diff --git a/proxy_worker/protos/_src/.gitignore b/proxy_worker/protos/_src/.gitignore new file mode 100644 index 000000000..940794e60 --- /dev/null +++ b/proxy_worker/protos/_src/.gitignore @@ -0,0 +1,288 @@ +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons.
+## +## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore + +# User-specific files +*.suo +*.user +*.userosscache +*.sln.docstates + +# User-specific files (MonoDevelop/Xamarin Studio) +*.userprefs + +# Build results +[Dd]ebug/ +[Dd]ebugPublic/ +[Rr]elease/ +[Rr]eleases/ +x64/ +x86/ +bld/ +[Bb]in/ +[Oo]bj/ +[Ll]og/ + +# Visual Studio 2015 cache/options directory +.vs/ +# Uncomment if you have tasks that create the project's static files in wwwroot +#wwwroot/ + +# MSTest test Results +[Tt]est[Rr]esult*/ +[Bb]uild[Ll]og.* + +# NUNIT +*.VisualState.xml +TestResult.xml + +# Build Results of an ATL Project +[Dd]ebugPS/ +[Rr]eleasePS/ +dlldata.c + +# .NET Core +project.lock.json +project.fragment.lock.json +artifacts/ +**/Properties/launchSettings.json + +*_i.c +*_p.c +*_i.h +*.ilk +*.meta +*.obj +*.pch +*.pdb +*.pgc +*.pgd +*.rsp +*.sbr +*.tlb +*.tli +*.tlh +*.tmp +*.tmp_proj +*.log +*.vspscc +*.vssscc +.builds +*.pidb +*.svclog +*.scc + +# Chutzpah Test files +_Chutzpah* + +# Visual C++ cache files +ipch/ +*.aps +*.ncb +*.opendb +*.opensdf +*.sdf +*.cachefile +*.VC.db +*.VC.VC.opendb + +# Visual Studio profiler +*.psess +*.vsp +*.vspx +*.sap + +# TFS 2012 Local Workspace +$tf/ + +# Guidance Automation Toolkit +*.gpState + +# ReSharper is a .NET coding add-in +_ReSharper*/ +*.[Rr]e[Ss]harper +*.DotSettings.user + +# JustCode is a .NET coding add-in +.JustCode + +# TeamCity is a build add-in +_TeamCity* + +# DotCover is a Code Coverage Tool +*.dotCover + +# Visual Studio code coverage results +*.coverage +*.coveragexml + +# NCrunch +_NCrunch_* +.*crunch*.local.xml +nCrunchTemp_* + +# MightyMoose +*.mm.* +AutoTest.Net/ + +# Web workbench (sass) +.sass-cache/ + +# Installshield output folder +[Ee]xpress/ + +# DocProject is a documentation generator add-in +DocProject/buildhelp/ +DocProject/Help/*.HxT +DocProject/Help/*.HxC +DocProject/Help/*.hhc +DocProject/Help/*.hhk +DocProject/Help/*.hhp +DocProject/Help/Html2 +DocProject/Help/html + +# Click-Once directory +publish/ + +# Publish Web Output +*.[Pp]ublish.xml +*.azurePubxml +# TODO: Comment the next line if you want to checkin your web deploy settings +# but database connection strings (with potential passwords) will be unencrypted +*.pubxml +*.publishproj + +# Microsoft Azure Web App publish settings. Comment the next line if you want to +# checkin your Azure Web App publish settings, but sensitive information contained +# in these scripts will be unencrypted +PublishScripts/ + +# NuGet Packages +*.nupkg +# The packages folder can be ignored because of Package Restore +**/packages/* +# except build/, which is used as an MSBuild target. 
+!**/packages/build/ +# Uncomment if necessary however generally it will be regenerated when needed +#!**/packages/repositories.config +# NuGet v3's project.json files produces more ignorable files +*.nuget.props +*.nuget.targets + +# Microsoft Azure Build Output +csx/ +*.build.csdef + +# Microsoft Azure Emulator +ecf/ +rcf/ + +# Windows Store app package directories and files +AppPackages/ +BundleArtifacts/ +Package.StoreAssociation.xml +_pkginfo.txt + +# Visual Studio cache files +# files ending in .cache can be ignored +*.[Cc]ache +# but keep track of directories ending in .cache +!*.[Cc]ache/ + +# Others +ClientBin/ +~$* +*~ +*.dbmdl +*.dbproj.schemaview +*.jfm +*.pfx +*.publishsettings +orleans.codegen.cs + +# Since there are multiple workflows, uncomment next line to ignore bower_components +# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622) +#bower_components/ + +# RIA/Silverlight projects +Generated_Code/ + +# Backup & report files from converting an old project file +# to a newer Visual Studio version. Backup files are not needed, +# because we have git ;-) +_UpgradeReport_Files/ +Backup*/ +UpgradeLog*.XML +UpgradeLog*.htm + +# SQL Server files +*.mdf +*.ldf +*.ndf + +# Business Intelligence projects +*.rdl.data +*.bim.layout +*.bim_*.settings + +# Microsoft Fakes +FakesAssemblies/ + +# GhostDoc plugin setting file +*.GhostDoc.xml + +# Node.js Tools for Visual Studio +.ntvs_analysis.dat +node_modules/ + +# Typescript v1 declaration files +typings/ + +# Visual Studio 6 build log +*.plg + +# Visual Studio 6 workspace options file +*.opt + +# Visual Studio 6 auto-generated workspace file (contains which files were open etc.) +*.vbw + +# Visual Studio LightSwitch build output +**/*.HTMLClient/GeneratedArtifacts +**/*.DesktopClient/GeneratedArtifacts +**/*.DesktopClient/ModelManifest.xml +**/*.Server/GeneratedArtifacts +**/*.Server/ModelManifest.xml +_Pvt_Extensions + +# Paket dependency manager +.paket/paket.exe +paket-files/ + +# FAKE - F# Make +.fake/ + +# JetBrains Rider +.idea/ +*.sln.iml + +# CodeRush +.cr/ + +# Python Tools for Visual Studio (PTVS) +__pycache__/ +*.pyc + +# Cake - Uncomment if you are using it +# tools/** +# !tools/packages.config + +# Telerik's JustMock configuration file +*.jmconfig + +# BizTalk build output +*.btp.cs +*.btm.cs +*.odx.cs +*.xsd.cs diff --git a/proxy_worker/protos/_src/LICENSE b/proxy_worker/protos/_src/LICENSE new file mode 100644 index 000000000..21071075c --- /dev/null +++ b/proxy_worker/protos/_src/LICENSE @@ -0,0 +1,21 @@ + MIT License + + Copyright (c) Microsoft Corporation. All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE diff --git a/proxy_worker/protos/_src/README.md b/proxy_worker/protos/_src/README.md new file mode 100644 index 000000000..b22f0bb4b --- /dev/null +++ b/proxy_worker/protos/_src/README.md @@ -0,0 +1,98 @@ +# Azure Functions Language Worker Protobuf + +This repository contains the protobuf definition file which defines the gRPC service which is used between the [Azure Functions Host](https://github.com/Azure/azure-functions-host) and the Azure Functions language workers. This repo is shared across many repos in many languages (for each worker) by using git commands. + +To use this repo in Azure Functions language workers, follow the steps below to add this repo as a subtree (*Adding This Repo*). If this repo is already embedded in a language worker repo, follow the steps to update the consumed file (*Pulling Updates*). + +Learn more about Azure Function's projects on the [meta](https://github.com/azure/azure-functions) repo. + +## Adding This Repo + +From within the Azure Functions language worker repo: +1. Define remote branch for cleaner git commands + - `git remote add proto-file https://github.com/azure/azure-functions-language-worker-protobuf.git` + - `git fetch proto-file` +2. Index contents of azure-functions-worker-protobuf to language worker repo + - `git read-tree --prefix= -u proto-file/` +3. Add new path in language worker repo to .gitignore file + - In .gitignore, add path in language worker repo +4. Finalize with commit + - `git commit -m "Added subtree from https://github.com/azure/azure-functions-language-worker-protobuf. Branch: . Commit: "` + - `git push` + +## Pulling Updates + +From within the Azure Functions language worker repo: +1. Define remote branch for cleaner git commands + - `git remote add proto-file https://github.com/azure/azure-functions-language-worker-protobuf.git` + - `git fetch proto-file` +2. Pull a specific release tag + - `git fetch proto-file refs/tags/` + - Example: `git fetch proto-file refs/tags/v1.1.0-protofile` +3. Merge updates + - Merge with an explicit path to subtree: `git merge -X subtree= --squash --allow-unrelated-histories --strategy-option theirs` + - Example: `git merge -X subtree=src/WebJobs.Script.Grpc/azure-functions-language-worker-protobuf --squash v1.1.0-protofile --allow-unrelated-histories --strategy-option theirs` +4. Finalize with commit + - `git commit -m "Updated subtree from https://github.com/azure/azure-functions-language-worker-protobuf. Tag: . Commit: "` + - `git push` + +## Releasing a Language Worker Protobuf version + +1. Draft a release in the GitHub UI + - Be sure to include details of the release +2. Create a release version, following semantic versioning guidelines ([semver.org](https://semver.org/)) +3. Tag the version with the pattern: `v..

-protofile` (example: `v1.1.0-protofile`) +4. Merge `dev` to `master` + +## Consuming FunctionRpc.proto +*Note: Update versionNumber before running the following commands* + +## CSharp +``` +set NUGET_PATH="%UserProfile%\.nuget\packages" +set GRPC_TOOLS_PATH=%NUGET_PATH%\grpc.tools\\tools\windows_x86 +set PROTO_PATH=.\azure-functions-language-worker-protobuf\src\proto +set PROTO=.\azure-functions-language-worker-protobuf\src\proto\FunctionRpc.proto +set PROTOBUF_TOOLS=%NUGET_PATH%\google.protobuf.tools\\tools +set MSGDIR=.\Messages + +if exist %MSGDIR% rmdir /s /q %MSGDIR% +mkdir %MSGDIR% + +set OUTDIR=%MSGDIR%\DotNet +mkdir %OUTDIR% +%GRPC_TOOLS_PATH%\protoc.exe %PROTO% --csharp_out %OUTDIR% --grpc_out=%OUTDIR% --plugin=protoc-gen-grpc=%GRPC_TOOLS_PATH%\grpc_csharp_plugin.exe --proto_path=%PROTO_PATH% --proto_path=%PROTOBUF_TOOLS% +``` +## JavaScript +In package.json, add to the build script the following commands to build .js files and to build .ts files. Use and install npm package `protobufjs`. + +Generate JavaScript files: +``` +pbjs -t json-module -w commonjs -o azure-functions-language-worker-protobuf/src/rpc.js azure-functions-language-worker-protobuf/src/proto/FunctionRpc.proto +``` +Generate TypeScript files: +``` +pbjs -t static-module azure-functions-language-worker-protobuf/src/proto/FunctionRpc.proto -o azure-functions-language-worker-protobuf/src/rpc_static.js && pbts -o azure-functions-language-worker-protobuf/src/rpc.d.ts azure-functions-language-worker-protobuf/src/rpc_static.js +``` + +## Java +Maven plugin: [protobuf-maven-plugin](https://www.xolstice.org/protobuf-maven-plugin/) +In pom.xml, add the following under the configuration for this plugin: +${basedir}//azure-functions-language-worker-protobuf/src/proto + +## Python +--TODO (a possible `grpc_tools.protoc` invocation is sketched below) + +## Contributing + +This project welcomes contributions and suggestions. Most contributions require you to agree to a +Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us +the rights to use your contribution. For details, visit https://cla.microsoft.com. + +When you submit a pull request, a CLA-bot will automatically determine whether you need to provide +a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions +provided by the bot. You will only need to do this once across all repos using our CLA. + +This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). +For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or +contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
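The Python section of the vendored README is still a TODO. In this PR the stubs are produced by the pack scripts via `python -m invoke -c test_setup build-protos`; a rough manual equivalent using `grpcio-tools` could look like the following sketch (the paths are assumptions based on this repo's layout under `proxy_worker/protos`, not commands taken from the PR):

```
python -m pip install grpcio-tools
python -m grpc_tools.protoc \
  --proto_path=proxy_worker/protos/_src/src/proto \
  --python_out=proxy_worker/protos \
  --grpc_python_out=proxy_worker/protos \
  proxy_worker/protos/_src/src/proto/FunctionRpc.proto \
  proxy_worker/protos/_src/src/proto/shared/NullableTypes.proto \
  proxy_worker/protos/_src/src/proto/identity/ClaimsIdentityRpc.proto
```

The generated `*_pb2.py` and `*_pb2_grpc.py` modules land next to (and under) `proxy_worker/protos/__init__.py`, which imports them, and they are ignored by `proxy_worker/protos/.gitignore`, so they are regenerated at build time rather than checked in.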
diff --git a/proxy_worker/protos/_src/src/proto/FunctionRpc.proto b/proxy_worker/protos/_src/src/proto/FunctionRpc.proto new file mode 100644 index 000000000..f48bc7bbe --- /dev/null +++ b/proxy_worker/protos/_src/src/proto/FunctionRpc.proto @@ -0,0 +1,730 @@ +syntax = "proto3"; +// protobuf vscode extension: https://marketplace.visualstudio.com/items?itemName=zxh404.vscode-proto3 + +option java_multiple_files = true; +option java_package = "com.microsoft.azure.functions.rpc.messages"; +option java_outer_classname = "FunctionProto"; +option csharp_namespace = "Microsoft.Azure.WebJobs.Script.Grpc.Messages"; +option go_package ="github.com/Azure/azure-functions-go-worker/internal/rpc"; + +package AzureFunctionsRpcMessages; + +import "google/protobuf/duration.proto"; +import "identity/ClaimsIdentityRpc.proto"; +import "shared/NullableTypes.proto"; + +// Interface exported by the server. +service FunctionRpc { + rpc EventStream (stream StreamingMessage) returns (stream StreamingMessage) {} +} + +message StreamingMessage { + // Used to identify message between host and worker + string request_id = 1; + + // Payload of the message + oneof content { + + // Worker initiates stream + StartStream start_stream = 20; + + // Host sends capabilities/init data to worker + WorkerInitRequest worker_init_request = 17; + // Worker responds after initializing with its capabilities & status + WorkerInitResponse worker_init_response = 16; + + // MESSAGE NOT USED + // Worker periodically sends empty heartbeat message to host + WorkerHeartbeat worker_heartbeat = 15; + + // Host sends terminate message to worker. + // Worker terminates if it can, otherwise host terminates after a grace period + WorkerTerminate worker_terminate = 14; + + // Host periodically sends status request to the worker + WorkerStatusRequest worker_status_request = 12; + WorkerStatusResponse worker_status_response = 13; + + // On file change event, host sends notification to worker + FileChangeEventRequest file_change_event_request = 6; + + // Worker requests a desired action (restart worker, reload function) + WorkerActionResponse worker_action_response = 7; + + // Host sends required metadata to worker to load function + FunctionLoadRequest function_load_request = 8; + // Worker responds after loading with the load result + FunctionLoadResponse function_load_response = 9; + + // Host requests a given invocation + InvocationRequest invocation_request = 4; + + // Worker responds to a given invocation + InvocationResponse invocation_response = 5; + + // Host sends cancel message to attempt to cancel an invocation. + // If an invocation is cancelled, host will receive an invocation response with status cancelled. 
+ InvocationCancel invocation_cancel = 21; + + // Worker logs a message back to the host + RpcLog rpc_log = 2; + + FunctionEnvironmentReloadRequest function_environment_reload_request = 25; + + FunctionEnvironmentReloadResponse function_environment_reload_response = 26; + + // Ask the worker to close any open shared memory resources for a given invocation + CloseSharedMemoryResourcesRequest close_shared_memory_resources_request = 27; + CloseSharedMemoryResourcesResponse close_shared_memory_resources_response = 28; + + // Worker indexing message types + FunctionsMetadataRequest functions_metadata_request = 29; + FunctionMetadataResponse function_metadata_response = 30; + + // Host sends required metadata to worker to load functions + FunctionLoadRequestCollection function_load_request_collection = 31; + + // Host gets the list of function load responses + FunctionLoadResponseCollection function_load_response_collection = 32; + + // Host sends required metadata to worker to warmup the worker + WorkerWarmupRequest worker_warmup_request = 33; + + // Worker responds after warming up with the warmup result + WorkerWarmupResponse worker_warmup_response = 34; + + } +} + +// Process.Start required info +// connection details +// protocol type +// protocol version + +// Worker sends the host information identifying itself +message StartStream { + // id of the worker + string worker_id = 2; +} + +// Host requests the worker to initialize itself +message WorkerInitRequest { + // version of the host sending init request + string host_version = 1; + + // A map of host supported features/capabilities + map capabilities = 2; + + // inform worker of supported categories and their levels + // i.e. Worker = Verbose, Function.MyFunc = None + map log_categories = 3; + + // Full path of worker.config.json location + string worker_directory = 4; + + // base directory for function app + string function_app_directory = 5; +} + +// Worker responds with the result of initializing itself +message WorkerInitResponse { + // PROPERTY NOT USED + // TODO: Remove from protobuf during next breaking change release + string worker_version = 1; + + // A map of worker supported features/capabilities + map capabilities = 2; + + // Status of the response + StatusResult result = 3; + + // Worker metadata captured for telemetry purposes + WorkerMetadata worker_metadata = 4; +} + +message WorkerMetadata { + // The runtime/stack name + string runtime_name = 1; + + // The version of the runtime/stack + string runtime_version = 2; + + // The version of the worker + string worker_version = 3; + + // The worker bitness/architecture + string worker_bitness = 4; + + // Optional additional custom properties + map custom_properties = 5; +} + +// Used by the host to determine success/failure/cancellation +message StatusResult { + // Indicates Failure/Success/Cancelled + enum Status { + Failure = 0; + Success = 1; + Cancelled = 2; + } + + // Status for the given result + Status status = 4; + + // Specific message about the result + string result = 1; + + // Exception message (if exists) for the status + RpcException exception = 2; + + // Captured logs or relevant details can use the logs property + repeated RpcLog logs = 3; +} + +// MESSAGE NOT USED +// TODO: Remove from protobuf during next breaking change release +message WorkerHeartbeat {} + +// Warning before killing the process after grace_period +// Worker self terminates ..no response on this +message WorkerTerminate { + google.protobuf.Duration grace_period = 1; +} + +// Host notifies 
worker of file content change +message FileChangeEventRequest { + // Types of File change operations (See link for more info: https://msdn.microsoft.com/en-us/library/t6xf43e0(v=vs.110).aspx) + enum Type { + Unknown = 0; + Created = 1; + Deleted = 2; + Changed = 4; + Renamed = 8; + All = 15; + } + + // type for this event + Type type = 1; + + // full file path for the file change notification + string full_path = 2; + + // Name of the function affected + string name = 3; +} + +// Indicates whether worker reloaded successfully or needs a restart +message WorkerActionResponse { + // indicates whether a restart is needed, or reload successfully + enum Action { + Restart = 0; + Reload = 1; + } + + // action for this response + Action action = 1; + + // text reason for the response + string reason = 2; +} + +// Used by the host to determine worker health +message WorkerStatusRequest { +} + +// Worker responds with status message +// TODO: Add any worker relevant status to response +message WorkerStatusResponse { +} + +message FunctionEnvironmentReloadRequest { + // Environment variables from the current process + map environment_variables = 1; + // Current directory of function app + string function_app_directory = 2; +} + +message FunctionEnvironmentReloadResponse { + // After specialization, worker sends capabilities & metadata. + // Worker metadata captured for telemetry purposes + WorkerMetadata worker_metadata = 1; + + // A map of worker supported features/capabilities + map capabilities = 2; + + // Status of the response + StatusResult result = 3; +} + +// Tell the out-of-proc worker to close any shared memory maps it allocated for given invocation +message CloseSharedMemoryResourcesRequest { + repeated string map_names = 1; +} + +// Response from the worker indicating which of the shared memory maps have been successfully closed and which have not been closed +// The key (string) is the map name and the value (bool) is true if it was closed, false if not +message CloseSharedMemoryResourcesResponse { + map close_map_results = 1; +} + +// Host tells the worker to load a list of Functions +message FunctionLoadRequestCollection { + repeated FunctionLoadRequest function_load_requests = 1; +} + +// Host gets the list of function load responses +message FunctionLoadResponseCollection { + repeated FunctionLoadResponse function_load_responses = 1; +} + +// Load request of a single Function +message FunctionLoadRequest { + // unique function identifier (avoid name collisions, facilitate reload case) + string function_id = 1; + + // Metadata for the request + RpcFunctionMetadata metadata = 2; + + // A flag indicating if managed dependency is enabled or not + bool managed_dependency_enabled = 3; +} + +// Worker tells host result of reload +message FunctionLoadResponse { + // unique function identifier + string function_id = 1; + + // Result of load operation + StatusResult result = 2; + // TODO: return type expected? 
+ + // Result of load operation + bool is_dependency_downloaded = 3; +} + +// Information on how a Function should be loaded and its bindings +message RpcFunctionMetadata { + // TODO: do we want the host's name - the language worker might do a better job of assignment than the host + string name = 4; + + // base directory for the Function + string directory = 1; + + // Script file specified + string script_file = 2; + + // Entry point specified + string entry_point = 3; + + // Bindings info + map bindings = 6; + + // Is set to true for proxy + bool is_proxy = 7; + + // Function indexing status + StatusResult status = 8; + + // Function language + string language = 9; + + // Raw binding info + repeated string raw_bindings = 10; + + // unique function identifier (avoid name collisions, facilitate reload case) + string function_id = 13; + + // A flag indicating if managed dependency is enabled or not + bool managed_dependency_enabled = 14; + + // The optional function execution retry strategy to use on invocation failures. + RpcRetryOptions retry_options = 15; + + // Properties for function metadata + // They're usually specific to a worker and largely passed along to the controller API for use + // outside the host + map properties = 16; +} + +// Host tells worker it is ready to receive metadata +message FunctionsMetadataRequest { + // base directory for function app + string function_app_directory = 1; +} + +// Worker sends function metadata back to host +message FunctionMetadataResponse { + // list of function indexing responses + repeated RpcFunctionMetadata function_metadata_results = 1; + + // status of overall metadata request + StatusResult result = 2; + + // if set to true then host will perform indexing + bool use_default_metadata_indexing = 3; +} + +// Host requests worker to invoke a Function +message InvocationRequest { + // Unique id for each invocation + string invocation_id = 1; + + // Unique id for each Function + string function_id = 2; + + // Input bindings (include trigger) + repeated ParameterBinding input_data = 3; + + // binding metadata from trigger + map trigger_metadata = 4; + + // Populates activityId, tracestate and tags from host + RpcTraceContext trace_context = 5; + + // Current retry context + RetryContext retry_context = 6; +} + +// Host sends ActivityId, traceStateString and Tags from host +message RpcTraceContext { + // This corresponds to Activity.Current?.Id + string trace_parent = 1; + + // This corresponds to Activity.Current?.TraceStateString + string trace_state = 2; + + // This corresponds to Activity.Current?.Tags + map attributes = 3; +} + +// Host sends retry context for a function invocation +message RetryContext { + // Current retry count + int32 retry_count = 1; + + // Max retry count + int32 max_retry_count = 2; + + // Exception that caused the retry + RpcException exception = 3; +} + +// Host requests worker to cancel invocation +message InvocationCancel { + // Unique id for invocation + string invocation_id = 2; + + // PROPERTY NOT USED + google.protobuf.Duration grace_period = 1; +} + +// Worker responds with status of Invocation +message InvocationResponse { + // Unique id for invocation + string invocation_id = 1; + + // Output binding data + repeated ParameterBinding output_data = 2; + + // data returned from Function (for $return and triggers with return support) + TypedData return_value = 4; + + // Status of the invocation (success/failure/canceled) + StatusResult result = 3; +} + +message WorkerWarmupRequest { + // Full path of 
worker.config.json location + string worker_directory = 1; +} + +message WorkerWarmupResponse { + StatusResult result = 1; +} + +// Used to encapsulate data which could be a variety of types +message TypedData { + oneof data { + string string = 1; + string json = 2; + bytes bytes = 3; + bytes stream = 4; + RpcHttp http = 5; + sint64 int = 6; + double double = 7; + CollectionBytes collection_bytes = 8; + CollectionString collection_string = 9; + CollectionDouble collection_double = 10; + CollectionSInt64 collection_sint64 = 11; + ModelBindingData model_binding_data = 12; + CollectionModelBindingData collection_model_binding_data = 13; + } +} + +// Specify which type of data is contained in the shared memory region being read +enum RpcDataType { + unknown = 0; + string = 1; + json = 2; + bytes = 3; + stream = 4; + http = 5; + int = 6; + double = 7; + collection_bytes = 8; + collection_string = 9; + collection_double = 10; + collection_sint64 = 11; +} + +// Used to provide metadata about shared memory region to read data from +message RpcSharedMemory { + // Name of the shared memory map containing data + string name = 1; + // Offset in the shared memory map to start reading data from + int64 offset = 2; + // Number of bytes to read (starting from the offset) + int64 count = 3; + // Final type to which the read data (in bytes) is to be interpreted as + RpcDataType type = 4; +} + +// Used to encapsulate collection string +message CollectionString { + repeated string string = 1; +} + +// Used to encapsulate collection bytes +message CollectionBytes { + repeated bytes bytes = 1; +} + +// Used to encapsulate collection double +message CollectionDouble { + repeated double double = 1; +} + +// Used to encapsulate collection sint64 +message CollectionSInt64 { + repeated sint64 sint64 = 1; +} + +// Used to describe a given binding on invocation +message ParameterBinding { + // Name for the binding + string name = 1; + + oneof rpc_data { + // Data for the binding + TypedData data = 2; + + // Metadata about the shared memory region to read data from + RpcSharedMemory rpc_shared_memory = 3; + } +} + +// Used to describe a given binding on load +message BindingInfo { + // Indicates whether it is an input or output binding (or a fancy inout binding) + enum Direction { + in = 0; + out = 1; + inout = 2; + } + + // Indicates the type of the data for the binding + enum DataType { + undefined = 0; + string = 1; + binary = 2; + stream = 3; + } + + // Type of binding (e.g. HttpTrigger) + string type = 2; + + // Direction of the given binding + Direction direction = 3; + + DataType data_type = 4; + + // Properties for binding metadata + map properties = 5; +} + +// Used to send logs back to the Host +message RpcLog { + // Matching ILogger semantics + // https://github.com/aspnet/Logging/blob/9506ccc3f3491488fe88010ef8b9eb64594abf95/src/Microsoft.Extensions.Logging/Logger.cs + // Level for the Log + enum Level { + Trace = 0; + Debug = 1; + Information = 2; + Warning = 3; + Error = 4; + Critical = 5; + None = 6; + } + + // Category of the log. Defaults to User if not specified. + enum RpcLogCategory { + User = 0; + System = 1; + CustomMetric = 2; + } + + // Unique id for invocation (if exists) + string invocation_id = 1; + + // TOD: This should be an enum + // Category for the log (startup, load, invocation, etc.) 
+ string category = 2; + + // Level for the given log message + Level level = 3; + + // Message for the given log + string message = 4; + + // Id for the even associated with this log (if exists) + string event_id = 5; + + // Exception (if exists) + RpcException exception = 6; + + // json serialized property bag + string properties = 7; + + // Category of the log. Either user(default), system, or custom metric. + RpcLogCategory log_category = 8; + + // strongly-typed (ish) property bag + map propertiesMap = 9; +} + +// Encapsulates an Exception +message RpcException { + // Source of the exception + string source = 3; + + // Stack trace for the exception + string stack_trace = 1; + + // Textual message describing the exception + string message = 2; + + // Worker specifies whether exception is a user exception, + // for purpose of application insights logging. Defaults to false. + bool is_user_exception = 4; + + // Type of exception. If it's a user exception, the type is passed along to app insights. + // Otherwise, it's ignored for now. + string type = 5; +} + +// Http cookie type. Note that only name and value are used for Http requests +message RpcHttpCookie { + // Enum that lets servers require that a cookie shouldn't be sent with cross-site requests + enum SameSite { + None = 0; + Lax = 1; + Strict = 2; + ExplicitNone = 3; + } + + // Cookie name + string name = 1; + + // Cookie value + string value = 2; + + // Specifies allowed hosts to receive the cookie + NullableString domain = 3; + + // Specifies URL path that must exist in the requested URL + NullableString path = 4; + + // Sets the cookie to expire at a specific date instead of when the client closes. + // It is generally recommended that you use "Max-Age" over "Expires". + NullableTimestamp expires = 5; + + // Sets the cookie to only be sent with an encrypted request + NullableBool secure = 6; + + // Sets the cookie to be inaccessible to JavaScript's Document.cookie API + NullableBool http_only = 7; + + // Allows servers to assert that a cookie ought not to be sent along with cross-site requests + SameSite same_site = 8; + + // Number of seconds until the cookie expires. A zero or negative number will expire the cookie immediately. + NullableDouble max_age = 9; +} + +// TODO - solidify this or remove it +message RpcHttp { + string method = 1; + string url = 2; + map headers = 3; + TypedData body = 4; + map params = 10; + string status_code = 12; + map query = 15; + bool enable_content_negotiation= 16; + TypedData rawBody = 17; + repeated RpcClaimsIdentity identities = 18; + repeated RpcHttpCookie cookies = 19; + map nullable_headers = 20; + map nullable_params = 21; + map nullable_query = 22; +} + +// Message representing Microsoft.Azure.WebJobs.ParameterBindingData +// Used for hydrating SDK-type bindings in out-of-proc workers +message ModelBindingData +{ + // The version of the binding data content + string version = 1; + + // The extension source of the binding data + string source = 2; + + // The content type of the binding data content + string content_type = 3; + + // The binding data content + bytes content = 4; +} + +// Used to encapsulate collection model_binding_data +message CollectionModelBindingData { + repeated ModelBindingData model_binding_data = 1; +} + +// Retry policy which the worker sends the host when the worker indexes +// a function. +message RpcRetryOptions +{ + // The retry strategy to use. Valid values are fixed delay or exponential backoff. 
+ enum RetryStrategy + { + exponential_backoff = 0; + fixed_delay = 1; + } + + // The maximum number of retries allowed per function execution. + // -1 means to retry indefinitely. + int32 max_retry_count = 2; + + // The delay that's used between retries when you're using a fixed delay strategy. + google.protobuf.Duration delay_interval = 3; + + // The minimum retry delay when you're using an exponential backoff strategy + google.protobuf.Duration minimum_interval = 4; + + // The maximum retry delay when you're using an exponential backoff strategy + google.protobuf.Duration maximum_interval = 5; + + RetryStrategy retry_strategy = 6; +} \ No newline at end of file diff --git a/proxy_worker/protos/_src/src/proto/identity/ClaimsIdentityRpc.proto b/proxy_worker/protos/_src/src/proto/identity/ClaimsIdentityRpc.proto new file mode 100644 index 000000000..c3945bb8a --- /dev/null +++ b/proxy_worker/protos/_src/src/proto/identity/ClaimsIdentityRpc.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; +// protobuf vscode extension: https://marketplace.visualstudio.com/items?itemName=zxh404.vscode-proto3 + +option java_package = "com.microsoft.azure.functions.rpc.messages"; + +import "shared/NullableTypes.proto"; + +// Light-weight representation of a .NET System.Security.Claims.ClaimsIdentity object. +// This is the same serialization as found in EasyAuth, and needs to be kept in sync with +// its ClaimsIdentitySlim definition, as seen in the WebJobs extension: +// https://github.com/Azure/azure-webjobs-sdk-extensions/blob/dev/src/WebJobs.Extensions.Http/ClaimsIdentitySlim.cs +message RpcClaimsIdentity { + NullableString authentication_type = 1; + NullableString name_claim_type = 2; + NullableString role_claim_type = 3; + repeated RpcClaim claims = 4; +} + +// Light-weight representation of a .NET System.Security.Claims.Claim object. +// This is the same serialization as found in EasyAuth, and needs to be kept in sync with +// its ClaimSlim definition, as seen in the WebJobs extension: +// https://github.com/Azure/azure-webjobs-sdk-extensions/blob/dev/src/WebJobs.Extensions.Http/ClaimSlim.cs +message RpcClaim { + string value = 1; + string type = 2; +} diff --git a/proxy_worker/protos/_src/src/proto/shared/NullableTypes.proto b/proxy_worker/protos/_src/src/proto/shared/NullableTypes.proto new file mode 100644 index 000000000..4fb476502 --- /dev/null +++ b/proxy_worker/protos/_src/src/proto/shared/NullableTypes.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; +// protobuf vscode extension: https://marketplace.visualstudio.com/items?itemName=zxh404.vscode-proto3 + +option java_package = "com.microsoft.azure.functions.rpc.messages"; + +import "google/protobuf/timestamp.proto"; + +message NullableString { + oneof string { + string value = 1; + } +} + +message NullableDouble { + oneof double { + double value = 1; + } +} + +message NullableBool { + oneof bool { + bool value = 1; + } +} + +message NullableTimestamp { + oneof timestamp { + google.protobuf.Timestamp value = 1; + } +} diff --git a/proxy_worker/protos/identity/__init__.py b/proxy_worker/protos/identity/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/proxy_worker/protos/shared/__init__.py b/proxy_worker/protos/shared/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/proxy_worker/start_worker.py b/proxy_worker/start_worker.py new file mode 100644 index 000000000..d468cef69 --- /dev/null +++ b/proxy_worker/start_worker.py @@ -0,0 +1,77 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. 
+# Licensed under the MIT License. +"""Main entrypoint.""" + +import argparse +import traceback + +_GRPC_CONNECTION_TIMEOUT = 5.0 + +def parse_args(): + parser = argparse.ArgumentParser( + description='Python Azure Functions Worker') + parser.add_argument('--host', + help="host address") + parser.add_argument('--port', type=int, + help='port number') + parser.add_argument('--workerId', dest='worker_id', + help='id for the worker') + parser.add_argument('--requestId', dest='request_id', + help='id of the request') + parser.add_argument('--grpcMaxMessageLength', type=int, + dest='grpc_max_msg_len') + parser.add_argument('--functions-uri', dest='functions_uri', type=str, + help='URI with IP Address and Port used to' + ' connect to the Host via gRPC.') + parser.add_argument('--functions-worker-id', + dest='functions_worker_id', type=str, + help='Worker ID assigned to this language worker.') + parser.add_argument('--functions-request-id', dest='functions_request_id', + type=str, help='Request ID used for gRPC communication ' + 'with the Host.') + parser.add_argument('--functions-grpc-max-message-length', type=int, + dest='functions_grpc_max_msg_len', + help='Max grpc_local message length for Functions') + return parser.parse_args() + + +def start(): + from .utils.dependency import DependencyManager + DependencyManager.initialize() + DependencyManager.use_worker_dependencies() + + import asyncio + + from . import logging + from .logging import error_logger, logger + + args = parse_args() + logging.setup(log_level="INFO", log_destination=None) + + logger.info("Args: %s", args) + logger.info( + 'Starting proxy worker. Worker ID: %s, Request ID: %s, Host Address: %s:%s', + args.worker_id, args.request_id, args.host, args.port) + + try: + return asyncio.run(start_async( + args.host, args.port, args.worker_id, args.request_id)) + except Exception as ex: + error_logger.exception( + 'unhandled error in functions worker: {0}'.format( + ''.join(traceback.format_exception(ex)))) + raise + + +async def start_async(host, port, worker_id, request_id): + from . import dispatcher + + disp = await dispatcher.Dispatcher.connect(host=host, port=port, + worker_id=worker_id, + request_id=request_id, + connect_timeout=_GRPC_CONNECTION_TIMEOUT) + await disp.dispatch_forever() + + +if __name__ == '__main__': + start() diff --git a/proxy_worker/utils/__init__.py b/proxy_worker/utils/__init__.py new file mode 100644 index 000000000..5b7f7a925 --- /dev/null +++ b/proxy_worker/utils/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. diff --git a/proxy_worker/utils/common.py b/proxy_worker/utils/common.py new file mode 100644 index 000000000..5b2f1e98f --- /dev/null +++ b/proxy_worker/utils/common.py @@ -0,0 +1,86 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import os +from typing import Callable, Optional + +from proxy_worker.utils.constants import ( + PYTHON_SCRIPT_FILE_NAME, + PYTHON_SCRIPT_FILE_NAME_DEFAULT, +) + + +def get_app_setting( + setting: str, + default_value: Optional[str] = None, + validator: Optional[Callable[[str], bool]] = None +) -> Optional[str]: + """Returns the application setting from environment variable. + + Parameters + ---------- + setting: str + The name of the application setting (e.g. FUNCTIONS_RUNTIME_VERSION) + + default_value: Optional[str] + The expected return value when the application setting is not found, + or the app setting does not pass the validator. 
+
+    validator: Optional[Callable[[str], bool]]
+        A function that accepts the app setting value and returns True when
+        the app setting value is acceptable.
+
+    Returns
+    -------
+    Optional[str]
+        The app setting value if it exists and passes validation,
+        otherwise default_value.
+    """
+    app_setting_value = os.getenv(setting)
+
+    # If an app setting is not configured, we return the default value
+    if app_setting_value is None:
+        return default_value
+
+    # If there's no validator, we should return the app setting value directly
+    if validator is None:
+        return app_setting_value
+
+    # If a validator is provided, return the app setting value only when it
+    # passes validation; otherwise fall back to the default value
+    if validator(app_setting_value):
+        return app_setting_value
+    return default_value
+
+
+def is_true_like(setting: str) -> bool:
+    if setting is None:
+        return False
+
+    return setting.lower().strip() in {'1', 'true', 't', 'yes', 'y'}
+
+
+def is_false_like(setting: str) -> bool:
+    if setting is None:
+        return False
+
+    return setting.lower().strip() in {'0', 'false', 'f', 'no', 'n'}
+
+
+def is_envvar_true(env_key: str) -> bool:
+    if os.getenv(env_key) is None:
+        return False
+
+    return is_true_like(os.environ[env_key])
+
+
+def is_envvar_false(env_key: str) -> bool:
+    if os.getenv(env_key) is None:
+        return False
+
+    return is_false_like(os.environ[env_key])
+
+
+def get_script_file_name():
+    return get_app_setting(PYTHON_SCRIPT_FILE_NAME,
+                           PYTHON_SCRIPT_FILE_NAME_DEFAULT)
diff --git a/proxy_worker/utils/constants.py b/proxy_worker/utils/constants.py
new file mode 100644
index 000000000..c5e0dd2ab
--- /dev/null
+++ b/proxy_worker/utils/constants.py
@@ -0,0 +1,15 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+
+# App Setting constants
+PYTHON_ENABLE_DEBUG_LOGGING = "PYTHON_ENABLE_DEBUG_LOGGING"
+PYTHON_THREADPOOL_THREAD_COUNT = "PYTHON_THREADPOOL_THREAD_COUNT"
+
+# Container constants
+CONTAINER_NAME = "CONTAINER_NAME"
+AZURE_WEBJOBS_SCRIPT_ROOT = "AzureWebJobsScriptRoot"
+
+# new programming model default script file name
+PYTHON_SCRIPT_FILE_NAME = "PYTHON_SCRIPT_FILE_NAME"
+PYTHON_SCRIPT_FILE_NAME_DEFAULT = "function_app.py"
+
diff --git a/proxy_worker/utils/dependency.py b/proxy_worker/utils/dependency.py
new file mode 100644
index 000000000..b5b07dbd5
--- /dev/null
+++ b/proxy_worker/utils/dependency.py
@@ -0,0 +1,311 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License.
+import os
+import re
+import sys
+from types import ModuleType
+from typing import List, Optional
+
+from ..logging import logger
+from .common import is_envvar_true
+from .constants import AZURE_WEBJOBS_SCRIPT_ROOT, CONTAINER_NAME
+
+
+class DependencyManager:
+    """The dependency manager controls the source of Python packages, preventing
+    worker packages from interfering with the customer's code.
+
+    It has two modes. In worker mode, the Python packages are loaded from the
+    worker path (e.g. workers/python/<python version>/<os>/<architecture>).
+    In customer mode, the packages are loaded from the customer's
+    .python_packages/ folder or from their virtual environment.
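(Aside — illustrative only, not part of this diff: the intended call sequence for this class, pieced together from start_worker.py above and the methods defined below.)

from proxy_worker.utils.dependency import DependencyManager

# Worker startup (placeholder / cold start): isolate the worker's own dependencies.
DependencyManager.initialize()                # capture cx/worker paths once
DependencyManager.use_worker_dependencies()   # put worker deps first on sys.path

# Later, on environment reload (specialization), flip priority to the customer's
# packages and working directory.
DependencyManager.prioritize_customer_dependencies("/home/site/wwwroot")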
+ + Azure Functions has three different set of sys.path ordering, + + Linux Consumption sys.path: [ + "/tmp/functions\\standby\\wwwroot", # Placeholder folder + "/home/site/wwwroot/.python_packages/lib/site-packages", # CX's deps + "/azure-functions-host/workers/python/3.13/LINUX/X64", # Worker's deps + "/home/site/wwwroot" # CX's Working Directory + ] + + Linux Dedicated/Premium sys.path: [ + "/home/site/wwwroot", # CX's Working Directory + "/home/site/wwwroot/.python_packages/lib/site-packages", # CX's deps + "/azure-functions-host/workers/python/3.13/LINUX/X64", # Worker's deps + ] + + Core Tools sys.path: [ + "%appdata%\\azure-functions-core-tools\\bin\\workers\\" + "python\\3.13\\WINDOWS\\X64", # Worker's deps + "C:\\Users\\user\\Project\\.venv311\\lib\\site-packages", # CX's deps + "C:\\Users\\user\\Project", # CX's Working Directory + ] + + When we first start up the Python worker, we should only loaded from + worker's deps and create module namespace (e.g. google.protobuf variable). + + Once the worker receives worker init request, we clear out the sys.path, + worker sys.modules cache and sys.path_import_cache so the libraries + will only get loaded from CX's deps path. + """ + + cx_deps_path: str = '' + cx_working_dir: str = '' + worker_deps_path: str = '' + + @classmethod + def initialize(cls): + cls.cx_deps_path = cls._get_cx_deps_path() + cls.cx_working_dir = cls._get_cx_working_dir() + cls.worker_deps_path = cls._get_worker_deps_path() + + @classmethod + def is_in_linux_consumption(cls): + return CONTAINER_NAME in os.environ + + @classmethod + def should_load_cx_dependencies(cls): + """ + Customer dependencies should be loaded when + 1) App is a dedicated app + 2) App is linux consumption but not in placeholder mode. + This can happen when the worker restarts for any reason + (OOM, timeouts etc) and env reload request is not called. + """ + return not (DependencyManager.is_in_linux_consumption() + and is_envvar_true("WEBSITE_PLACEHOLDER_MODE")) + + @classmethod + def use_worker_dependencies(cls): + """Switch the sys.path and ensure the worker imports are loaded from + Worker's dependenciess. + + This will not affect already imported namespaces, but will clear out + the module cache and ensure the upcoming modules are loaded from + worker's dependency path. + """ + + # The following log line will not show up in core tools but should + # work in kusto since core tools only collects gRPC logs. This function + # is executed even before the gRPC logging channel is ready. + logger.info('Applying use_worker_dependencies:' + ' worker_dependencies: %s,' + ' customer_dependencies: %s,' + ' working_directory: %s', cls.worker_deps_path, + cls.cx_deps_path, cls.cx_working_dir) + + cls._remove_from_sys_path(cls.cx_deps_path) + cls._remove_from_sys_path(cls.cx_working_dir) + cls._add_to_sys_path(cls.worker_deps_path, True) + logger.info('Start using worker dependencies %s. Sys.path: %s', + cls.worker_deps_path, sys.path) + + @classmethod + def prioritize_customer_dependencies(cls, cx_working_dir=None): + """Switch the sys.path and ensure the customer's code import are loaded + from CX's deppendencies. + + This will not affect already imported namespaces, but will clear out + the module cache and ensure the upcoming modules are loaded from + customer's dependency path. + + As for Linux Consumption, this will only remove worker_deps_path, + but the customer's path will be loaded in function_environment_reload. + + The search order of a module name in customer's paths is: + 1. cx_deps_path + 2. 
worker_deps_path + 3. cx_working_dir + """ + # Try to get the latest customer's working directory + # cx_working_dir => cls.cx_working_dir => AzureWebJobsScriptRoot + working_directory: str = '' + if cx_working_dir: + working_directory = os.path.abspath(cx_working_dir) + if not working_directory: + working_directory = cls.cx_working_dir + if not working_directory: + working_directory = os.getenv(AZURE_WEBJOBS_SCRIPT_ROOT, '') + + # Try to get the latest customer's dependency path + cx_deps_path: str = cls._get_cx_deps_path() + + if not cx_deps_path: + cx_deps_path = cls.cx_deps_path + + logger.info( + 'Applying prioritize_customer_dependencies: ' + 'worker_dependencies_path: %s, customer_dependencies_path: %s, ' + 'working_directory: %s, Linux Consumption: %s, Placeholder: %s, ' + 'sys.path: %s', + cls.worker_deps_path, cx_deps_path, working_directory, + DependencyManager.is_in_linux_consumption(), + is_envvar_true("WEBSITE_PLACEHOLDER_MODE"), sys.path) + + cls._remove_from_sys_path(cls.worker_deps_path) + cls._add_to_sys_path(cls.worker_deps_path, True) + cls._add_to_sys_path(cls.cx_deps_path, True) + cls._add_to_sys_path(working_directory, False) + + logger.info(f'Finished prioritize_customer_dependencies: {sys.path}') + + @classmethod + def _add_to_sys_path(cls, path: str, add_to_first: bool): + """This will ensure no duplicated path are added into sys.path and + clear importer cache. No action if path already exists in sys.path. + + Parameters + ---------- + path: str + The path needs to be added into sys.path. + If the path is an empty string, no action will be taken. + add_to_first: bool + Should the path added to the first entry (highest priority) + """ + if path and path not in sys.path: + if add_to_first: + sys.path.insert(0, path) + else: + sys.path.append(path) + + # Only clear path importer and sys.modules cache if path is not + # defined in sys.path + cls._clear_path_importer_cache_and_modules(path) + + @classmethod + def _remove_from_sys_path(cls, path: str): + """This will remove path from sys.path and clear importer cache. + No action if the path does not exist in sys.path. + + Parameters + ---------- + path: str + The path to be removed from sys.path. + If the path is an empty string, no action will be taken. + """ + if path and path in sys.path: + # Remove all occurances in sys.path + sys.path = list(filter(lambda p: p != path, sys.path)) + + # In case if any part of worker initialization do sys.path.pop() + # Always do a cache clear in path importer and sys.modules + cls._clear_path_importer_cache_and_modules(path) + + @classmethod + def _clear_path_importer_cache_and_modules(cls, path: str): + """Removes path from sys.path_importer_cache and clear related + sys.modules cache. No action if the path is empty or no entries + in sys.path_importer_cache or sys.modules. + + Parameters + ---------- + path: str + The path to be removed from sys.path_importer_cache. All related + modules will be cleared out from sys.modules cache. + If the path is an empty string, no action will be taken. + """ + if path and path in sys.path_importer_cache: + sys.path_importer_cache.pop(path) + + if path: + cls._remove_module_cache(path) + + @staticmethod + def _get_cx_deps_path() -> str: + """Get the directory storing the customer's third-party libraries. 
+ + Returns + ------- + str + Core Tools: path to customer's site packages + Linux Dedicated/Premium: path to customer's site packages + Linux Consumption: empty string + """ + prefix: Optional[str] = os.getenv(AZURE_WEBJOBS_SCRIPT_ROOT) + cx_paths: List[str] = [ + p for p in sys.path + if prefix and p.startswith(prefix) and ('site-packages' in p) + ] + # Return first or default of customer path + return (cx_paths or [''])[0] + + @staticmethod + def _get_cx_working_dir() -> str: + """Get the customer's working directory. + + Returns + ------- + str + Core Tools: AzureWebJobsScriptRoot env variable + Linux Dedicated/Premium: AzureWebJobsScriptRoot env variable + Linux Consumption: empty string + """ + return os.getenv(AZURE_WEBJOBS_SCRIPT_ROOT, '') + + @staticmethod + def _get_worker_deps_path() -> str: + """Get the worker dependency sys.path. This will always available + even in all skus. + + Returns + ------- + str + The worker packages path + """ + # 1. Try to parse the absolute path python/3.13/LINUX/X64 in sys.path + r = re.compile(r'.*python(\/|\\)\d+\.\d+(\/|\\)(WINDOWS|LINUX|OSX).*') + worker_deps_paths: List[str] = [p for p in sys.path if r.match(p)] + if worker_deps_paths: + return worker_deps_paths[0] + + # 2. If it fails to find one, try to find one from the parent path + # This is used for handling the CI/localdev environment + return os.path.abspath( + os.path.join(os.path.dirname(__file__), '..', '..') + ) + + @staticmethod + def _remove_module_cache(path: str): + """Remove module cache if the module is imported from specific path. + This will not impact builtin modules + + Parameters + ---------- + path: str + The module cache to be removed if it is imported from this path. + """ + if not path: + return + + not_builtin = set(sys.modules.keys()) - set(sys.builtin_module_names) + + # Don't reload proxy_worker + to_be_cleared_from_cache = set([ + module_name for module_name in not_builtin + if not module_name.startswith('proxy_worker') + ]) + + for module_name in to_be_cleared_from_cache: + module = sys.modules.get(module_name) + if not isinstance(module, ModuleType): + continue + + # Module path can be actual file path or a pure namespace path. + # Both of these has the module path placed in __path__ property + # The property .__path__ can be None or does not exist in module + try: + # Safely check for __path__ and __file__ existence + module_paths = set() + if hasattr(module, '__path__') and module.__path__: + module_paths.update(module.__path__) + if hasattr(module, '__file__') and module.__file__: + module_paths.add(module.__file__) + + if any([p for p in module_paths if p.startswith(path)]): + sys.modules.pop(module_name) + except Exception as e: + logger.warning( + 'Attempt to remove module cache for %s but failed with ' + '%s. Using the original module cache.', + module_name, e) diff --git a/proxy_worker/version.py b/proxy_worker/version.py new file mode 100644 index 000000000..b5a827733 --- /dev/null +++ b/proxy_worker/version.py @@ -0,0 +1,4 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
+ +VERSION = "4.36.1" diff --git a/pyproject.toml b/pyproject.toml index 3cfbd9d9a..7f9e4b0e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,8 @@ classifiers = [ "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", "Operating System :: Microsoft :: Windows", "Operating System :: POSIX", "Operating System :: MacOS :: MacOS X", @@ -27,14 +29,19 @@ classifiers = [ ] dependencies = [ "azure-functions==1.23.0", - "python-dateutil ~=2.9.0", + "python-dateutil~=2.9.0", "protobuf~=3.19.3; python_version == '3.7'", - "protobuf~=4.25.3; python_version >= '3.8'", + "protobuf~=4.25.3; python_version >= '3.8' and python_version < '3.13'", + "protobuf~=5.29.0; python_version >= '3.13'", "grpcio-tools~=1.43.0; python_version == '3.7'", - "grpcio-tools~=1.59.0; python_version >= '3.8'", + "grpcio-tools~=1.59.0; python_version >= '3.8' and python_version < '3.13'", + "grpcio-tools~=1.70.0; python_version >= '3.13'", "grpcio~=1.43.0; python_version == '3.7'", - "grpcio~=1.59.0; python_version >= '3.8'", - "azurefunctions-extensions-base; python_version >= '3.8'" + "grpcio ~=1.59.0; python_version >= '3.8' and python_version < '3.13'", + "grpcio~=1.70.0; python_version >= '3.13'", + "azurefunctions-extensions-base; python_version >= '3.8'", + "azure-functions-runtime==1.0.0a3; python_version >= '3.13'", + "azure-functions-runtime-v1==1.0.0a2; python_version >= '3.13'" ] [project.urls] @@ -62,6 +69,7 @@ dev = [ "pytest-randomly", "pytest-instafail", "pytest-rerunfailures", + "pytest-asyncio", "ptvsd", "python-dotenv", "plotly", diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 000000000..b4a9533ab --- /dev/null +++ b/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +markers = +asyncio: mark a test as an asyncio test +asyncio_mode = auto +asyncio_default_fixture_loop_scope = function \ No newline at end of file diff --git a/python/prodV4/worker.config.json b/python/prodV4/worker.config.json index 3e431ac4d..eaa3c50cb 100644 --- a/python/prodV4/worker.config.json +++ b/python/prodV4/worker.config.json @@ -1,9 +1,9 @@ { "description":{ "language":"python", - "defaultRuntimeVersion":"3.11", + "defaultRuntimeVersion":"3.12", "supportedOperatingSystems":["LINUX", "OSX", "WINDOWS"], - "supportedRuntimeVersions":["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"], + "supportedRuntimeVersions":["3.7", "3.8", "3.9", "3.10", "3.11", "3.12", "3.13"], "supportedArchitectures":["X64", "X86", "Arm64"], "extensions":[".py"], "defaultExecutablePath":"python", diff --git a/python/prodV4/worker.py b/python/prodV4/worker.py index 021fa3f03..512f7f687 100644 --- a/python/prodV4/worker.py +++ b/python/prodV4/worker.py @@ -1,3 +1,6 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + import os import pathlib import sys diff --git a/python/proxyV4/worker.py b/python/proxyV4/worker.py new file mode 100644 index 000000000..57889b151 --- /dev/null +++ b/python/proxyV4/worker.py @@ -0,0 +1,68 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. 
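(Aside — illustrative only, not part of this diff: the pyproject.toml hunk above gates grpcio/protobuf and the new azure-functions-runtime packages on Python 3.13+ via PEP 508 environment markers. A quick way to see how such a marker resolves, assuming the third-party packaging library is installed.)

from packaging.markers import Marker

gate = Marker("python_version >= '3.13'")
print(gate.evaluate({"python_version": "3.13"}))  # True  -> e.g. grpcio~=1.70.0 is selected
print(gate.evaluate({"python_version": "3.12"}))  # False -> the pre-3.13 pins still apply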
+ +import os +import pathlib +import sys + +PKGS_PATH = "/home/site/wwwroot/.python_packages" +PKGS = "lib/site-packages" + +# Azure environment variables +AZURE_WEBSITE_INSTANCE_ID = "WEBSITE_INSTANCE_ID" +AZURE_CONTAINER_NAME = "CONTAINER_NAME" +AZURE_WEBJOBS_SCRIPT_ROOT = "AzureWebJobsScriptRoot" + + +def is_azure_environment(): + """Check if the function app is running on the cloud""" + return (AZURE_CONTAINER_NAME in os.environ + or AZURE_WEBSITE_INSTANCE_ID in os.environ) + + +def validate_python_version(): + minor_version = sys.version_info[1] + if not (13 <= minor_version < 14): + raise RuntimeError(f'Unsupported Python version: 3.{minor_version}') + + +def determine_user_pkg_paths(): + """This finds the user packages when function apps are running on the cloud + User packages are defined in: + /home/site/wwwroot/.python_packages/lib/site-packages + """ + usr_packages_path = [os.path.join(PKGS_PATH, PKGS)] + return usr_packages_path + + +def add_script_root_to_sys_path(): + """Append function project root to module finding sys.path""" + functions_script_root = os.getenv(AZURE_WEBJOBS_SCRIPT_ROOT) + if functions_script_root is not None: + sys.path.append(functions_script_root) + + +if __name__ == '__main__': + validate_python_version() + func_worker_dir = str(pathlib.Path(__file__).absolute().parent) + env = os.environ + + # Setting up python path for all environments to prioritize + # third-party user packages over worker packages in PYTHONPATH + user_pkg_paths = determine_user_pkg_paths() + joined_pkg_paths = os.pathsep.join(user_pkg_paths) + env['PYTHONPATH'] = f'{joined_pkg_paths}:{func_worker_dir}' + + project_root = os.path.abspath(os.path.dirname(__file__)) + if project_root not in sys.path: + sys.path.append(project_root) + + if is_azure_environment(): + os.execve(sys.executable, + [sys.executable, '-m', 'proxy_worker'] + + sys.argv[1:], + env) + else: + add_script_root_to_sys_path() + from proxy_worker import start_worker + start_worker.start() diff --git a/python/test/worker.py b/python/test/worker.py index e2ef12d22..95790083f 100644 --- a/python/test/worker.py +++ b/python/test/worker.py @@ -1,6 +1,5 @@ import sys import os -from azure_functions_worker import main # Azure environment variables @@ -16,4 +15,10 @@ def add_script_root_to_sys_path(): if __name__ == '__main__': add_script_root_to_sys_path() - main.main() + minor_version = sys.version_info[1] + if minor_version < 13: + from azure_functions_worker import main + main.main() + else: + from proxy_worker import start_worker + start_worker.start() diff --git a/setup.cfg b/setup.cfg index 6f5a7fb98..2fe4fd01b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -3,7 +3,7 @@ addopts = --capture=no --assert=plain --strict --tb native testpaths = tests [mypy] -python_version = 3.6 +python_version = 3.13 check_untyped_defs = True warn_redundant_casts = True warn_unused_ignores = True @@ -18,3 +18,9 @@ ignore_errors = True [mypy-azure_functions_worker._thirdparty.typing_inspect] ignore_errors = True + +[mypy-proxy_worker.protos.*] +ignore_errors = True + +[mypy-proxy_worker._thirdparty.typing_inspect] +ignore_errors = True \ No newline at end of file diff --git a/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings.py b/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings.py index c527cb680..b8ced2834 100644 --- a/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings.py +++ b/tests/extension_tests/deferred_bindings_tests/test_deferred_bindings.py @@ -6,8 +6,9 @@ import azure.functions 
as func from tests.utils import testutils -from azure_functions_worker import protos -from azure_functions_worker.bindings import datumdef, meta +if sys.version_info.minor < 13: + from azure_functions_worker import protos + from azure_functions_worker.bindings import datumdef, meta # Even if the tests are skipped for <=3.8, the library is still imported as # it is used for these tests. @@ -42,6 +43,9 @@ def __init__(self, version: str, source: str, @unittest.skipIf(sys.version_info.minor <= 8, "The base extension" "is only supported for 3.9+.") +@unittest.skipIf(sys.version_info.minor >= 13, "For python 3.13+," + "this logic is in the" + "library worker.") class TestDeferredBindingsEnabled(testutils.AsyncTestCase): @testutils.retryable_test(3, 5) @@ -73,6 +77,9 @@ async def test_deferred_bindings_enabled_log(self): @unittest.skipIf(sys.version_info.minor <= 8, "The base extension" "is only supported for 3.9+.") +@unittest.skipIf(sys.version_info.minor >= 13, "For python 3.13+," + "this logic is in the" + "library worker.") class TestDeferredBindingsDisabled(testutils.AsyncTestCase): @testutils.retryable_test(3, 5) @@ -104,6 +111,9 @@ async def test_deferred_bindings_disabled_log(self): @unittest.skipIf(sys.version_info.minor <= 8, "The base extension" "is only supported for 3.9+.") +@unittest.skipIf(sys.version_info.minor >= 13, "For python 3.13+," + "this logic is in the" + "library worker.") class TestDeferredBindingsEnabledDual(testutils.AsyncTestCase): @testutils.retryable_test(3, 5) @@ -135,6 +145,9 @@ async def test_deferred_bindings_dual_enabled_log(self): @unittest.skipIf(sys.version_info.minor <= 8, "The base extension" "is only supported for 3.9+.") +@unittest.skipIf(sys.version_info.minor >= 13, "For python 3.13+," + "this logic is in the" + "library worker.") class TestDeferredBindingsHelpers(testutils.AsyncTestCase): def test_mbd_deferred_bindings_enabled_decode(self): diff --git a/tests/extension_tests/http_v2_tests/test_http_v2.py b/tests/extension_tests/http_v2_tests/test_http_v2.py index 8c1d5b48e..514633743 100644 --- a/tests/extension_tests/http_v2_tests/test_http_v2.py +++ b/tests/extension_tests/http_v2_tests/test_http_v2.py @@ -8,11 +8,16 @@ import requests from tests.utils import testutils -from azure_functions_worker.utils.common import is_envvar_true from tests.utils.constants import CONSUMPTION_DOCKER_TEST, DEDICATED_DOCKER_TEST +# This app setting is only present for Python < 3.13 from azure_functions_worker.constants import PYTHON_ENABLE_INIT_INDEXING +if sys.version_info.minor < 13: + from azure_functions_worker.utils.common import is_envvar_true +else: + from proxy_worker.utils.common import is_envvar_true + REQUEST_TIMEOUT_SEC = 5 diff --git a/tests/test_setup.py b/tests/test_setup.py index 6f9c3edd5..9b92446a6 100644 --- a/tests/test_setup.py +++ b/tests/test_setup.py @@ -27,7 +27,6 @@ import tempfile import urllib.request import zipfile -from distutils import dir_util from invoke import task @@ -38,6 +37,7 @@ WEBHOST_GITHUB_API = "https://api.github.com/repos/Azure/azure-functions-host" WEBHOST_GIT_REPO = "https://github.com/Azure/azure-functions-host/archive" WEBHOST_TAG_PREFIX = "v4." 
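(Aside — illustrative only, not part of this diff: the gen_grpc/make_absolute_imports hunks below regenerate the protobuf stubs under whichever package WORKER_DIR selects. A small sketch of what that import rewrite does when WORKER_DIR is "proxy_worker"; the sample stub line is hypothetical but follows the generated-code pattern.)

import re

line = "\nimport FunctionRpc_pb2 as FunctionRpc__pb2"
rewritten = re.sub(r"\nimport (.*?_pb2)",
                   r"\nfrom proxy_worker.protos import \g<1>", line)
print(repr(rewritten))
# '\nfrom proxy_worker.protos import FunctionRpc_pb2 as FunctionRpc__pb2'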
+WORKER_DIR = "azure_functions_worker" if sys.version_info.minor < 13 else "proxy_worker" def get_webhost_version() -> str: @@ -112,8 +112,8 @@ def compile_webhost(webhost_dir): subprocess.run( [ "dotnet", "build", "WebJobs.Script.sln", - "/m:1", # Disable parallel MSBuild - "/nodeReuse:false", # Prevent MSBuild node reuse + "/m:1", # Disable parallel MSBuild + "/nodeReuse:false", # Prevent MSBuild node reuse f"--property:OutputPath={webhost_dir}/bin", # Set output folder "/p:TreatWarningsAsErrors=false" ], @@ -134,10 +134,10 @@ def compile_webhost(webhost_dir): def gen_grpc(): - proto_root_dir = ROOT_DIR / "azure_functions_worker" / "protos" + proto_root_dir = ROOT_DIR / WORKER_DIR / "protos" proto_src_dir = proto_root_dir / "_src" / "src" / "proto" staging_root_dir = BUILD_DIR / "protos" - staging_dir = staging_root_dir / "azure_functions_worker" / "protos" + staging_dir = staging_root_dir / WORKER_DIR / "protos" built_protos_dir = BUILD_DIR / "built_protos" if os.path.exists(BUILD_DIR): @@ -159,12 +159,12 @@ def gen_grpc(): "-m", "grpc_tools.protoc", "-I", - os.sep.join(("azure_functions_worker", "protos")), + os.sep.join((WORKER_DIR, "protos")), "--python_out", str(built_protos_dir), "--grpc_python_out", str(built_protos_dir), - os.sep.join(("azure_functions_worker", "protos", proto)), + os.sep.join((WORKER_DIR, "protos", proto)), ], check=True, stdout=sys.stdout, @@ -183,8 +183,26 @@ def gen_grpc(): # Needed to support absolute imports in files. See # https://github.com/protocolbuffers/protobuf/issues/1491 make_absolute_imports(compiled_files) + copy_tree_merge(str(built_protos_dir), str(proto_root_dir)) - dir_util.copy_tree(str(built_protos_dir), str(proto_root_dir)) + +def copy_tree_merge(src, dst): + """ + Recursively copy all files and subdirectories from src to dst, + overwriting files if they already exist. This emulates what + distutils.dir_util.copy_tree did without removing existing directories. + """ + if not os.path.exists(dst): + os.makedirs(dst) + + for item in os.listdir(src): + s = os.path.join(src, item) + d = os.path.join(dst, item) + + if os.path.isdir(s): + copy_tree_merge(s, d) + else: + shutil.copy2(s, d) def make_absolute_imports(compiled_files): @@ -197,7 +215,7 @@ def make_absolute_imports(compiled_files): # from azure_functions_worker.protos import xxx_pb2 as.. p1 = re.sub( r"\nimport (.*?_pb2)", - r"\nfrom azure_functions_worker.protos import \g<1>", + fr"\nfrom {WORKER_DIR}.protos import \g<1>", content, ) # Convert lines of the form: @@ -205,7 +223,7 @@ def make_absolute_imports(compiled_files): # from azure_functions_worker.protos.identity import xxx_pb2.. 
p2 = re.sub( r"from ([a-z]*) (import.*_pb2)", - r"from azure_functions_worker.protos.\g<1> \g<2>", + fr"from {WORKER_DIR}.protos.\g<1> \g<2>", p1, ) f.write(p2) diff --git a/tests/unittest_proxy/test_dependency.py b/tests/unittest_proxy/test_dependency.py new file mode 100644 index 000000000..cea4c5af0 --- /dev/null +++ b/tests/unittest_proxy/test_dependency.py @@ -0,0 +1,65 @@ +import sys +import os +from unittest.mock import patch + +from proxy_worker.utils.dependency import DependencyManager + + +@patch("proxy_worker.utils.dependency.DependencyManager._get_cx_deps_path", + return_value="/mock/cx/site-packages") +@patch("proxy_worker.utils.dependency.DependencyManager._get_cx_working_dir", + return_value="/mock/cx") +@patch("proxy_worker.utils.dependency.DependencyManager._get_worker_deps_path", + return_value="/mock/worker") +@patch("proxy_worker.utils.dependency.logger") +def test_use_worker_dependencies(mock_logger, mock_worker, mock_cx_dir, mock_cx_deps): + sys.path = ["/mock/cx/site-packages", "/mock/cx", "/original"] + + DependencyManager.initialize() + DependencyManager.use_worker_dependencies() + + assert sys.path[0] == "/mock/worker" + assert "/mock/cx/site-packages" not in sys.path + assert "/mock/cx" not in sys.path + + mock_logger.info.assert_any_call( + 'Applying use_worker_dependencies:' + ' worker_dependencies: %s,' + ' customer_dependencies: %s,' + ' working_directory: %s', + "/mock/worker", "/mock/cx/site-packages", "/mock/cx" + ) + + +@patch("proxy_worker.utils.dependency.DependencyManager._get_cx_deps_path", + return_value="/mock/cx/site-packages") +@patch("proxy_worker.utils.dependency.DependencyManager._get_worker_deps_path", + return_value="/mock/worker") +@patch("proxy_worker.utils.dependency.DependencyManager._get_cx_working_dir", + return_value="/mock/cx") +@patch("proxy_worker.utils.dependency.DependencyManager.is_in_linux_consumption", + return_value=False) +@patch("proxy_worker.utils.dependency.is_envvar_true", return_value=False) +@patch("proxy_worker.utils.dependency.logger") +def test_prioritize_customer_dependencies(mock_logger, mock_env, mock_linux, + mock_cx_dir, mock_worker, mock_cx_deps): + sys.path = ["/mock/worker", "/some/old/path"] + + DependencyManager.initialize() + DependencyManager.prioritize_customer_dependencies("/override/cx") + + assert sys.path[0] == "/mock/cx/site-packages" + assert sys.path[1] == "/mock/worker" + expected_path = os.path.abspath("/override/cx") + assert expected_path in sys.path + + # Relaxed log validation: look for matching prefix + assert any( + "Applying prioritize_customer_dependencies" in str(call[0][0]) + for call in mock_logger.info.call_args_list + ) + + assert any( + "Finished prioritize_customer_dependencies" in str(call[0][0]) + for call in mock_logger.info.call_args_list + ) diff --git a/tests/unittest_proxy/test_dispatcher.py b/tests/unittest_proxy/test_dispatcher.py new file mode 100644 index 000000000..22c38fa0a --- /dev/null +++ b/tests/unittest_proxy/test_dispatcher.py @@ -0,0 +1,254 @@ +import asyncio +import builtins +import logging +import types +import unittest +from unittest.mock import Mock, patch, MagicMock, AsyncMock, ANY + +import pytest + +from proxy_worker.dispatcher import Dispatcher + + +class TestDispatcher(unittest.TestCase): + + @patch("proxy_worker.dispatcher.queue.Queue") + @patch("proxy_worker.dispatcher.threading.Thread") + def test_dispatcher_initialization(self, mock_thread, mock_queue): + # Arrange + mock_loop = Mock() + mock_future = Mock() + mock_loop.create_future.return_value = 
mock_future + + # Act + dispatcher = Dispatcher( + loop=mock_loop, + host="127.0.0.1", + port=7070, + worker_id="worker123", + request_id="req456", + grpc_connect_timeout=5.0, + grpc_max_msg_len=1024 + ) + + # Assert + self.assertEqual(dispatcher._host, "127.0.0.1") + self.assertEqual(dispatcher._port, 7070) + self.assertEqual(dispatcher._worker_id, "worker123") + self.assertEqual(dispatcher._request_id, "req456") + self.assertEqual(dispatcher._grpc_connect_timeout, 5.0) + self.assertEqual(dispatcher._grpc_max_msg_len, 1024) + self.assertEqual(dispatcher._grpc_connected_fut, mock_future) + mock_queue.assert_called_once() + mock_thread.assert_called_once() + + @patch("proxy_worker.dispatcher.protos.StreamingMessage") + @patch("proxy_worker.dispatcher.protos.RpcLog") + @patch("proxy_worker.dispatcher.is_system_log_category") + def test_on_logging_levels_and_categories(self, mock_is_system, mock_rpc_log, + mock_streaming_message): + loop = Mock() + dispatcher = Dispatcher(loop, "localhost", 5000, "worker", + "req", 5.0) + + mock_rpc_log.return_value = Mock() + mock_streaming_message.return_value = Mock() + + levels = [ + (logging.CRITICAL, mock_rpc_log.Critical), + (logging.ERROR, mock_rpc_log.Error), + (logging.WARNING, mock_rpc_log.Warning), + (logging.INFO, mock_rpc_log.Information), + (logging.DEBUG, mock_rpc_log.Debug), + (5, getattr(mock_rpc_log, 'None')), + ] + + for level, expected in levels: + record = Mock(levelno=level, name="custom.logger") + mock_is_system.return_value = level % 2 == 0 # alternate True/False + dispatcher.on_logging(record, "Test message") + + if mock_is_system.return_value: + mock_rpc_log.RpcLogCategory.Value.assert_called_with("System") + else: + mock_rpc_log.RpcLogCategory.Value.assert_called_with("User") + + +def fake_import(name, globals=None, locals=None, fromlist=(), level=0): + mock_module = types.SimpleNamespace(__file__=f"{name}.py") + mock_module.worker_init_request = AsyncMock(return_value="fake_response") + mock_module.function_environment_reload_request = AsyncMock( + return_value="mocked_env_reload_response") + if name in ["azure_functions_worker_v2", "azure_functions_worker_v1"]: + return mock_module + return builtins.__import__(name, globals, locals, fromlist, level) + + +@patch("proxy_worker.dispatcher.DependencyManager.should_load_cx_dependencies", + return_value=True) +@patch("proxy_worker.dispatcher.DependencyManager.prioritize_customer_dependencies") +@patch("proxy_worker.dispatcher.logger") +@patch("proxy_worker.dispatcher.os.path.exists", + side_effect=lambda p: p.endswith("function_app.py")) +@patch("builtins.__import__", side_effect=fake_import) +@patch("proxy_worker.dispatcher.protos.StreamingMessage", + return_value="mocked_streaming_response") +@pytest.mark.asyncio +async def test_worker_init_v2_import( + mock_streaming, mock_import, mock_exists, mock_logger, mock_prioritize, + mock_should_load +): + dispatcher = Dispatcher(asyncio.get_event_loop(), "localhost", 7071, "worker123", + "req789", 5.0) + request = MagicMock() + request.worker_init_request.function_app_directory = "/home/site/wwwroot" + + result = await dispatcher._handle__worker_init_request(request) + + assert result == "mocked_streaming_response" + mock_logger.debug.assert_any_call("azure_functions_worker_v2 import succeeded: %s", + ANY) + + +@patch("proxy_worker.dispatcher.DependencyManager.should_load_cx_dependencies", + return_value=True) +@patch("proxy_worker.dispatcher.DependencyManager.prioritize_customer_dependencies") +@patch("proxy_worker.dispatcher.logger") 
+@patch("proxy_worker.dispatcher.os.path.exists", side_effect=lambda p: False) +@patch("builtins.__import__", side_effect=fake_import) +@patch("proxy_worker.dispatcher.protos.StreamingMessage", + return_value="mocked_streaming_response") +@pytest.mark.asyncio +async def test_worker_init_fallback_to_v1( + mock_streaming, mock_import, mock_exists, mock_logger, mock_prioritize, + mock_should_load +): + dispatcher = Dispatcher(asyncio.get_event_loop(), "localhost", 7071, "worker123", + "req789", 5.0) + request = MagicMock() + request.worker_init_request.function_app_directory = "/home/site/wwwroot" + + result = await dispatcher._handle__worker_init_request(request) + + assert result == "mocked_streaming_response" + mock_logger.debug.assert_any_call("azure_functions_worker_v1 import succeeded: %s", + ANY) + + +@patch("proxy_worker.dispatcher.DependencyManager.prioritize_customer_dependencies") +@patch("proxy_worker.dispatcher.logger") +@patch("proxy_worker.dispatcher.os.path.exists", + side_effect=lambda p: p.endswith("function_app.py")) +@patch("builtins.__import__", side_effect=fake_import) +@patch("proxy_worker.dispatcher.protos.StreamingMessage", + return_value="mocked_reload_response") +@pytest.mark.asyncio +async def test_function_environment_reload_v2_import( + mock_streaming, mock_import, mock_exists, mock_logger, mock_prioritize +): + dispatcher = Dispatcher(asyncio.get_event_loop(), "localhost", 7071, + "worker123", "req789", 5.0) + request = MagicMock() + request.function_environment_reload_request.function_app_directory = \ + "/home/site/wwwroot" + + result = await dispatcher._handle__function_environment_reload_request(request) + + assert result == "mocked_reload_response" + mock_logger.debug.assert_any_call("azure_functions_worker_v2 import succeeded: %s", + ANY) + + +@patch("proxy_worker.dispatcher.DependencyManager.prioritize_customer_dependencies") +@patch("proxy_worker.dispatcher.logger") +@patch("proxy_worker.dispatcher.os.path.exists", side_effect=lambda p: False) +@patch("builtins.__import__", side_effect=fake_import) +@patch("proxy_worker.dispatcher.protos.StreamingMessage", + return_value="mocked_reload_response") +@pytest.mark.asyncio +async def test_function_environment_reload_fallback_to_v1( + mock_streaming, mock_import, mock_exists, mock_logger, mock_prioritize +): + dispatcher = Dispatcher(asyncio.get_event_loop(), "localhost", 7071, "worker123", + "req789", 5.0) + request = MagicMock() + request.function_environment_reload_request.function_app_directory = "/some/path" + + result = await dispatcher._handle__function_environment_reload_request(request) + + assert result == "mocked_reload_response" + mock_logger.debug.assert_any_call("azure_functions_worker_v1 import succeeded: %s", + ANY) + + +@patch("proxy_worker.dispatcher._library_worker", + new=MagicMock( + functions_metadata_request=AsyncMock(return_value="mocked_meta_resp"))) +@patch("proxy_worker.dispatcher.protos.StreamingMessage", + return_value="mocked_response") +@patch("proxy_worker.dispatcher.logger") +@pytest.mark.asyncio +async def test_handle_functions_metadata_request(mock_logger, mock_streaming): + dispatcher = Dispatcher(asyncio.get_event_loop(), "localhost", 7071, "worker123", + "req789", 5.0) + request = MagicMock() + request.request_id = "req789" + + result = await dispatcher._handle__functions_metadata_request(request) + + assert result == "mocked_response" + mock_logger.info.assert_called_with( + 'Received WorkerMetadataRequest, request ID %s, worker id: %s', + "req789", "worker123" + ) + + 
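(Aside — illustrative only, not part of this diff: the tests in this file rely on pytest-asyncio's auto mode, enabled in the new pytest.ini, plus unittest.mock.AsyncMock for the awaited library-worker calls. A self-contained miniature of that pattern:)

from unittest.mock import AsyncMock

async def test_asyncmock_pattern():
    # With asyncio_mode = auto, pytest-asyncio collects and runs this coroutine
    # without an explicit @pytest.mark.asyncio decorator.
    fake_library_worker = AsyncMock(return_value="mocked_response")
    assert await fake_library_worker("req789") == "mocked_response"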
+@patch("proxy_worker.dispatcher._library_worker", + new=MagicMock( + function_load_request=AsyncMock(return_value="mocked_load_response"))) +@patch("proxy_worker.dispatcher.protos.StreamingMessage", + return_value="mocked_stream_response") +@patch("proxy_worker.dispatcher.logger") +@pytest.mark.asyncio +async def test_handle_function_load_request(mock_logger, mock_streaming): + dispatcher = Dispatcher(asyncio.get_event_loop(), "localhost", 7071, "worker123", + "req789", 5.0) + + request = MagicMock() + request.function_load_request.function_id = "func123" + request.function_load_request.metadata.name = "hello_function" + request.request_id = "req789" + + result = await dispatcher._handle__function_load_request(request) + + assert result == "mocked_stream_response" + mock_logger.info.assert_called_with( + 'Received WorkerLoadRequest, request ID %s, function_id: %s,function_name: %s, ' + 'worker_id: %s', "req789", "func123", "hello_function", "worker123" + ) + + +@patch("proxy_worker.dispatcher._library_worker", + new=MagicMock( + invocation_request=AsyncMock(return_value="mocked_invoc_response"))) +@patch("proxy_worker.dispatcher.protos.StreamingMessage", + return_value="mocked_streaming_response") +@patch("proxy_worker.dispatcher.logger") +@pytest.mark.asyncio +async def test_handle_invocation_request(mock_logger, mock_streaming): + dispatcher = Dispatcher(asyncio.get_event_loop(), "localhost", 7071, "worker123", + "req789", 5.0) + + request = MagicMock() + request.invocation_request.invocation_id = "inv123" + request.invocation_request.function_id = "func123" + request.request_id = "req789" + + result = await dispatcher._handle__invocation_request(request) + + assert result == "mocked_streaming_response" + mock_logger.info.assert_called_with( + 'Received FunctionInvocationRequest, request ID %s, function_id: %s,' + 'invocation_id: %s, worker_id: %s', + "req789", "func123", "inv123", "worker123" + ) diff --git a/tests/unittests/test_code_quality.py b/tests/unittests/test_code_quality.py index 54d1cc725..499ed577d 100644 --- a/tests/unittests/test_code_quality.py +++ b/tests/unittests/test_code_quality.py @@ -43,7 +43,8 @@ def test_flake8(self): try: subprocess.run( - [sys.executable, '-m', 'flake8', '--config', str(config_path)], + [sys.executable, '-m', 'flake8', '--config', str(config_path), + 'azure_functions_worker',], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, diff --git a/tests/utils/testutils.py b/tests/utils/testutils.py index c04b134c5..f90bd3258 100644 --- a/tests/utils/testutils.py +++ b/tests/utils/testutils.py @@ -50,18 +50,22 @@ WebHostDedicated, ) -from azure_functions_worker import dispatcher, protos -from azure_functions_worker.bindings.shared_memory_data_transfer import ( - FileAccessorFactory, -) -from azure_functions_worker.bindings.shared_memory_data_transfer import ( - SharedMemoryConstants as consts, -) -from azure_functions_worker.constants import ( - FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED, - UNIX_SHARED_MEMORY_DIRECTORIES, -) -from azure_functions_worker.utils.common import get_app_setting, is_envvar_true +if sys.version_info.minor < 13: + from azure_functions_worker import dispatcher, protos + from azure_functions_worker.bindings.shared_memory_data_transfer import ( + FileAccessorFactory, + ) + from azure_functions_worker.bindings.shared_memory_data_transfer import ( + SharedMemoryConstants as consts, + ) + from azure_functions_worker.constants import ( + FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED, + 
UNIX_SHARED_MEMORY_DIRECTORIES, + ) + from azure_functions_worker.utils.common import get_app_setting, is_envvar_true +else: + from proxy_worker import dispatcher, protos + from proxy_worker.utils.common import get_app_setting, is_envvar_true TESTS_ROOT = PROJECT_ROOT / 'tests' E2E_TESTS_FOLDER = pathlib.Path('endtoend') @@ -71,9 +75,7 @@ EMULATOR_TESTS_FOLDER = pathlib.Path('emulator_tests') EXTENSION_TESTS_FOLDER = pathlib.Path('extension_tests') WEBHOST_DLL = "Microsoft.Azure.WebJobs.Script.WebHost.dll" -DEFAULT_WEBHOST_DLL_PATH = ( - PROJECT_ROOT / 'build' / 'webhost' / 'bin' / WEBHOST_DLL -) +DEFAULT_WEBHOST_DLL_PATH = (PROJECT_ROOT / 'build' / 'webhost' / 'bin' / WEBHOST_DLL) EXTENSIONS_PATH = PROJECT_ROOT / 'build' / 'extensions' / 'bin' FUNCS_PATH = TESTS_ROOT / UNIT_TESTS_FOLDER / 'http_functions' WORKER_PATH = PROJECT_ROOT / 'python' / 'test' @@ -321,124 +323,126 @@ def _run_test(self, test, *args, **kwargs): raise test_exception -class SharedMemoryTestCase(unittest.TestCase): - """ - For tests involving shared memory data transfer usage. - """ - - def setUp(self): - self.was_shmem_env_true = is_envvar_true( - FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED) - os.environ.update( - {FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED: '1'}) - - os_name = platform.system() - if os_name == 'Darwin': - # If an existing AppSetting is specified, save it so it can be - # restored later - self.was_shmem_dirs = get_app_setting( - UNIX_SHARED_MEMORY_DIRECTORIES - ) - self._setUpDarwin() - elif os_name == 'Linux': - self._setUpLinux() - self.file_accessor = FileAccessorFactory.create_file_accessor() - - def tearDown(self): - os_name = platform.system() - if os_name == 'Darwin': - self._tearDownDarwin() - if self.was_shmem_dirs is not None: - # If an AppSetting was set before the tests ran, restore it back - os.environ.update( - {UNIX_SHARED_MEMORY_DIRECTORIES: self.was_shmem_dirs}) - elif os_name == 'Linux': - self._tearDownLinux() - - if not self.was_shmem_env_true: - os.environ.update( - {FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED: '0'}) - - def get_new_mem_map_name(self): - return str(uuid.uuid4()) - - def get_random_bytes(self, num_bytes): - return bytearray(random.getrandbits(8) for _ in range(num_bytes)) - - def get_random_string(self, num_chars): - return ''.join(random.choices(string.ascii_uppercase + string.digits, - k=num_chars)) - - def is_valid_uuid(self, uuid_to_test: str, version: int = 4) -> bool: +# This is not supported in 3.13+ +if sys.version_info.minor < 13: + class SharedMemoryTestCase(unittest.TestCase): """ - Check if uuid_to_test is a valid UUID. - Reference: https://stackoverflow.com/a/33245493/3132415 + For tests involving shared memory data transfer usage. 
""" - try: - uuid_obj = uuid.UUID(uuid_to_test, version=version) - except ValueError: - return False - return str(uuid_obj) == uuid_to_test - def _createSharedMemoryDirectories(self, directories): - for temp_dir in directories: - temp_dir_path = os.path.join(temp_dir, consts.UNIX_TEMP_DIR_SUFFIX) - if not os.path.exists(temp_dir_path): - os.makedirs(temp_dir_path) + def setUp(self): + self.was_shmem_env_true = is_envvar_true( + FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED) + os.environ.update( + {FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED: '1'}) + + os_name = platform.system() + if os_name == 'Darwin': + # If an existing AppSetting is specified, save it so it can be + # restored later + self.was_shmem_dirs = get_app_setting( + UNIX_SHARED_MEMORY_DIRECTORIES + ) + self._setUpDarwin() + elif os_name == 'Linux': + self._setUpLinux() + self.file_accessor = FileAccessorFactory.create_file_accessor() + + def tearDown(self): + os_name = platform.system() + if os_name == 'Darwin': + self._tearDownDarwin() + if self.was_shmem_dirs is not None: + # If an AppSetting was set before the tests ran, restore it back + os.environ.update( + {UNIX_SHARED_MEMORY_DIRECTORIES: self.was_shmem_dirs}) + elif os_name == 'Linux': + self._tearDownLinux() + + if not self.was_shmem_env_true: + os.environ.update( + {FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED: '0'}) - def _deleteSharedMemoryDirectories(self, directories): - for temp_dir in directories: - temp_dir_path = os.path.join(temp_dir, consts.UNIX_TEMP_DIR_SUFFIX) - shutil.rmtree(temp_dir_path) + def get_new_mem_map_name(self): + return str(uuid.uuid4()) - def _setUpLinux(self): - self._createSharedMemoryDirectories(consts.UNIX_TEMP_DIRS) + def get_random_bytes(self, num_bytes): + return bytearray(random.getrandbits(8) for _ in range(num_bytes)) - def _tearDownLinux(self): - self._deleteSharedMemoryDirectories(consts.UNIX_TEMP_DIRS) + def get_random_string(self, num_chars): + return ''.join(random.choices(string.ascii_uppercase + string.digits, + k=num_chars)) - def _setUpDarwin(self): - """ - Create a RAM disk on macOS. - Ref: https://stackoverflow.com/a/2033417/3132415 - """ - size_in_mb = consts.MAX_BYTES_FOR_SHARED_MEM_TRANSFER / (1024 * 1024) - size = 2048 * size_in_mb - # The following command returns the name of the created disk - cmd = ['hdiutil', 'attach', '-nomount', f'ram://{size}'] - result = subprocess.run(cmd, stdout=subprocess.PIPE) - if result.returncode != 0: - raise IOError(f'Cannot create ram disk with command: {cmd} - ' - f'{result.stdout} - {result.stderr}') - disk_name = result.stdout.strip().decode() - # We create a volume on the disk created above and mount it - volume_name = 'shm' - cmd = ['diskutil', 'eraseVolume', 'HFS+', volume_name, disk_name] - result = subprocess.run(cmd, stdout=subprocess.PIPE) - if result.returncode != 0: - raise IOError(f'Cannot create volume with command: {cmd} - ' - f'{result.stdout} - {result.stderr}') - directory = f'/Volumes/{volume_name}' - self.created_directories = [directory] - # Create directories in the volume for shared memory maps - self._createSharedMemoryDirectories(self.created_directories) - # Override the AppSetting for the duration of this test so the - # FileAccessorUnix can use these directories for creating memory maps - os.environ.update( - {UNIX_SHARED_MEMORY_DIRECTORIES: ','.join(self.created_directories)} - ) + def is_valid_uuid(self, uuid_to_test: str, version: int = 4) -> bool: + """ + Check if uuid_to_test is a valid UUID. 
+ Reference: https://stackoverflow.com/a/33245493/3132415 + """ + try: + uuid_obj = uuid.UUID(uuid_to_test, version=version) + except ValueError: + return False + return str(uuid_obj) == uuid_to_test + + def _createSharedMemoryDirectories(self, directories): + for temp_dir in directories: + temp_dir_path = os.path.join(temp_dir, consts.UNIX_TEMP_DIR_SUFFIX) + if not os.path.exists(temp_dir_path): + os.makedirs(temp_dir_path) + + def _deleteSharedMemoryDirectories(self, directories): + for temp_dir in directories: + temp_dir_path = os.path.join(temp_dir, consts.UNIX_TEMP_DIR_SUFFIX) + shutil.rmtree(temp_dir_path) + + def _setUpLinux(self): + self._createSharedMemoryDirectories(consts.UNIX_TEMP_DIRS) + + def _tearDownLinux(self): + self._deleteSharedMemoryDirectories(consts.UNIX_TEMP_DIRS) + + def _setUpDarwin(self): + """ + Create a RAM disk on macOS. + Ref: https://stackoverflow.com/a/2033417/3132415 + """ + size_in_mb = consts.MAX_BYTES_FOR_SHARED_MEM_TRANSFER / (1024 * 1024) + size = 2048 * size_in_mb + # The following command returns the name of the created disk + cmd = ['hdiutil', 'attach', '-nomount', f'ram://{size}'] + result = subprocess.run(cmd, stdout=subprocess.PIPE) + if result.returncode != 0: + raise IOError(f'Cannot create ram disk with command: {cmd} - ' + f'{result.stdout} - {result.stderr}') + disk_name = result.stdout.strip().decode() + # We create a volume on the disk created above and mount it + volume_name = 'shm' + cmd = ['diskutil', 'eraseVolume', 'HFS+', volume_name, disk_name] + result = subprocess.run(cmd, stdout=subprocess.PIPE) + if result.returncode != 0: + raise IOError(f'Cannot create volume with command: {cmd} - ' + f'{result.stdout} - {result.stderr}') + directory = f'/Volumes/{volume_name}' + self.created_directories = [directory] + # Create directories in the volume for shared memory maps + self._createSharedMemoryDirectories(self.created_directories) + # Override the AppSetting for the duration of this test so the + # FileAccessorUnix can use these directories for creating memory maps + os.environ.update( + {UNIX_SHARED_MEMORY_DIRECTORIES: ','.join(self.created_directories)} + ) - def _tearDownDarwin(self): - # Delete the directories containing shared memory maps - self._deleteSharedMemoryDirectories(self.created_directories) - # Unmount the volume used for shared memory maps - volume_name = 'shm' - cmd = f"find /Volumes -type d -name '{volume_name}*' -print0 " \ - "| xargs -0 umount -f" - result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True) - if result.returncode != 0: - raise IOError(f'Cannot delete volume with command: {cmd} - ' - f'{result.stdout} - {result.stderr}') + def _tearDownDarwin(self): + # Delete the directories containing shared memory maps + self._deleteSharedMemoryDirectories(self.created_directories) + # Unmount the volume used for shared memory maps + volume_name = 'shm' + cmd = f"find /Volumes -type d -name '{volume_name}*' -print0 " \ + "| xargs -0 umount -f" + result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True) + if result.returncode != 0: + raise IOError(f'Cannot delete volume with command: {cmd} - ' + f'{result.stdout} - {result.stderr}') class _MockWebHostServicer(protos.FunctionRpcServicer):