
Where to put xray_recorder.configure(...AsyncContext()) in AWS Lambda functions? #203


Closed
xjia1 opened this issue Feb 18, 2020 · 17 comments

@xjia1

xjia1 commented Feb 18, 2020

When running inside AWS Lambda with a function that uses asyncio, I need

xray_recorder.configure(service='...', context=AsyncContext())

However, whether I put it at global scope or in my Lambda handler, I always get errors like

cannot find the current segment/subsegment, please make sure you have a segment open
No segment found, cannot begin subsegment

I think it's because Lambda creates its own segment, presumably through the LambdaContext defined in aws_xray_sdk/core/lambda_launcher.py?

So how should I configure it properly so that @xray_recorder.capture_async can work?

@willarmiros
Contributor

However if I put it in the global, or in my lambda handler, I always get errors like

Are you saying that merely adding the line to configure the xray_recorder causes those errors? Or is @xray_recorder.capture_async causing the errors and that configuration is just failing to fix them?

What function are you decorating with @xray_recorder.capture_async? If you are decorating the Lambda function handler, that might be causing those errors since a segment is not generated until the beginning of the handler function.

In response to your question, the line

xray_recorder.configure(service='...', context=AsyncContext())

can be placed either globally or in the Lambda handler; I don't think it is what's causing the "segment not found" errors.

@xjia1
Author

xjia1 commented Mar 7, 2020

Please take a look at https://github.com/stfairy/repro-xray-issue to reproduce this issue.

@willarmiros
Contributor

Hi @stfairy,
Thank you for the repro code. After a little deep dive, I realized the root cause of the error is that setting the context to async_context actually overrides the default lambda_context, which must be present in Lambda environments for X-Ray to work. So simply removing the context=AsyncContext() configuration parameter and keeping everything else the same will record a subsegment for the download of each file.

That being said, it is clearly a bug that AsyncContext and LambdaContext cannot be supported at once, since they are not mutually exclusive. Also, even though removing AsyncContext works in this simple example, it will likely cause issues in more complex aiohttp applications, so we should prioritize a fix soon.

@heitorlessa

Do you actually need to use AsyncContext in Lambda? I'm trying to learn more about async and X-Ray operations, so this might be a dumb question.

In my tests, I simply used aws_xray_sdk.core.xray_recorder and all of its operations worked just fine: aioboto, aiohttp, etc.

Code excerpt, run in the Python 3.7 runtime:

import asyncio
import json

import aws_xray_sdk.core
from aws_xray_sdk.ext.aiohttp.client import aws_xray_trace_config

import aiohttp

recorder = aws_xray_sdk.core.xray_recorder
...


async def aiohttp_task():
    # aws_xray_trace_config() hooks the session so each aiohttp request is recorded as a subsegment
    async with aiohttp.ClientSession(trace_configs=[aws_xray_trace_config()]) as session:
        async with session.get("https://httpbin.org/json") as resp:
            data = await resp.json()
            print(data)
            return data

def lambda_handler(evt, ctx):
    ret = asyncio.run(aiohttp_task())
    return {
        "statusCode": 200,
        "body": json.dumps(ret)
    }

@srprash
Contributor

srprash commented May 19, 2020

Hi @heitorlessa
AsyncContext provides an overriding TaskFactory and some custom logic in TaskLocalStorage. Not using AsyncContext in an async program traced with X-Ray may result in unintended behavior when dealing with multiple tasks and reference management. We are still investigating this issue.
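To illustrate what overriding the task factory can buy you, here is a minimal standard-library sketch. It is not the SDK's code: task_entities and propagating_task_factory are hypothetical names, and a plain dict keyed by task stands in for TaskLocalStorage. The factory copies the parent task's trace entities onto each child task, so work spawned with asyncio.gather still sees the parent's segment.

```python
import asyncio

# Hypothetical per-task storage: maps a Task to the "trace entities" it carries.
task_entities = {}


def propagating_task_factory(loop, coro, **kwargs):
    """Create the task normally, then copy the parent task's entities onto it."""
    task = asyncio.Task(coro, loop=loop, **kwargs)
    parent = asyncio.current_task(loop=loop)  # the task that is spawning this one
    if parent is not None and parent in task_entities:
        task_entities[task] = list(task_entities[parent])  # copy, don't share the list
    return task


async def child():
    # Each child looks up the entities stored for its own task.
    return task_entities.get(asyncio.current_task())


async def main():
    asyncio.get_running_loop().set_task_factory(propagating_task_factory)
    task_entities[asyncio.current_task()] = ["segment"]  # the parent "opens a segment"
    # gather() creates one task per coroutine through the factory above
    return await asyncio.gather(child(), child())


child_entities = asyncio.run(main())
print(child_entities)
```

Without the custom factory, the child tasks would have no entry in task_entities, which is roughly the "cannot find the current segment" situation described in this thread.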

@wojciech-dabrowski
Copy link

@willarmiros Any update on this? It prevents using X-Ray with async code, which sounds ridiculous :)

@NathanielRN
Contributor

Hi @stfairy !

Why the error occurs

In your reproduction code, you first configure the recorder globally using xray_recorder.configure(service='repro_xray_issue', context=AsyncContext()). However, because no event loop is running at initialization, AsyncContext() gets a new event loop from the asyncio library:

def __init__(self, *args, loop=None, use_task_factory=True, **kwargs):
    super(AsyncContext, self).__init__(*args, **kwargs)
    self._loop = loop
    if loop is None:
        self._loop = asyncio.get_event_loop()

The issue is that this loop, although created, is never the running loop. Later, when you use asyncio.run(my_async_func()) to start running your async code, asyncio creates its own new loop, sets it as the current loop, and runs your coroutine on it.

In asyncio:

def run(main, *, debug=None):
    # ...
    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        # ...
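The mismatch can be reproduced with the standard library alone. This is a minimal sketch, not SDK code: configure_time_loop is a hypothetical stand-in for what AsyncContext.__init__ does, and it uses new_event_loop() plus set_event_loop() because calling get_event_loop() with no running loop is deprecated in recent Python versions.

```python
import asyncio


def configure_time_loop():
    """Hypothetical stand-in for AsyncContext grabbing a loop at configure time."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)  # it becomes the current loop, but it is not running
    return loop


captured_loop = configure_time_loop()  # like configuring the recorder at module level


async def main():
    running_loop = asyncio.get_running_loop()
    # asyncio.run() created a brand-new loop for this coroutine
    return running_loop is not captured_loop


loops_differ = asyncio.run(main())
captured_loop.close()
print(loops_differ)
```

Any lookup keyed on captured_loop therefore happens on a loop where none of your coroutines are running.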

Finally, when the @xray_recorder.capture_async('## name') decorator tries to create the subsegment, it uses the AsyncContext._loop variable, which is not the loop that asyncio created! This matters because AsyncContext uses its _loop variable to add subsegment information.

if _GTE_PY37:
    task = asyncio.current_task(loop=self._loop)
else:
    task = asyncio.Task.current_task(loop=self._loop)
if task is None:
    return None

AsyncContext expects the current task on its _loop to hold the current segment. But because the loops aren't the same, there is no current task on _loop, so no segment is found and you get a "No segment found, cannot begin subsegment" error.
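The lookup in the excerpt above can be mimicked with plain asyncio. In this sketch, other_loop is a hypothetical loop playing the role of AsyncContext._loop: asking asyncio.current_task() about a loop that is not running your code returns None, which is the branch that ends in "No segment found".

```python
import asyncio

other_loop = asyncio.new_event_loop()  # plays the role of a stale AsyncContext._loop


async def lookup():
    on_running = asyncio.current_task()               # the task asyncio.run() created
    on_other = asyncio.current_task(loop=other_loop)  # nothing is running on this loop
    return on_running, on_other


task_on_running, task_on_other = asyncio.run(lookup())
other_loop.close()
print(task_on_running is not None, task_on_other)
```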

Solutions

Using AsyncContext

One solution is to call xray_recorder.configure(..., context=AsyncContext()) inside the coroutine passed to asyncio.run(), before the rest of your async code runs:

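import asyncio
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.async_context import AsyncContext

async def download_files():
    # Configuring here, inside the running loop, lets AsyncContext capture the loop asyncio.run() created
    xray_recorder.configure(service='repro_xray_issue', context=AsyncContext())
    async with xray_recorder.in_segment_async('my_segment_name') as segment:
        ...  # your async work goes here

asyncio.run(download_files())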

This will let AsyncContext use the currently running loop set by asyncio as its own _loop and add subsegments successfully.
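Continuing the standard-library illustration: a context object that grabs its loop after asyncio.run() has started sees the running loop, so looking up the current task on that loop succeeds. LoopBoundContext is a hypothetical stand-in for AsyncContext, not the SDK class.

```python
import asyncio


class LoopBoundContext:
    """Hypothetical stand-in for AsyncContext: remembers a loop at construction."""

    def __init__(self, loop=None):
        # Inside a coroutine, get_event_loop() returns the currently running loop.
        self._loop = loop if loop is not None else asyncio.get_event_loop()

    def current_task(self):
        return asyncio.current_task(loop=self._loop)


async def download_files():
    ctx = LoopBoundContext()  # created after asyncio.run() started the loop
    return ctx.current_task() is asyncio.current_task()


found_same_task = asyncio.run(download_files())
print(found_same_task)
```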

Note that you must still create your own segment (my_segment_name above). This works in Lambda; however, it replaces the Lambda context, resulting in 2 separate traces because there are 2 segments.

[Screenshots: the two separate X-Ray traces]

This solution matters when your async code needs multiple segments. AsyncContext makes sure that new custom subsegments are attached to the segment associated with the current task (as explained above), which could be one of many concurrent tasks.
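As a rough standard-library analogy for per-task context (this swaps in contextvars, which is not how the SDK's TaskLocalStorage is implemented), each task started by asyncio.gather gets its own copy of the context, so concurrent tasks keep separate "current segment" values. current_segment and handle are hypothetical names.

```python
import asyncio
import contextvars

# Hypothetical: holds the "current segment" for whichever task is running.
current_segment = contextvars.ContextVar("current_segment", default=None)


async def handle(name):
    current_segment.set(name)     # only affects this task's copy of the context
    await asyncio.sleep(0)        # yield so the tasks interleave
    return current_segment.get()  # each task still sees its own value


async def main():
    return await asyncio.gather(handle("seg-a"), handle("seg-b"), handle("seg-c"))


results = asyncio.run(main())
print(results)
```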

If you are not running on Lambda, this is most likely the solution you need.

Without AsyncContext

As was pointed out before, if you simply remove the context=AsyncContext() configuration you will see the following traces in X-Ray:

[Screenshot: X-Ray trace with a single segment]

Here there is only 1 segment (the one created by LambdaContext in Lambda environments), and the async calls become subsegments of that segment.

Since LambdaContext does not allow you to create new segments, all of your custom subsegments go under that 1 segment.

Conclusion

I hope that helps, and please feel free to ask any more questions! What helped me discover this behavior was understanding how asyncio and aws-xray-sdk use event loops, and looking at the starter code in our README.md.

@wojciech-dabrowski Please let me know if this explanation helped you! 🙂

@abivolmv

abivolmv commented Jul 9, 2021

Hi,
These solutions will not work if we use asyncio.gather() or asyncio.wait(). There is a longer discussion about it here.

@NathanielRN
Copy link
Contributor

Hi @abivolmv,

Sorry to hear it's not working, would you be able to provide simple reproduction code to better describe your issue?

For example, I noticed that in the original description of that issue you posted, the user is trying to configure xray_recorder globally before they call asyncio.gather.

I'm guessing that this causes a problem similar to the one I pointed out above: asyncio.gather, like asyncio.run, will create and begin its own loop, which AsyncContext() is not aware of because it created its own, different loop.

In /usr/local/Cellar/python@3.9/3.9.5/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py:

def gather(*coros_or_futures, loop=None, return_exceptions=False):
    ...
    if not coros_or_futures:
        if loop is None:
            # (NathanielRN): No running loop, so it makes its own which is
            # not seen from previously instantiated `AsyncContext`!
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

Hopefully there's a way for you to call the asyncio.wait() method first and then configure AsyncContext, so that the problem is solved. Let me know if this helps! 🙂

@abivolmv
Copy link

abivolmv commented Jul 9, 2021

Hi @NathanielRN , thanks for the quick reply.
I actually use .run and then .wait, so I tried putting xray_recorder.configure in between. Is that right?

I tried both solutions from above: either I see no segments (the issue described here), or it fails with the error mentioned in the linked issue.

Some sample code would be as follows (a minimalistic state of my code):

import asyncio
from io import BytesIO

import aioboto3
from aws_xray_sdk.core import xray_recorder, patch_all
from aws_xray_sdk.core.async_context import AsyncContext

# here I tried adding your solution (not added when solution to use AsyncContext() is used below)
# xray_recorder.configure(service='repro_xray_issue')  # uncomment this for second solution trial

patch_all()


def lambda_handler():
    asyncio.run(main())


async def main():
    # here I tried adding your solution (not added when solution to remove AsyncContext() is used above)
    xray_recorder.configure(service='repro_xray_issue', context=AsyncContext())    # comment this for second solution trial
    async with xray_recorder.in_segment_async('my_segment_name') as segment:  # comment this for second solution trial
        filelike1 = BytesIO()
        filelike2 = BytesIO()
        res1, res2 = await asyncio.gather(s3_get(filelike1), s3_get(filelike2))  # .wait() doesn't work either


@xray_recorder.capture_async('s3_get')  # tried with this and without also
async def s3_get(filelike):
    async with aioboto3.Session().client('s3') as s3:
        return await s3.download_fileobj('s3-bucket', 'test.txt', filelike)

if __name__ == '__main__':
    lambda_handler()

The first solution fails with an error, and the second does not display the S3 segments in X-Ray.
If I instead use:

await s3_get(filelike1)
await s3_get(filelike2)

then it works and I also see the segments in the trace. But I would like to run these calls concurrently.
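The two variants differ only in scheduling: sequential awaits finish one download before starting the next, while asyncio.gather runs both as separate tasks on the same loop, which is exactly where per-task X-Ray context comes into play. A standard-library sketch, with a hypothetical fake_download standing in for s3_get:

```python
import asyncio


async def fake_download(name, log):
    """Hypothetical stand-in for s3_get: records when it starts and finishes."""
    log.append(f"start {name}")
    await asyncio.sleep(0.01)  # simulated network wait
    log.append(f"end {name}")


async def sequential():
    log = []
    await fake_download("file1", log)
    await fake_download("file2", log)
    return log


async def concurrent():
    log = []
    # Each coroutine is wrapped in its own Task and they overlap on one loop
    await asyncio.gather(fake_download("file1", log), fake_download("file2", log))
    return log


sequential_log = asyncio.run(sequential())
concurrent_log = asyncio.run(concurrent())
print(sequential_log)
print(concurrent_log)
```

In the concurrent run both downloads start before either finishes, so any tracing context has to be tracked per task rather than per coroutine call.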

@NathanielRN
Contributor

@abivolmv Thanks for providing that repro code! Actually, you understood my post exactly and this is the code I had in mind 😄

I copied your code exactly and ran it as python3 repro_code.py. However, I actually do see the traces as I expect in X-Ray:

[Screenshot: X-Ray trace showing the concurrent s3_get subsegments]

Notice that unfortunately they do render as subsegments of subsegments instead of subsegments of the segment, but that's what we expect. You can still see them running concurrently in this image.

I am running the daemon locally as explained in the docs; perhaps you can compare that with how you are running it?

Finally I am using:

  • Python 3.9
  • aws-xray-sdk 2.8

If you see anything you are doing differently please call it out! Hopefully this helps 🙂

@abivolmv

@NathanielRN I will look deeper into my code (Python 3.7, aws-xray-sdk 2.8). But in your screenshot I do not see the S3 nodes in the trace map; is that also a downside of using this solution?

@NathanielRN
Copy link
Contributor

@abivolmv Sounds good, let me know how it goes! I looked into it, and it looks like we do support it 🙂 Our public AWS X-Ray docs explain how to trace the context for asynchronous work. (Note that some async libraries, like aiohttp, are not supported!)

So I just added this to your code:

from aws_xray_sdk.core import patch

libraries = ['aioboto3']  # patching aioboto3 also patches aiobotocore
patch(libraries)

Then I got S3 in the service map:

[Screenshot: X-Ray service map including the S3 node]

@abivolmv
Copy link

abivolmv commented Jul 14, 2021

@NathanielRN
I see the S3 segments now too. Shouldn't patch_all() also patch aiobotocore?
Also, I noticed that sometimes this works and sometimes it errors, without changing anything in the code and just hitting the Test button in Lambda. I took my example directly to AWS Lambda (Python 3.7) to rule out anything in my local setup. If I remove the .gather and just use:

await s3_get(filelike1)
await s3_get(filelike2)

then there are no errors ever.

import asyncio
from io import BytesIO

import aioboto3
from aws_xray_sdk.core import xray_recorder, patch_all
from aws_xray_sdk.core.async_context import AsyncContext

# here I tried adding your solution (not added when solution to use AsyncContext() is used below)
# xray_recorder.configure(service='repro_xray_issue')  # uncomment this for second solution trial

patch_all()


def lambda_handler(a,b):
    asyncio.run(main())


async def main():
    # here I tried adding your solution (not added when solution to remove AsyncContext() is used above)
    xray_recorder.configure(service='repro_xray_issue', context=AsyncContext())    # comment this for second solution trial
    async with xray_recorder.in_segment_async('my_segment_name') as segment:  # comment this for second solution trial
        filelike1 = BytesIO()
        filelike2 = BytesIO()
        res1, res2 = await asyncio.gather(s3_get(filelike1), s3_get(filelike2))  # .wait() doesn't work either


@xray_recorder.capture_async('s3_get')  # tried with this and without also
async def s3_get(filelike):
    async with aioboto3.Session().client('s3') as s3:
        return await s3.download_fileobj('s3-validation-files-003', 'test.txt', filelike)

When it works:

START RequestId: 28f14f73-5cba-4b5d-9a94-ecc36af15e98 Version: $LATEST
END RequestId: 28f14f73-5cba-4b5d-9a94-ecc36af15e98
REPORT RequestId: 28f14f73-5cba-4b5d-9a94-ecc36af15e98 Duration: 4323.84 ms Billed Duration: 4324 ms Memory Size: 128 MB Max Memory Used: 112 MB Init Duration: 1069.86 ms
XRAY TraceId: SegmentId: Sampled: true

When it doesn't:

START RequestId: 2a1b8ac0-ef8b-4e3c-a3b4-b89b057524d4 Version: $LATEST
[ERROR] AlreadyEndedException: Already ended segment and subsegment cannot be modified.
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 15, in lambda_handler
    asyncio.run(main())
  File "/var/lang/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/var/lang/lib/python3.7/asyncio/base_events.py", line 587, in run_until_complete
    return future.result()
  File "/var/task/lambda_function.py", line 24, in main
    res1, res2 = await asyncio.gather(s3_get(filelike1), s3_get(filelike2)) # .wait() doesn't work either
  File "/opt/python/lib/python3.7/site-packages/aws_xray_sdk/core/async_recorder.py", line 33, in call
    meta_processor=None,
  File "/opt/python/lib/python3.7/site-packages/aws_xray_sdk/core/async_recorder.py", line 82, in record_subsegment_async
    return_value = await wrapped(*args, **kwargs)
  File "/var/task/lambda_function.py", line 30, in s3_get
    return await s3.download_fileobj('s3-validation-files-003', 'test.txt', filelike)
  File "/opt/python/lib/python3.7/site-packages/aioboto3/s3/inject.py", line 107, in download_fileobj
    resp = await self.get_object(Bucket=Bucket, Key=Key, **ExtraArgs)
  File "/opt/python/lib/python3.7/site-packages/aws_xray_sdk/ext/aiobotocore/patch.py", line 36, in _xray_traced_aiobotocore
    meta_processor=aws_meta_processor,
  File "/opt/python/lib/python3.7/site-packages/aws_xray_sdk/core/async_recorder.py", line 101, in record_subsegment_async
    stack=stack,
  File "/opt/python/lib/python3.7/site-packages/aws_xray_sdk/ext/boto_utils.py", line 57, in aws_meta_processor
    resp_meta.get('HTTPStatusCode'))
  File "/opt/python/lib/python3.7/site-packages/aws_xray_sdk/core/models/entity.py", line 106, in put_http_meta
    self._check_ended()
  File "/opt/python/lib/python3.7/site-packages/aws_xray_sdk/core/models/entity.py", line 302, in _check_ended
    raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")
END RequestId: 2a1b8ac0-ef8b-4e3c-a3b4-b89b057524d4
REPORT RequestId: 2a1b8ac0-ef8b-4e3c-a3b4-b89b057524d4 Duration: 4048.71 ms Billed Duration: 4049 ms Memory Size: 128 MB Max Memory Used: 113 MB Init Duration: 1005.49 ms
XRAY TraceId: SegmentId: Sampled: true

@NathanielRN
Contributor

@abivolmv Sorry for the delay in getting back to you! Thanks for providing that code. I had a chance to try it out, and you're right: it inconsistently fails and succeeds 😓

Theory of what went wrong

In my investigation, I suspected that it might have something to do with our async context manager:

async def __aexit__(self, exc_type, exc_val, exc_tb):
    return self.__exit__(exc_type, exc_val, exc_tb)

I did some digging and found that PEP 492 says __aexit__ must return an "awaitable", but I wasn't sure that the code above necessarily does that.
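For reference, a method defined with async def returns a coroutine object when called, and coroutine objects are awaitable, so a delegating __aexit__ like the one above does satisfy that part of PEP 492. A minimal standard-library check; Manager is a hypothetical class mirroring the pattern, not the SDK's AsyncSegmentContextManager.

```python
import asyncio
import inspect


class Manager:
    """Hypothetical class mirroring the sync-to-async delegation above."""

    def __init__(self):
        self.exited = False

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.exited = True
        return False  # falsy: do not suppress exceptions

    async def __aenter__(self):
        return self.__enter__() if hasattr(self, "__enter__") else self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return self.__exit__(exc_type, exc_val, exc_tb)


m = Manager()
pending = m.__aexit__(None, None, None)      # calling it does not run the body yet
is_awaitable = inspect.isawaitable(pending)  # a coroutine object is awaitable
result = asyncio.run(pending)                # awaiting it runs __exit__ and returns its value
print(is_awaitable, result, m.exited)
```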

I managed to avoid the "Traceback Exception" by changing the code like this:

class AsyncSegmentContextManager(SegmentContextManager):
    async def __aenter__(self):
        return self.__enter__()

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        async def nested():
            self.__exit__(exc_type, exc_val, exc_tb)

        task = await asyncio.gather(nested())

        return task

With this change, the traces show up in X-Ray, but they are blank, with no data in them! It seems like the code silently fails in this case instead 😕
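One Python detail that may be relevant here, though this thread does not establish it as the cause: if __aexit__ returns a truthy value, async with suppresses any exception raised in its body, and asyncio.gather(...) returns a list, which is truthy when non-empty. GatherExit below is a hypothetical class reproducing only that return pattern, not the SDK's context manager.

```python
import asyncio


class GatherExit:
    """Hypothetical class mirroring the experimental __aexit__ above."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        return None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        async def nested():
            self.__exit__(exc_type, exc_val, exc_tb)

        return await asyncio.gather(nested())  # -> [None], a truthy list


async def main():
    async with GatherExit():
        raise ValueError("boom")
    # Reached only because the truthy return value swallowed the ValueError
    return "exception was swallowed"


outcome = asyncio.run(main())
print(outcome)
```

Returning the result of self.__exit__(...) directly (or an explicit False) keeps exceptions propagating.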

I think our team needs time to understand exactly how asyncio works here. Specifically, I suspect we need to find a way to schedule the self.__exit__ task on the asyncio loop instead of running it immediately, so that the previous async calls made with aioboto3 have a chance to finish.

Work arounds

However, I noticed that if I do not patch aiobotocore, I don't get an exception. This is contrary to my expectations: I thought @xray_recorder.capture_async would still cause the AsyncSegmentContextManager code path above to execute, and since I thought AsyncSegmentContextManager had the mistake, I'm surprised the error does not show up 😕

Either way, the custom s3_get traces do show up in X-Ray if you don't patch aioboto3 (which by extension patches aiobotocore).

Action Items

We are tracking this issue internally, and once we have a chance to investigate we will get back to you! Since the question is now more about using asyncio with Lambda than about AsyncContext, as this issue originally asked, would you mind creating a new issue that links back here so that we can track it separately?

Thanks for your help with this 🙂

@parth-chudasama

Hi, is there any update on this issue?
I am using asyncio and aiohttp in Lambda. Currently I get separate traces for the requests I make, but they do not show up as part of the Lambda trace.

@NathanielRN
Contributor

Closing this in favor of #310


8 participants