From 8770ea4ddc55ffa884246a6aa8416c191db9cadb Mon Sep 17 00:00:00 2001 From: Ernest W Durbin III Date: Mon, 16 Apr 2018 09:43:32 -0400 Subject: [PATCH 1/2] wrap search index mutation in a lock --- tests/unit/search/test_tasks.py | 120 ++++++++++++++++-- warehouse/search/tasks.py | 215 ++++++++++++++++++-------------- 2 files changed, 234 insertions(+), 101 deletions(-) diff --git a/tests/unit/search/test_tasks.py b/tests/unit/search/test_tasks.py index 17fbccfe445a..3565cbcdb101 100644 --- a/tests/unit/search/test_tasks.py +++ b/tests/unit/search/test_tasks.py @@ -12,10 +12,12 @@ import os +import celery import elasticsearch import packaging.version import pretend import pytest +import redis from first import first @@ -144,6 +146,7 @@ def project_docs(db): project_docs, ) + task = pretend.stub() es_client = FakeESClient() db_request.registry.update( @@ -153,6 +156,10 @@ def project_docs(db): }, ) + db_request.registry.settings = { + "celery.scheduler_url": "redis://redis:6379/0", + } + class TestException(Exception): pass @@ -167,7 +174,7 @@ def parallel_bulk(client, iterable): monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n) with pytest.raises(TestException): - reindex(db_request) + reindex(task, db_request) assert es_client.indices.delete.calls == [ pretend.call(index='warehouse-cbcbcbcbcb'), @@ -175,6 +182,29 @@ def parallel_bulk(client, iterable): assert es_client.indices.put_settings.calls == [] assert es_client.indices.forcemerge.calls == [] + def test_retry_on_lock(self, db_request, monkeypatch): + task = pretend.stub( + retry=pretend.call_recorder( + pretend.raiser(celery.exceptions.Retry) + ) + ) + + db_request.registry.settings = { + "celery.scheduler_url": "redis://redis:6379/0", + } + + le = redis.exceptions.LockError() + monkeypatch.setattr( + redis.StrictRedis, "from_url", + lambda *a, **kw: pretend.stub(lock=pretend.raiser(le))) + + with pytest.raises(celery.exceptions.Retry): + reindex(task, db_request) + + assert task.retry.calls == [ + pretend.call(countdown=60, exc=le) + ] + def test_successfully_indexes_and_adds_new(self, db_request, monkeypatch): docs = pretend.stub() @@ -188,6 +218,7 @@ def project_docs(db): project_docs, ) + task = pretend.stub() es_client = FakeESClient() db_request.registry.update( @@ -198,13 +229,17 @@ def project_docs(db): } ) + db_request.registry.settings = { + "celery.scheduler_url": "redis://redis:6379/0", + } + parallel_bulk = pretend.call_recorder(lambda client, iterable: [None]) monkeypatch.setattr( warehouse.search.tasks, "parallel_bulk", parallel_bulk) monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n) - reindex(db_request) + reindex(task, db_request) assert parallel_bulk.calls == [pretend.call(es_client, docs)] assert es_client.indices.create.calls == [ @@ -251,6 +286,7 @@ def project_docs(db): project_docs, ) + task = pretend.stub() es_client = FakeESClient() es_client.indices.indices["warehouse-aaaaaaaaaa"] = None es_client.indices.aliases["warehouse"] = ["warehouse-aaaaaaaaaa"] @@ -265,13 +301,17 @@ def project_docs(db): }, ) + db_request.registry.settings = { + "celery.scheduler_url": "redis://redis:6379/0", + } + parallel_bulk = pretend.call_recorder(lambda client, iterable: [None]) monkeypatch.setattr( warehouse.search.tasks, "parallel_bulk", parallel_bulk) monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n) - reindex(db_request) + reindex(task, db_request) assert parallel_bulk.calls == [pretend.call(es_client, docs)] assert es_client.indices.create.calls == [ @@ -323,6 +363,7 @@ def project_docs(db, 
project_name=None): project_docs, ) + task = pretend.stub() es_client = FakeESClient() db_request.registry.update( @@ -332,6 +373,10 @@ def project_docs(db, project_name=None): }, ) + db_request.registry.settings = { + "celery.scheduler_url": "redis://redis:6379/0", + } + class TestException(Exception): pass @@ -344,7 +389,7 @@ def parallel_bulk(client, iterable): warehouse.search.tasks, "parallel_bulk", parallel_bulk) with pytest.raises(TestException): - reindex_project(db_request, 'foo') + reindex_project(task, db_request, 'foo') assert es_client.indices.put_settings.calls == [] assert es_client.indices.forcemerge.calls == [] @@ -353,6 +398,7 @@ def test_unindex_fails_when_raising(self, db_request): class TestException(Exception): pass + task = pretend.stub() es_client = FakeESClient() es_client.delete = pretend.raiser(TestException) @@ -363,10 +409,61 @@ class TestException(Exception): }, ) + db_request.registry.settings = { + "celery.scheduler_url": "redis://redis:6379/0", + } + with pytest.raises(TestException): - unindex_project(db_request, 'foo') + unindex_project(task, db_request, 'foo') + + def test_unindex_retry_on_lock(self, db_request, monkeypatch): + task = pretend.stub( + retry=pretend.call_recorder( + pretend.raiser(celery.exceptions.Retry) + ) + ) + + db_request.registry.settings = { + "celery.scheduler_url": "redis://redis:6379/0", + } + + le = redis.exceptions.LockError() + monkeypatch.setattr( + redis.StrictRedis, "from_url", + lambda *a, **kw: pretend.stub(lock=pretend.raiser(le))) + + with pytest.raises(celery.exceptions.Retry): + unindex_project(task, db_request, "foo") + + assert task.retry.calls == [ + pretend.call(countdown=60, exc=le) + ] + + def test_reindex_retry_on_lock(self, db_request, monkeypatch): + task = pretend.stub( + retry=pretend.call_recorder( + pretend.raiser(celery.exceptions.Retry) + ) + ) + + db_request.registry.settings = { + "celery.scheduler_url": "redis://redis:6379/0", + } + + le = redis.exceptions.LockError() + monkeypatch.setattr( + redis.StrictRedis, "from_url", + lambda *a, **kw: pretend.stub(lock=pretend.raiser(le))) + + with pytest.raises(celery.exceptions.Retry): + reindex_project(task, db_request, "foo") + + assert task.retry.calls == [ + pretend.call(countdown=60, exc=le) + ] def test_unindex_accepts_defeat(self, db_request): + task = pretend.stub() es_client = FakeESClient() es_client.delete = pretend.call_recorder( pretend.raiser(elasticsearch.exceptions.NotFoundError)) @@ -378,7 +475,11 @@ def test_unindex_accepts_defeat(self, db_request): }, ) - unindex_project(db_request, 'foo') + db_request.registry.settings = { + "celery.scheduler_url": "redis://redis:6379/0", + } + + unindex_project(task, db_request, 'foo') assert es_client.delete.calls == [ pretend.call(index="warehouse", doc_type="project", id="foo") @@ -396,6 +497,7 @@ def project_docs(db, project_name=None): project_docs, ) + task = pretend.stub() es_client = FakeESClient() es_client.indices.indices["warehouse-aaaaaaaaaa"] = None es_client.indices.aliases["warehouse"] = ["warehouse-aaaaaaaaaa"] @@ -410,11 +512,15 @@ def project_docs(db, project_name=None): }, ) + db_request.registry.settings = { + "celery.scheduler_url": "redis://redis:6379/0", + } + parallel_bulk = pretend.call_recorder(lambda client, iterable: [None]) monkeypatch.setattr( warehouse.search.tasks, "parallel_bulk", parallel_bulk) - reindex_project(db_request, 'foo') + reindex_project(task, db_request, 'foo') assert parallel_bulk.calls == [pretend.call(es_client, docs)] assert 
es_client.indices.create.calls == [] diff --git a/warehouse/search/tasks.py b/warehouse/search/tasks.py index 8228ce991941..742442b00665 100644 --- a/warehouse/search/tasks.py +++ b/warehouse/search/tasks.py @@ -14,6 +14,7 @@ import os import elasticsearch +import redis from elasticsearch.helpers import parallel_bulk from sqlalchemy import and_, func @@ -99,102 +100,128 @@ def _project_docs(db, project_name=None): yield p.to_dict(include_meta=True) -@tasks.task(ignore_result=True, acks_late=True) -def reindex(request): +@tasks.task(bind=True, ignore_result=True, acks_late=True) +def reindex(self, request): """ Recreate the Search Index. """ - client = request.registry["elasticsearch.client"] - number_of_replicas = request.registry.get("elasticsearch.replicas", 0) - refresh_interval = request.registry.get("elasticsearch.interval", "1s") - - # We use a randomly named index so that we can do a zero downtime reindex. - # Essentially we'll use a randomly named index which we will use until all - # of the data has been reindexed, at which point we'll point an alias at - # our randomly named index, and then delete the old randomly named index. - - # Create the new index and associate all of our doc types with it. - index_base = request.registry["elasticsearch.index"] - random_token = binascii.hexlify(os.urandom(5)).decode("ascii") - new_index_name = "{}-{}".format(index_base, random_token) - doc_types = request.registry.get("search.doc_types", set()) - shards = request.registry.get("elasticsearch.shards", 1) - - # Create the new index with zero replicas and index refreshes disabled - # while we are bulk indexing. - new_index = get_index( - new_index_name, - doc_types, - using=client, - shards=shards, - replicas=0, - interval="-1", - ) - new_index.create(wait_for_active_shards=shards) - - # From this point on, if any error occurs, we want to be able to delete our - # in progress index. + r = redis.StrictRedis.from_url( + request.registry.settings["celery.scheduler_url"]) try: - request.db.execute("SET statement_timeout = '600s'") - - for _ in parallel_bulk(client, _project_docs(request.db)): - pass - except: # noqa - new_index.delete() - raise - finally: - request.db.rollback() - request.db.close() - - # Now that we've finished indexing all of our data we can optimize it and - # update the replicas and refresh intervals. - client.indices.forcemerge(index=new_index_name) - client.indices.put_settings( - index=new_index_name, - body={ - "index": { - "number_of_replicas": number_of_replicas, - "refresh_interval": refresh_interval, - } - } - ) - - # Point the alias at our new randomly named index and delete the old index. 
- if client.indices.exists_alias(name=index_base): - to_delete = set() - actions = [] - for name in client.indices.get_alias(name=index_base): - to_delete.add(name) - actions.append({"remove": {"index": name, "alias": index_base}}) - actions.append({"add": {"index": new_index_name, "alias": index_base}}) - client.indices.update_aliases({"actions": actions}) - client.indices.delete(",".join(to_delete)) - else: - client.indices.put_alias(name=index_base, index=new_index_name) - - -@tasks.task(ignore_result=True, acks_late=True) -def reindex_project(request, project_name): - client = request.registry["elasticsearch.client"] - doc_types = request.registry.get("search.doc_types", set()) - index_name = request.registry["elasticsearch.index"] - get_index( - index_name, - doc_types, - using=client, - shards=request.registry.get("elasticsearch.shards", 1), - replicas=request.registry.get("elasticsearch.replicas", 0), - ) - - for _ in parallel_bulk(client, _project_docs(request.db, project_name)): - pass - - -@tasks.task(ignore_result=True, acks_late=True) -def unindex_project(request, project_name): - client = request.registry["elasticsearch.client"] - index_name = request.registry["elasticsearch.index"] + with r.lock('search-index', timeout=15 * 60, blocking_timeout=30): + client = request.registry["elasticsearch.client"] + number_of_replicas = request.registry.get( + "elasticsearch.replicas", 0) + refresh_interval = request.registry.get( + "elasticsearch.interval", "1s") + + # We use a randomly named index so that we can do a zero downtime + # reindex. Essentially we'll use a randomly named index which we + # will use until all of the data has been reindexed, at which point + # we'll point an alias at our randomly named index, and then delete + # the old randomly named index. + + # Create the new index and associate all of our doc types with it. + index_base = request.registry["elasticsearch.index"] + random_token = binascii.hexlify(os.urandom(5)).decode("ascii") + new_index_name = "{}-{}".format(index_base, random_token) + doc_types = request.registry.get("search.doc_types", set()) + shards = request.registry.get("elasticsearch.shards", 1) + + # Create the new index with zero replicas and index refreshes + # disabled while we are bulk indexing. + new_index = get_index( + new_index_name, + doc_types, + using=client, + shards=shards, + replicas=0, + interval="-1", + ) + new_index.create(wait_for_active_shards=shards) + + # From this point on, if any error occurs, we want to be able to + # delete our in progress index. + try: + request.db.execute("SET statement_timeout = '600s'") + + for _ in parallel_bulk(client, _project_docs(request.db)): + pass + except: # noqa + new_index.delete() + raise + finally: + request.db.rollback() + request.db.close() + + # Now that we've finished indexing all of our data we can optimize + # it and update the replicas and refresh intervals. + client.indices.forcemerge(index=new_index_name) + client.indices.put_settings( + index=new_index_name, + body={ + "index": { + "number_of_replicas": number_of_replicas, + "refresh_interval": refresh_interval, + } + } + ) + + # Point the alias at our new randomly named index and delete the + # old index. 
+ if client.indices.exists_alias(name=index_base): + to_delete = set() + actions = [] + for name in client.indices.get_alias(name=index_base): + to_delete.add(name) + actions.append( + {"remove": {"index": name, "alias": index_base}}) + actions.append( + {"add": {"index": new_index_name, "alias": index_base}}) + client.indices.update_aliases({"actions": actions}) + client.indices.delete(",".join(to_delete)) + else: + client.indices.put_alias(name=index_base, index=new_index_name) + except redis.exceptions.LockError as exc: + raise self.retry(countdown=60, exc=exc) + + +@tasks.task(bind=True, ignore_result=True, acks_late=True) +def reindex_project(self, request, project_name): + r = redis.StrictRedis.from_url( + request.registry.settings["celery.scheduler_url"]) + try: + with r.lock('search-index', timeout=15, blocking_timeout=1): + client = request.registry["elasticsearch.client"] + doc_types = request.registry.get("search.doc_types", set()) + index_name = request.registry["elasticsearch.index"] + get_index( + index_name, + doc_types, + using=client, + shards=request.registry.get("elasticsearch.shards", 1), + replicas=request.registry.get("elasticsearch.replicas", 0), + ) + + for _ in parallel_bulk(client, + _project_docs(request.db, project_name)): + pass + except redis.exceptions.LockError as exc: + raise self.retry(countdown=60, exc=exc) + + +@tasks.task(bind=True, ignore_result=True, acks_late=True) +def unindex_project(self, request, project_name): + r = redis.StrictRedis.from_url( + request.registry.settings["celery.scheduler_url"]) try: - client.delete(index=index_name, doc_type="project", id=project_name) - except elasticsearch.exceptions.NotFoundError: - pass + with r.lock('search-index', timeout=15, blocking_timeout=1): + client = request.registry["elasticsearch.client"] + index_name = request.registry["elasticsearch.index"] + try: + client.delete( + index=index_name, doc_type="project", id=project_name) + except elasticsearch.exceptions.NotFoundError: + pass + except redis.exceptions.LockError as exc: + raise self.retry(countdown=60, exc=exc) From b4f495293d89818de170bfc534518b55121dc8eb Mon Sep 17 00:00:00 2001 From: Ernest W Durbin III Date: Mon, 16 Apr 2018 10:14:16 -0400 Subject: [PATCH 2/2] ain't no redis in travis --- tests/unit/search/test_tasks.py | 39 +++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/tests/unit/search/test_tasks.py b/tests/unit/search/test_tasks.py index 3565cbcdb101..3fbd34a24b79 100644 --- a/tests/unit/search/test_tasks.py +++ b/tests/unit/search/test_tasks.py @@ -12,6 +12,8 @@ import os +from contextlib import contextmanager + import celery import elasticsearch import packaging.version @@ -132,6 +134,11 @@ def __init__(self): self.indices = FakeESIndices() +@contextmanager +def _not_lock(*a, **kw): + yield True + + class TestReindex: def test_fails_when_raising(self, db_request, monkeypatch): @@ -168,6 +175,10 @@ def parallel_bulk(client, iterable): assert iterable is docs raise TestException + monkeypatch.setattr( + redis.StrictRedis, "from_url", + lambda *a, **kw: pretend.stub(lock=_not_lock)) + monkeypatch.setattr( warehouse.search.tasks, "parallel_bulk", parallel_bulk) @@ -233,6 +244,10 @@ def project_docs(db): "celery.scheduler_url": "redis://redis:6379/0", } + monkeypatch.setattr( + redis.StrictRedis, "from_url", + lambda *a, **kw: pretend.stub(lock=_not_lock)) + parallel_bulk = pretend.call_recorder(lambda client, iterable: [None]) monkeypatch.setattr( warehouse.search.tasks, "parallel_bulk", 
parallel_bulk) @@ -305,6 +320,10 @@ def project_docs(db): "celery.scheduler_url": "redis://redis:6379/0", } + monkeypatch.setattr( + redis.StrictRedis, "from_url", + lambda *a, **kw: pretend.stub(lock=_not_lock)) + parallel_bulk = pretend.call_recorder(lambda client, iterable: [None]) monkeypatch.setattr( warehouse.search.tasks, "parallel_bulk", parallel_bulk) @@ -385,6 +404,10 @@ def parallel_bulk(client, iterable): assert iterable is docs raise TestException + monkeypatch.setattr( + redis.StrictRedis, "from_url", + lambda *a, **kw: pretend.stub(lock=_not_lock)) + monkeypatch.setattr( warehouse.search.tasks, "parallel_bulk", parallel_bulk) @@ -394,7 +417,7 @@ def parallel_bulk(client, iterable): assert es_client.indices.put_settings.calls == [] assert es_client.indices.forcemerge.calls == [] - def test_unindex_fails_when_raising(self, db_request): + def test_unindex_fails_when_raising(self, db_request, monkeypatch): class TestException(Exception): pass @@ -413,6 +436,10 @@ class TestException(Exception): "celery.scheduler_url": "redis://redis:6379/0", } + monkeypatch.setattr( + redis.StrictRedis, "from_url", + lambda *a, **kw: pretend.stub(lock=_not_lock)) + with pytest.raises(TestException): unindex_project(task, db_request, 'foo') @@ -462,7 +489,7 @@ def test_reindex_retry_on_lock(self, db_request, monkeypatch): pretend.call(countdown=60, exc=le) ] - def test_unindex_accepts_defeat(self, db_request): + def test_unindex_accepts_defeat(self, db_request, monkeypatch): task = pretend.stub() es_client = FakeESClient() es_client.delete = pretend.call_recorder( @@ -479,6 +506,10 @@ def test_unindex_accepts_defeat(self, db_request): "celery.scheduler_url": "redis://redis:6379/0", } + monkeypatch.setattr( + redis.StrictRedis, "from_url", + lambda *a, **kw: pretend.stub(lock=_not_lock)) + unindex_project(task, db_request, 'foo') assert es_client.delete.calls == [ @@ -516,6 +547,10 @@ def project_docs(db, project_name=None): "celery.scheduler_url": "redis://redis:6379/0", } + monkeypatch.setattr( + redis.StrictRedis, "from_url", + lambda *a, **kw: pretend.stub(lock=_not_lock)) + parallel_bulk = pretend.call_recorder(lambda client, iterable: [None]) monkeypatch.setattr( warehouse.search.tasks, "parallel_bulk", parallel_bulk)
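
Addendum: the pattern every task in this patch adopts is the same three-step shape: acquire a named Redis lock around the Elasticsearch mutation, run the mutation while the lock is held, and if the lock cannot be acquired within the blocking timeout, re-enqueue the task via self.retry so it runs later instead of mutating the index concurrently. The following is a minimal, standalone sketch of that pattern under stated assumptions: the Celery app, the broker/lock URL, and the mutate_search_index() helper below are placeholders for illustration and are not taken from the patch itself.

    # Sketch of the lock-and-retry pattern introduced by this patch.
    # Assumptions (not from the patch): the Celery app, REDIS_URL, and
    # the mutate_search_index() placeholder.
    import celery
    import redis

    app = celery.Celery("example", broker="redis://localhost:6379/0")

    # Stands in for request.registry.settings["celery.scheduler_url"].
    REDIS_URL = "redis://localhost:6379/0"


    def mutate_search_index():
        """Placeholder for the actual Elasticsearch mutation
        (reindex / reindex_project / unindex_project)."""


    @app.task(bind=True, ignore_result=True, acks_late=True)
    def locked_index_task(self):
        r = redis.StrictRedis.from_url(REDIS_URL)
        try:
            # timeout: how long the lock may be held before it expires on
            # its own; blocking_timeout: how long we wait to acquire it
            # before giving up and retrying the task.
            with r.lock("search-index", timeout=15 * 60, blocking_timeout=30):
                mutate_search_index()
        except redis.exceptions.LockError as exc:
            # Another task holds the lock; re-enqueue ourselves rather
            # than touching the index concurrently.
            raise self.retry(countdown=60, exc=exc)

The tests in the series exercise both sides of this shape: the _not_lock context manager added in PATCH 2/2 stands in for a successfully acquired lock so the existing indexing assertions still run, while the test_*_retry_on_lock cases stub lock to raise redis.exceptions.LockError and assert the task retries with countdown=60.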