
APM agent datastore_tracer with elasticsearch 8 client after upgrading from 7. #1447

@ChrisRZen

Description



In some cases, when running API calls while also using the streaming_bulk helper, we see New Relic trigger failures on calls to the client because 'datastore_tracer' does not exist on a NoneType. This error does not occur with the elasticsearch v7 client and elasticsearch_dsl. It also does not occur when the APM agent is not running; without the agent, the code executes as expected regardless of our client version.

We can observe these errors in our Django app both when running in Celery and in a shell. Once the error triggers, it seems to cause all subsequent attempts to run the function to fail as well, even with the same input that passed previously. This is currently blocking us from upgrading to the v8 client.

Expected Behavior

I expect the elasticsearch calls to not fail, and for New Relic to handle errors and failed invariants gracefully.
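
For illustration only, here is a minimal sketch of the kind of guard I would expect around the failing attribute access. This is my own sketch, not the agent's actual code; the wrapper name comes from the traceback below, and the use of current_trace and the fallback behavior are assumptions about the agent's internals:

from newrelic.api.time_trace import current_trace

def _nr_wrapper_Elasticsearch_method_(wrapped, instance, args, kwargs):
    trace = current_trace()
    # Hypothetical guard: if there is no active trace, or the trace has no
    # settings (trace.settings is None, as in the traceback below), fall back
    # to calling the wrapped elasticsearch method without instrumentation
    # rather than raising AttributeError.
    if trace is None or trace.settings is None:
        return wrapped(*args, **kwargs)
    tracer_settings = trace.settings.datastore_tracer
    # ... existing instrumentation would continue here ...
    return wrapped(*args, **kwargs)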

Troubleshooting or NR Diag results


This is an example trace we get back, scrubbed a bit:

Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/app/python/lib/python3.13/site-packages/celery/local.py", line 182, in __call__
    return self._get_current_object()(*a, **kw)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^
  File "/app/python/lib/python3.13/site-packages/newrelic/hooks/application_celery.py", line 123, in wrapper
    return wrapped(*args, **kwargs)
  File "/app/python/lib/python3.13/site-packages/celery/app/task.py", line 411, in __call__
    return self.run(*args, **kwargs)
           ~~~~~~~~^^^^^^^^^^^^^^^^^
  File "<our file>", line 507, in index_ilm_documents
    for res in streaming_bulk_func(
               ~~~~~~~~~~~~~~~~~~~^
        es,
        ^^^
    ...<4 lines>...
        request_timeout=30,
        ^^^^^^^^^^^^^^^^^^^
    ):
    ^
  File "/app/python/lib/python3.13/site-packages/elasticsearch/helpers/actions.py", line 453, in streaming_bulk
    for data, (ok, info) in zip(
                            ~~~^
        bulk_data,
        ^^^^^^^^^^
    ...<10 lines>...
        ),
        ^^
    ):
    ^
  File "/app/python/lib/python3.13/site-packages/elasticsearch/helpers/actions.py", line 343, in _process_bulk_chunk
    resp = client.bulk(*args, operations=bulk_actions, **kwargs)  # type: ignore[arg-type]
  File "/app/python/lib/python3.13/site-packages/newrelic/hooks/datastore_elasticsearch.py", line 140, in _nr_wrapper_Elasticsearch_method_
    result = wrapped(*args, **kwargs)
  File "/app/python/lib/python3.13/site-packages/elasticsearch/_sync/client/utils.py", line 402, in wrapped
    client = client.options(**transport_options)
  File "/app/python/lib/python3.13/site-packages/newrelic/hooks/datastore_elasticsearch.py", line 142, in _nr_wrapper_Elasticsearch_method_
    tracer_settings = trace.settings.datastore_tracer
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'datastore_tracer'

Steps to Reproduce


I haven't had a chance to recreate this in a sample app, but this is the gist of what we are doing.

The es_client file:

from django.conf import settings
from elasticsearch import Elasticsearch
# Use a global shared client per https://elasticsearch-py.readthedocs.io/en/v7.12.0/#thread-safety

ES_CLIENT = Elasticsearch(
    hosts=[settings.ES_URL],
    timeout=settings.ELASTIC_CLIENT_TIMEOUT,
    maxsize=settings.WEB_APP_THREADS,
    http_compress=True,
)

In the file that is erroring, we are retrieving documents from ILM indexes and updating them:

from elasticsearch.helpers import streaming_bulk

def index_docs(ids):
    es = ES_CLIENT

    def _generate_docs():
        # Chunk size can be smaller for replication. I was able to replicate
        # with just 10 doc chunks.
        for id_chunk in batch(ids, 500):
            # The failure is here. It happens relatively consistently, though
            # I believe this loop needs to happen at least twice.
            potential_updated_docs = get_updated_docs()
            original_docs = es.search(
                body=dict(query=dict(terms=dict(_id=id_chunk))),
                _source=False,
                size=2,
                index=index_alias,
            )
            id_to_index = {hit["_id"]: hit["_index"] for hit in original_docs["hits"]["hits"]}
            yield from [
                {
                    "_op_type": "index",
                    "_index": id_to_index.get(str(obj["id"]), default_es),
                    "_id": obj["id"],
                    "_source": obj,
                }
                for obj in potential_updated_docs
            ]

    # I believe the streaming_bulk is important here.
    # I have not been able to replicate this issue with just es.search;
    # I suspect there would already be an issue if it were that simple.
    for res in streaming_bulk(
        es,
        _generate_docs(),
        raise_on_exception=False,
        raise_on_error=False,
        chunk_size=500,
        request_timeout=30,
    ):
        # do something with each result
        pass
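
Since I have not yet built a sample app, this is a rough standalone sketch of what I think a minimal reproduction might look like outside of Django and Celery. The host URL, index name, and config file path are placeholders, and I have not verified that this exact script triggers the error:

import newrelic.agent
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

# Placeholder agent config path; a real reproduction needs a valid license key.
newrelic.agent.initialize("newrelic.ini")
application = newrelic.agent.register_application(timeout=10.0)

es = Elasticsearch("http://localhost:9200")  # placeholder host

def _actions():
    # A handful of trivial index actions against a placeholder index.
    for i in range(20):
        yield {"_op_type": "index", "_index": "repro-index", "_id": i, "_source": {"value": i}}

# Run the bulk indexing inside a New Relic background transaction so the
# elasticsearch instrumentation is active, similar to the Celery task above.
with newrelic.agent.BackgroundTask(application, name="repro_streaming_bulk"):
    for ok, info in streaming_bulk(es, _actions(), chunk_size=10, raise_on_error=False):
        pass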

Your Environment


django==5.2.4
python==3.13
newrelic==10.15.0
elasticsearch==8.18.1

Additional context


I have not found any relevant posts.

Labels: bug (Incorrect or flawed agent behavior)
