From ff4718d30bbe36c0874c24b7a18c90bcb8415e56 Mon Sep 17 00:00:00 2001 From: Ernest W Durbin III Date: Fri, 20 Jul 2018 10:03:18 -0400 Subject: [PATCH 1/4] Incremental search index updates 2.0 --- tests/unit/search/test_init.py | 53 +++++++++++++ tests/unit/search/test_tasks.py | 131 +++++++++++++++++++++++++++++++- warehouse/search/__init__.py | 42 ++++++++++ warehouse/search/tasks.py | 35 ++++++++- 4 files changed, 258 insertions(+), 3 deletions(-) diff --git a/tests/unit/search/test_init.py b/tests/unit/search/test_init.py index 63b91a16abc7..52bad8179bb7 100644 --- a/tests/unit/search/test_init.py +++ b/tests/unit/search/test_init.py @@ -14,6 +14,59 @@ from warehouse import search +from ...common.db.packaging import ProjectFactory, ReleaseFactory + + +def test_store_projects(db_request): + project0 = ProjectFactory.create() + project1 = ProjectFactory.create() + release1 = ReleaseFactory.create(project=project1) + config = pretend.stub() + session = pretend.stub(info={}, new={project0}, dirty=set(), deleted={release1}) + + search.store_projects_for_project_reindex(config, session, pretend.stub()) + + assert session.info["warehouse.search.project_updates"] == {project0, project1} + assert session.info["warehouse.search.project_deletes"] == set() + + +def test_store_projects_unindex(db_request): + project0 = ProjectFactory.create() + project1 = ProjectFactory.create() + config = pretend.stub() + session = pretend.stub(info={}, new={project0}, dirty=set(), deleted={project1}) + + search.store_projects_for_project_reindex(config, session, pretend.stub()) + + assert session.info["warehouse.search.project_updates"] == {project0} + assert session.info["warehouse.search.project_deletes"] == {project1} + + +def test_execute_reindex_success(app_config): + _delay = pretend.call_recorder(lambda x: None) + app_config.task = lambda x: pretend.stub(delay=_delay) + session = pretend.stub( + info={"warehouse.search.project_updates": {pretend.stub(normalized_name="foo")}} + ) + + search.execute_project_reindex(app_config, session) + + assert _delay.calls == [pretend.call("foo")] + assert "warehouse.search.project_updates" not in session.info + + +def test_execute_unindex_success(app_config): + _delay = pretend.call_recorder(lambda x: None) + app_config.task = lambda x: pretend.stub(delay=_delay) + session = pretend.stub( + info={"warehouse.search.project_deletes": {pretend.stub(normalized_name="foo")}} + ) + + search.execute_project_reindex(app_config, session) + + assert _delay.calls == [pretend.call("foo")] + assert "warehouse.search.project_deletes" not in session.info + def test_es(monkeypatch): search_obj = pretend.stub() diff --git a/tests/unit/search/test_tasks.py b/tests/unit/search/test_tasks.py index 28fd2ad35609..88ef9d11103e 100644 --- a/tests/unit/search/test_tasks.py +++ b/tests/unit/search/test_tasks.py @@ -12,6 +12,7 @@ import os +import elasticsearch import packaging.version import pretend import pytest @@ -19,7 +20,13 @@ from first import first import warehouse.search.tasks -from warehouse.search.tasks import reindex, _project_docs +from warehouse.search.tasks import ( + reindex, + reindex_project, + unindex_project, + _project_docs, +) + from ...common.db.packaging import ProjectFactory, ReleaseFactory @@ -51,6 +58,34 @@ def test_project_docs(db_session): ] +def test_single_project_doc(db_session): + projects = [ProjectFactory.create() for _ in range(2)] + releases = { + p: sorted( + [ReleaseFactory.create(project=p) for _ in range(3)], + key=lambda r: 
packaging.version.parse(r.version), + reverse=True, + ) + for p in projects + } + + assert list(_project_docs(db_session, project_name=projects[1].name)) == [ + { + "_id": p.normalized_name, + "_type": "doc", + "_source": { + "created": p.created, + "name": p.name, + "normalized_name": p.normalized_name, + "version": [r.version for r in prs], + "latest_version": first(prs, key=lambda r: not r.is_prerelease).version, + }, + } + for p, prs in sorted(releases.items(), key=lambda x: x[0].name.lower()) + if p.name == projects[1].name + ] + + class FakeESIndices: def __init__(self): self.indices = {} @@ -241,3 +276,97 @@ def project_docs(db): body={"index": {"number_of_replicas": 0, "refresh_interval": "1s"}}, ) ] + + +class TestPartialReindex: + def test_reindex_fails_when_raising(self, db_request, monkeypatch): + docs = pretend.stub() + + def project_docs(db, project_name=None): + return docs + + monkeypatch.setattr(warehouse.search.tasks, "_project_docs", project_docs) + + es_client = FakeESClient() + + db_request.registry.update( + {"elasticsearch.client": es_client, "elasticsearch.index": "warehouse"} + ) + + class TestException(Exception): + pass + + def parallel_bulk(client, iterable): + assert client is es_client + assert iterable is docs + raise TestException + + monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk) + + with pytest.raises(TestException): + reindex_project(db_request, "foo") + + assert es_client.indices.put_settings.calls == [] + + def test_unindex_fails_when_raising(self, db_request): + class TestException(Exception): + pass + + es_client = FakeESClient() + es_client.delete = pretend.raiser(TestException) + + db_request.registry.update( + {"elasticsearch.client": es_client, "elasticsearch.index": "warehouse"} + ) + + with pytest.raises(TestException): + unindex_project(db_request, "foo") + + def test_unindex_accepts_defeat(self, db_request): + es_client = FakeESClient() + es_client.delete = pretend.call_recorder( + pretend.raiser(elasticsearch.exceptions.NotFoundError) + ) + + db_request.registry.update( + {"elasticsearch.client": es_client, "elasticsearch.index": "warehouse"} + ) + + unindex_project(db_request, "foo") + + assert es_client.delete.calls == [ + pretend.call(index="warehouse", doc_type="project", id="foo") + ] + + def test_successfully_indexes(self, db_request, monkeypatch): + docs = pretend.stub() + + def project_docs(db, project_name=None): + return docs + + monkeypatch.setattr(warehouse.search.tasks, "_project_docs", project_docs) + + es_client = FakeESClient() + es_client.indices.indices["warehouse-aaaaaaaaaa"] = None + es_client.indices.aliases["warehouse"] = ["warehouse-aaaaaaaaaa"] + db_engine = pretend.stub() + + db_request.registry.update( + { + "elasticsearch.client": es_client, + "elasticsearch.index": "warehouse", + "elasticsearch.shards": 42, + "sqlalchemy.engine": db_engine, + } + ) + + parallel_bulk = pretend.call_recorder(lambda client, iterable: [None]) + monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk) + + reindex_project(db_request, "foo") + + assert parallel_bulk.calls == [pretend.call(es_client, docs)] + assert es_client.indices.create.calls == [] + assert es_client.indices.delete.calls == [] + assert es_client.indices.aliases == {"warehouse": ["warehouse-aaaaaaaaaa"]} + assert es_client.indices.put_settings.calls == [] diff --git a/warehouse/search/__init__.py b/warehouse/search/__init__.py index bb3cc1a5248d..36b351668749 100644 --- a/warehouse/search/__init__.py +++ 
b/warehouse/search/__init__.py @@ -18,7 +18,49 @@ from celery.schedules import crontab from elasticsearch_dsl import serializer +from warehouse import db from warehouse.search.utils import get_index +from warehouse.packaging.models import Project, Release + + +@db.listens_for(db.Session, "after_flush") +def store_projects_for_project_reindex(config, session, flush_context): + # We'll (ab)use the session.info dictionary to store a list of pending + # Project updates to the session. + projects_to_update = session.info.setdefault( + "warehouse.search.project_updates", set() + ) + projects_to_delete = session.info.setdefault( + "warehouse.search.project_deletes", set() + ) + + # Go through each new, changed, and deleted object and attempt to store + # a Project to reindex for when the session has been committed. + for obj in session.new | session.dirty: + if obj.__class__ == Project: + projects_to_update.add(obj) + if obj.__class__ == Release: + projects_to_update.add(obj.project) + + for obj in session.deleted: + if obj.__class__ == Project: + projects_to_delete.add(obj) + if obj.__class__ == Release: + projects_to_update.add(obj.project) + + +@db.listens_for(db.Session, "after_commit") +def execute_project_reindex(config, session): + projects_to_update = session.info.pop("warehouse.search.project_updates", set()) + projects_to_delete = session.info.pop("warehouse.search.project_deletes", set()) + + from warehouse.search.tasks import reindex_project, unindex_project + + for project in projects_to_update: + config.task(reindex_project).delay(project.normalized_name) + + for project in projects_to_delete: + config.task(unindex_project).delay(project.normalized_name) def es(request): diff --git a/warehouse/search/tasks.py b/warehouse/search/tasks.py index 0d995aa23d2b..76752b55628a 100644 --- a/warehouse/search/tasks.py +++ b/warehouse/search/tasks.py @@ -28,7 +28,7 @@ from warehouse.utils.db import windowed_query -def _project_docs(db): +def _project_docs(db, project_name=None): releases_list = ( db.query(Release.name, Release.version) @@ -38,9 +38,13 @@ def _project_docs(db): Release._pypi_ordering.desc(), ) .distinct(Release.name) - .subquery("release_list") ) + if project_name: + releases_list = releases_list.filter(Release.name == project_name) + + releases_list = releases_list.subquery() + r = aliased(Release, name="r") all_versions = ( @@ -182,3 +186,30 @@ def reindex(request): client.indices.delete(",".join(to_delete)) else: client.indices.put_alias(name=index_base, index=new_index_name) + + +@tasks.task(ignore_result=True, acks_late=True) +def reindex_project(request, project_name): + client = request.registry["elasticsearch.client"] + doc_types = request.registry.get("search.doc_types", set()) + index_name = request.registry["elasticsearch.index"] + get_index( + index_name, + doc_types, + using=client, + shards=request.registry.get("elasticsearch.shards", 1), + replicas=request.registry.get("elasticsearch.replicas", 0), + ) + + for _ in parallel_bulk(client, _project_docs(request.db, project_name)): + pass + + +@tasks.task(ignore_result=True, acks_late=True) +def unindex_project(request, project_name): + client = request.registry["elasticsearch.client"] + index_name = request.registry["elasticsearch.index"] + try: + client.delete(index=index_name, doc_type="project", id=project_name) + except elasticsearch.exceptions.NotFoundError: + pass From fed7f2d062146b1997e0ffe4a42e1be49fb05e09 Mon Sep 17 00:00:00 2001 From: Ernest W Durbin III Date: Fri, 20 Jul 2018 10:23:41 -0400 Subject: 
[PATCH 2/4] fixups for doctype shuffle in latest elasticsearch_dsl --- tests/unit/search/test_tasks.py | 10 ++++++---- warehouse/search/tasks.py | 6 ++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/unit/search/test_tasks.py b/tests/unit/search/test_tasks.py index 88ef9d11103e..aa8478c7ca34 100644 --- a/tests/unit/search/test_tasks.py +++ b/tests/unit/search/test_tasks.py @@ -296,7 +296,7 @@ def project_docs(db, project_name=None): class TestException(Exception): pass - def parallel_bulk(client, iterable): + def parallel_bulk(client, iterable, index=None): assert client is es_client assert iterable is docs raise TestException @@ -335,7 +335,7 @@ def test_unindex_accepts_defeat(self, db_request): unindex_project(db_request, "foo") assert es_client.delete.calls == [ - pretend.call(index="warehouse", doc_type="project", id="foo") + pretend.call(index="warehouse", doc_type="doc", id="foo") ] def test_successfully_indexes(self, db_request, monkeypatch): @@ -360,12 +360,14 @@ def project_docs(db, project_name=None): } ) - parallel_bulk = pretend.call_recorder(lambda client, iterable: [None]) + parallel_bulk = pretend.call_recorder( + lambda client, iterable, index=None: [None] + ) monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk) reindex_project(db_request, "foo") - assert parallel_bulk.calls == [pretend.call(es_client, docs)] + assert parallel_bulk.calls == [pretend.call(es_client, docs, index="warehouse")] assert es_client.indices.create.calls == [] assert es_client.indices.delete.calls == [] assert es_client.indices.aliases == {"warehouse": ["warehouse-aaaaaaaaaa"]} diff --git a/warehouse/search/tasks.py b/warehouse/search/tasks.py index 76752b55628a..504714d8c362 100644 --- a/warehouse/search/tasks.py +++ b/warehouse/search/tasks.py @@ -201,7 +201,9 @@ def reindex_project(request, project_name): replicas=request.registry.get("elasticsearch.replicas", 0), ) - for _ in parallel_bulk(client, _project_docs(request.db, project_name)): + for _ in parallel_bulk( + client, _project_docs(request.db, project_name), index=index_name + ): pass @@ -210,6 +212,6 @@ def unindex_project(request, project_name): client = request.registry["elasticsearch.client"] index_name = request.registry["elasticsearch.index"] try: - client.delete(index=index_name, doc_type="project", id=project_name) + client.delete(index=index_name, doc_type="doc", id=project_name) except elasticsearch.exceptions.NotFoundError: pass From e01b506222a1dd41ab9091d69479bf8ed4ffae24 Mon Sep 17 00:00:00 2001 From: Ernest W Durbin III Date: Fri, 20 Jul 2018 12:05:13 -0400 Subject: [PATCH 3/4] use a lock to keep incrementals from running during full reindex --- tests/unit/search/test_tasks.py | 152 ++++++++++++++++++-- warehouse/search/tasks.py | 240 ++++++++++++++++++-------------- 2 files changed, 277 insertions(+), 115 deletions(-) diff --git a/tests/unit/search/test_tasks.py b/tests/unit/search/test_tasks.py index aa8478c7ca34..6cc76a4ee5d6 100644 --- a/tests/unit/search/test_tasks.py +++ b/tests/unit/search/test_tasks.py @@ -12,10 +12,12 @@ import os +import celery import elasticsearch import packaging.version import pretend import pytest +import redis from first import first @@ -25,6 +27,7 @@ reindex_project, unindex_project, _project_docs, + SearchLock, ) @@ -125,6 +128,34 @@ def __init__(self): self.indices = FakeESIndices() +class NotLock: + def __init__(*a, **kw): + pass + + def acquire(self): + return True + + def release(self): + return True + + +class TestSearchLock: + def 
test_success(self): + lock_stub = pretend.stub(acquire=pretend.call_recorder(lambda: True)) + r = pretend.stub(lock=lambda *a, **kw: lock_stub) + test_lock = SearchLock(r) + test_lock.__enter__() + assert lock_stub.acquire.calls == [pretend.call()] + + def test_failure(self): + lock_stub = pretend.stub(acquire=pretend.call_recorder(lambda: False)) + r = pretend.stub(lock=lambda *a, **kw: lock_stub) + test_lock = SearchLock(r) + with pytest.raises(redis.exceptions.LockError): + test_lock.__enter__() + assert lock_stub.acquire.calls == [pretend.call()] + + class TestReindex: def test_fails_when_raising(self, db_request, monkeypatch): docs = pretend.stub() @@ -134,10 +165,14 @@ def project_docs(db): monkeypatch.setattr(warehouse.search.tasks, "_project_docs", project_docs) + task = pretend.stub() es_client = FakeESClient() db_request.registry.update({"elasticsearch.index": "warehouse"}) - db_request.registry.settings = {"elasticsearch.url": "http://some.url"} + db_request.registry.settings = { + "elasticsearch.url": "http://some.url", + "celery.scheduler_url": "redis://redis:6379/0", + } monkeypatch.setattr( warehouse.search.tasks.elasticsearch, "Elasticsearch", @@ -153,18 +188,41 @@ def parallel_bulk(client, iterable, index=None): assert index == "warehouse-cbcbcbcbcb" raise TestException + monkeypatch.setattr( + redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock) + ) + monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk) monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n) with pytest.raises(TestException): - reindex(db_request) + reindex(task, db_request) assert es_client.indices.delete.calls == [ pretend.call(index="warehouse-cbcbcbcbcb") ] assert es_client.indices.put_settings.calls == [] + def test_retry_on_lock(self, db_request, monkeypatch): + task = pretend.stub( + retry=pretend.call_recorder(pretend.raiser(celery.exceptions.Retry)) + ) + + db_request.registry.settings = {"celery.scheduler_url": "redis://redis:6379/0"} + + le = redis.exceptions.LockError() + monkeypatch.setattr( + redis.StrictRedis, + "from_url", + lambda *a, **kw: pretend.stub(lock=pretend.raiser(le)), + ) + + with pytest.raises(celery.exceptions.Retry): + reindex(task, db_request) + + assert task.retry.calls == [pretend.call(countdown=60, exc=le)] + def test_successfully_indexes_and_adds_new(self, db_request, monkeypatch): docs = pretend.stub() @@ -174,24 +232,31 @@ def project_docs(db): monkeypatch.setattr(warehouse.search.tasks, "_project_docs", project_docs) + task = pretend.stub() es_client = FakeESClient() db_request.registry.update( {"elasticsearch.index": "warehouse", "elasticsearch.shards": 42} ) - db_request.registry.settings = {"elasticsearch.url": "http://some.url"} + db_request.registry.settings = { + "elasticsearch.url": "http://some.url", + "celery.scheduler_url": "redis://redis:6379/0", + } monkeypatch.setattr( warehouse.search.tasks.elasticsearch, "Elasticsearch", lambda *a, **kw: es_client, ) + monkeypatch.setattr( + redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock) + ) parallel_bulk = pretend.call_recorder(lambda client, iterable, index: [None]) monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk) monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n) - reindex(db_request) + reindex(task, db_request) assert parallel_bulk.calls == [ pretend.call(es_client, docs, index="warehouse-cbcbcbcbcb") @@ -220,6 +285,7 @@ def project_docs(db): def test_successfully_indexes_and_replaces(self, db_request, 
monkeypatch): docs = pretend.stub() + task = pretend.stub() def project_docs(db): return docs @@ -238,19 +304,25 @@ def project_docs(db): "sqlalchemy.engine": db_engine, } ) - db_request.registry.settings = {"elasticsearch.url": "http://some.url"} + db_request.registry.settings = { + "elasticsearch.url": "http://some.url", + "celery.scheduler_url": "redis://redis:6379/0", + } monkeypatch.setattr( warehouse.search.tasks.elasticsearch, "Elasticsearch", lambda *a, **kw: es_client, ) + monkeypatch.setattr( + redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock) + ) parallel_bulk = pretend.call_recorder(lambda client, iterable, index: [None]) monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk) monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n) - reindex(db_request) + reindex(task, db_request) assert parallel_bulk.calls == [ pretend.call(es_client, docs, index="warehouse-cbcbcbcbcb") @@ -281,6 +353,7 @@ def project_docs(db): class TestPartialReindex: def test_reindex_fails_when_raising(self, db_request, monkeypatch): docs = pretend.stub() + task = pretend.stub() def project_docs(db, project_name=None): return docs @@ -302,44 +375,96 @@ def parallel_bulk(client, iterable, index=None): raise TestException monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk) + monkeypatch.setattr( + redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock) + ) with pytest.raises(TestException): - reindex_project(db_request, "foo") + reindex_project(task, db_request, "foo") assert es_client.indices.put_settings.calls == [] - def test_unindex_fails_when_raising(self, db_request): + def test_unindex_fails_when_raising(self, db_request, monkeypatch): + task = pretend.stub() + class TestException(Exception): pass es_client = FakeESClient() es_client.delete = pretend.raiser(TestException) + monkeypatch.setattr( + redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock) + ) db_request.registry.update( {"elasticsearch.client": es_client, "elasticsearch.index": "warehouse"} ) with pytest.raises(TestException): - unindex_project(db_request, "foo") + unindex_project(task, db_request, "foo") + + def test_unindex_accepts_defeat(self, db_request, monkeypatch): + task = pretend.stub() - def test_unindex_accepts_defeat(self, db_request): es_client = FakeESClient() es_client.delete = pretend.call_recorder( pretend.raiser(elasticsearch.exceptions.NotFoundError) ) + monkeypatch.setattr( + redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock) + ) db_request.registry.update( {"elasticsearch.client": es_client, "elasticsearch.index": "warehouse"} ) - unindex_project(db_request, "foo") + unindex_project(task, db_request, "foo") assert es_client.delete.calls == [ pretend.call(index="warehouse", doc_type="doc", id="foo") ] + def test_unindex_retry_on_lock(self, db_request, monkeypatch): + task = pretend.stub( + retry=pretend.call_recorder(pretend.raiser(celery.exceptions.Retry)) + ) + + db_request.registry.settings = {"celery.scheduler_url": "redis://redis:6379/0"} + + le = redis.exceptions.LockError() + monkeypatch.setattr( + redis.StrictRedis, + "from_url", + lambda *a, **kw: pretend.stub(lock=pretend.raiser(le)), + ) + + with pytest.raises(celery.exceptions.Retry): + unindex_project(task, db_request, "foo") + + assert task.retry.calls == [pretend.call(countdown=60, exc=le)] + + def test_reindex_retry_on_lock(self, db_request, monkeypatch): + task = pretend.stub( + 
retry=pretend.call_recorder(pretend.raiser(celery.exceptions.Retry)) + ) + + db_request.registry.settings = {"celery.scheduler_url": "redis://redis:6379/0"} + + le = redis.exceptions.LockError() + monkeypatch.setattr( + redis.StrictRedis, + "from_url", + lambda *a, **kw: pretend.stub(lock=pretend.raiser(le)), + ) + + with pytest.raises(celery.exceptions.Retry): + reindex_project(task, db_request, "foo") + + assert task.retry.calls == [pretend.call(countdown=60, exc=le)] + def test_successfully_indexes(self, db_request, monkeypatch): docs = pretend.stub() + task = pretend.stub() def project_docs(db, project_name=None): return docs @@ -364,8 +489,11 @@ def project_docs(db, project_name=None): lambda client, iterable, index=None: [None] ) monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk) + monkeypatch.setattr( + redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock) + ) - reindex_project(db_request, "foo") + reindex_project(task, db_request, "foo") assert parallel_bulk.calls == [pretend.call(es_client, docs, index="warehouse")] assert es_client.indices.create.calls == [] diff --git a/warehouse/search/tasks.py b/warehouse/search/tasks.py index 504714d8c362..104c27d97d01 100644 --- a/warehouse/search/tasks.py +++ b/warehouse/search/tasks.py @@ -20,6 +20,7 @@ from sqlalchemy.orm import aliased import certifi import elasticsearch +import redis from warehouse.packaging.models import Classifier, Project, Release, release_classifiers from warehouse.packaging.search import Project as ProjectDocument @@ -107,111 +108,144 @@ def _project_docs(db, project_name=None): yield doc -@tasks.task(ignore_result=True, acks_late=True) -def reindex(request): - """ - Recreate the Search Index. - """ - p = urllib.parse.urlparse(request.registry.settings["elasticsearch.url"]) - client = elasticsearch.Elasticsearch( - [urllib.parse.urlunparse(p[:2] + ("",) * 4)], - verify_certs=True, - ca_certs=certifi.where(), - timeout=30, - retry_on_timeout=True, - serializer=serializer.serializer, - ) - number_of_replicas = request.registry.get("elasticsearch.replicas", 0) - refresh_interval = request.registry.get("elasticsearch.interval", "1s") - - # We use a randomly named index so that we can do a zero downtime reindex. - # Essentially we'll use a randomly named index which we will use until all - # of the data has been reindexed, at which point we'll point an alias at - # our randomly named index, and then delete the old randomly named index. - - # Create the new index and associate all of our doc types with it. - index_base = request.registry["elasticsearch.index"] - random_token = binascii.hexlify(os.urandom(5)).decode("ascii") - new_index_name = "{}-{}".format(index_base, random_token) - doc_types = request.registry.get("search.doc_types", set()) - shards = request.registry.get("elasticsearch.shards", 1) - - # Create the new index with zero replicas and index refreshes disabled - # while we are bulk indexing. - new_index = get_index( - new_index_name, - doc_types, - using=client, - shards=shards, - replicas=0, - interval="-1", - ) - new_index.create(wait_for_active_shards=shards) - - # From this point on, if any error occurs, we want to be able to delete our - # in progress index. 
- try: - request.db.execute("SET statement_timeout = '600s'") - - for _ in parallel_bulk(client, _project_docs(request.db), index=new_index_name): - pass - except: # noqa - new_index.delete() - raise - finally: - request.db.rollback() - request.db.close() - - # Now that we've finished indexing all of our data we can update the - # replicas and refresh intervals. - client.indices.put_settings( - index=new_index_name, - body={ - "index": { - "number_of_replicas": number_of_replicas, - "refresh_interval": refresh_interval, - } - }, - ) +class SearchLock: + def __init__(self, redis_client, timeout=None, blocking_timeout=None): + self.lock = redis_client.lock( + "search-index", timeout=timeout, blocking_timeout=blocking_timeout + ) - # Point the alias at our new randomly named index and delete the old index. - if client.indices.exists_alias(name=index_base): - to_delete = set() - actions = [] - for name in client.indices.get_alias(name=index_base): - to_delete.add(name) - actions.append({"remove": {"index": name, "alias": index_base}}) - actions.append({"add": {"index": new_index_name, "alias": index_base}}) - client.indices.update_aliases({"actions": actions}) - client.indices.delete(",".join(to_delete)) - else: - client.indices.put_alias(name=index_base, index=new_index_name) - - -@tasks.task(ignore_result=True, acks_late=True) -def reindex_project(request, project_name): - client = request.registry["elasticsearch.client"] - doc_types = request.registry.get("search.doc_types", set()) - index_name = request.registry["elasticsearch.index"] - get_index( - index_name, - doc_types, - using=client, - shards=request.registry.get("elasticsearch.shards", 1), - replicas=request.registry.get("elasticsearch.replicas", 0), - ) + def __enter__(self): + if self.lock.acquire(): + return self + else: + raise redis.exceptions.LockError("Could not acquire lock!") - for _ in parallel_bulk( - client, _project_docs(request.db, project_name), index=index_name - ): - pass + def __exit__(self, type, value, tb): + self.lock.release() -@tasks.task(ignore_result=True, acks_late=True) -def unindex_project(request, project_name): - client = request.registry["elasticsearch.client"] - index_name = request.registry["elasticsearch.index"] +@tasks.task(bind=True, ignore_result=True, acks_late=True) +def reindex(self, request): + """ + Recreate the Search Index. + """ + r = redis.StrictRedis.from_url(request.registry.settings["celery.scheduler_url"]) + try: + with SearchLock(r, timeout=30 * 60, blocking_timeout=30): + p = urllib.parse.urlparse(request.registry.settings["elasticsearch.url"]) + client = elasticsearch.Elasticsearch( + [urllib.parse.urlunparse(p[:2] + ("",) * 4)], + verify_certs=True, + ca_certs=certifi.where(), + timeout=30, + retry_on_timeout=True, + serializer=serializer.serializer, + ) + number_of_replicas = request.registry.get("elasticsearch.replicas", 0) + refresh_interval = request.registry.get("elasticsearch.interval", "1s") + + # We use a randomly named index so that we can do a zero downtime reindex. + # Essentially we'll use a randomly named index which we will use until all + # of the data has been reindexed, at which point we'll point an alias at + # our randomly named index, and then delete the old randomly named index. + + # Create the new index and associate all of our doc types with it. 
+ index_base = request.registry["elasticsearch.index"] + random_token = binascii.hexlify(os.urandom(5)).decode("ascii") + new_index_name = "{}-{}".format(index_base, random_token) + doc_types = request.registry.get("search.doc_types", set()) + shards = request.registry.get("elasticsearch.shards", 1) + + # Create the new index with zero replicas and index refreshes disabled + # while we are bulk indexing. + new_index = get_index( + new_index_name, + doc_types, + using=client, + shards=shards, + replicas=0, + interval="-1", + ) + new_index.create(wait_for_active_shards=shards) + + # From this point on, if any error occurs, we want to be able to delete our + # in progress index. + try: + request.db.execute("SET statement_timeout = '600s'") + + for _ in parallel_bulk( + client, _project_docs(request.db), index=new_index_name + ): + pass + except: # noqa + new_index.delete() + raise + finally: + request.db.rollback() + request.db.close() + + # Now that we've finished indexing all of our data we can update the + # replicas and refresh intervals. + client.indices.put_settings( + index=new_index_name, + body={ + "index": { + "number_of_replicas": number_of_replicas, + "refresh_interval": refresh_interval, + } + }, + ) + + # Point the alias at our new randomly named index and delete the old index. + if client.indices.exists_alias(name=index_base): + to_delete = set() + actions = [] + for name in client.indices.get_alias(name=index_base): + to_delete.add(name) + actions.append({"remove": {"index": name, "alias": index_base}}) + actions.append({"add": {"index": new_index_name, "alias": index_base}}) + client.indices.update_aliases({"actions": actions}) + client.indices.delete(",".join(to_delete)) + else: + client.indices.put_alias(name=index_base, index=new_index_name) + except redis.exceptions.LockError as exc: + raise self.retry(countdown=60, exc=exc) + + +@tasks.task(bind=True, ignore_result=True, acks_late=True) +def reindex_project(self, request, project_name): + r = redis.StrictRedis.from_url(request.registry.settings["celery.scheduler_url"]) + try: + with SearchLock(r, timeout=15, blocking_timeout=1): + client = request.registry["elasticsearch.client"] + doc_types = request.registry.get("search.doc_types", set()) + index_name = request.registry["elasticsearch.index"] + get_index( + index_name, + doc_types, + using=client, + shards=request.registry.get("elasticsearch.shards", 1), + replicas=request.registry.get("elasticsearch.replicas", 0), + ) + + for _ in parallel_bulk( + client, _project_docs(request.db, project_name), index=index_name + ): + pass + except redis.exceptions.LockError as exc: + raise self.retry(countdown=60, exc=exc) + + +@tasks.task(bind=True, ignore_result=True, acks_late=True) +def unindex_project(self, request, project_name): + r = redis.StrictRedis.from_url(request.registry.settings["celery.scheduler_url"]) try: - client.delete(index=index_name, doc_type="doc", id=project_name) - except elasticsearch.exceptions.NotFoundError: - pass + with SearchLock(r, timeout=15, blocking_timeout=1): + client = request.registry["elasticsearch.client"] + index_name = request.registry["elasticsearch.index"] + try: + client.delete(index=index_name, doc_type="doc", id=project_name) + except elasticsearch.exceptions.NotFoundError: + pass + except redis.exceptions.LockError as exc: + raise self.retry(countdown=60, exc=exc) From a51592c7cb9a1bbbfbaf9d3113b00869cbf5b991 Mon Sep 17 00:00:00 2001 From: Ernest W Durbin III Date: Sat, 21 Jul 2018 10:37:39 -0400 Subject: [PATCH 4/4] perform full 
reindex once daily

---
 warehouse/search/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/warehouse/search/__init__.py b/warehouse/search/__init__.py
index 36b351668749..6cdf1f7a6b74 100644
--- a/warehouse/search/__init__.py
+++ b/warehouse/search/__init__.py
@@ -95,4 +95,4 @@ def includeme(config):
 
     from warehouse.search.tasks import reindex
 
-    config.add_periodic_task(crontab(minute=0, hour="*/3"), reindex)
+    config.add_periodic_task(crontab(minute=0, hour=6), reindex)
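
Note (illustrative only, not part of the patch series): the locking scheme introduced in PATCH 3/4 can be exercised outside of Celery with a short standalone script. The SearchLock context manager below is copied from warehouse/search/tasks.py as added in that patch; the local Redis URL and the demo() driver are assumptions made purely for illustration. The sketch shows the full reindex holding the "search-index" lock while an incremental task, using the same short blocking_timeout as reindex_project/unindex_project, fails fast and would fall back to Celery's retry(countdown=60).

# Standalone sketch of the PATCH 3/4 locking scheme. Assumes a Redis server
# at localhost:6379; the real tasks read the URL from
# request.registry.settings["celery.scheduler_url"] instead.
import redis


class SearchLock:
    # Copied from warehouse/search/tasks.py as added in PATCH 3/4.
    def __init__(self, redis_client, timeout=None, blocking_timeout=None):
        self.lock = redis_client.lock(
            "search-index", timeout=timeout, blocking_timeout=blocking_timeout
        )

    def __enter__(self):
        if self.lock.acquire():
            return self
        else:
            raise redis.exceptions.LockError("Could not acquire lock!")

    def __exit__(self, type, value, tb):
        self.lock.release()


def demo():
    r = redis.StrictRedis.from_url("redis://localhost:6379/0")

    # Full reindex: may hold the lock for up to 30 minutes and will wait up
    # to 30 seconds to acquire it (the timeouts used by the reindex task).
    with SearchLock(r, timeout=30 * 60, blocking_timeout=30):
        # Meanwhile an incremental reindex_project / unindex_project task
        # tries the same lock with timeout=15, blocking_timeout=1 ...
        try:
            with SearchLock(r, timeout=15, blocking_timeout=1):
                pass
        except redis.exceptions.LockError:
            # ... and fails fast; the real tasks then call
            # self.retry(countdown=60, exc=exc) instead of printing.
            print("incremental task would retry in 60 seconds")


if __name__ == "__main__":
    demo()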