diff --git a/docker-compose.yml b/docker-compose.yml
index c3adc6fd816e..1923d197dff8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -64,12 +64,10 @@ services:
       context: .
       args:
         DEVEL: "yes"
-    command: hupper -m celery -A warehouse worker -B -S redbeat.RedBeatScheduler -l info
+    command: celery -A warehouse worker -B -S redbeat.RedBeatScheduler -l info
     env_file: dev/environment
     environment:
       C_FORCE_ROOT: "1"
-    volumes:
-      - ./warehouse:/opt/warehouse/src/warehouse:z
     links:
       - db
       - redis
diff --git a/tests/unit/search/test_init.py b/tests/unit/search/test_init.py
index 4d8887c2d756..adf374c96ea4 100644
--- a/tests/unit/search/test_init.py
+++ b/tests/unit/search/test_init.py
@@ -14,84 +14,6 @@

 from warehouse import search

-from ...common.db.packaging import ProjectFactory, ReleaseFactory
-
-
-def test_store_projects(db_request):
-    project0 = ProjectFactory.create()
-    project1 = ProjectFactory.create()
-    release1 = ReleaseFactory.create(project=project1)
-    config = pretend.stub()
-    session = pretend.stub(
-        info={},
-        new={project0},
-        dirty=set(),
-        deleted={release1},
-    )
-
-    search.store_projects_for_project_reindex(config, session, pretend.stub())
-
-    assert session.info["warehouse.search.project_updates"] == {
-        project0,
-        project1,
-    }
-    assert session.info["warehouse.search.project_deletes"] == set()
-
-
-def test_store_projects_unindex(db_request):
-    project0 = ProjectFactory.create()
-    project1 = ProjectFactory.create()
-    config = pretend.stub()
-    session = pretend.stub(
-        info={},
-        new={project0},
-        dirty=set(),
-        deleted={project1},
-    )
-
-    search.store_projects_for_project_reindex(config, session, pretend.stub())
-
-    assert session.info["warehouse.search.project_updates"] == {
-        project0,
-    }
-    assert session.info["warehouse.search.project_deletes"] == {
-        project1,
-    }
-
-
-def test_execute_reindex_success(app_config):
-    _delay = pretend.call_recorder(lambda x: None)
-    app_config.task = lambda x: pretend.stub(delay=_delay)
-    session = pretend.stub(
-        info={
-            "warehouse.search.project_updates": {
-                pretend.stub(normalized_name="foo"),
-            },
-        },
-    )
-
-    search.execute_project_reindex(app_config, session)
-
-    assert _delay.calls == [pretend.call("foo")]
-    assert "warehouse.search.project_updates" not in session.info
-
-
-def test_execute_unindex_success(app_config):
-    _delay = pretend.call_recorder(lambda x: None)
-    app_config.task = lambda x: pretend.stub(delay=_delay)
-    session = pretend.stub(
-        info={
-            "warehouse.search.project_deletes": {
-                pretend.stub(normalized_name="foo"),
-            },
-        },
-    )
-
-    search.execute_project_reindex(app_config, session)
-
-    assert _delay.calls == [pretend.call("foo")]
-    assert "warehouse.search.project_deletes" not in session.info
-

 def test_es(monkeypatch):
     search_obj = pretend.stub()
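# The tests deleted above drove the (also-deleted) ORM-event hooks with a
# minimal stand-in for a SQLAlchemy session built from pretend stubs. A rough
# sketch of that pattern, kept here only for reference; it assumes the
# pre-patch warehouse.search, which still exposed the hook:
import pretend
from warehouse import search

fake_session = pretend.stub(info={}, new=set(), dirty=set(), deleted=set())
search.store_projects_for_project_reindex(pretend.stub(), fake_session, pretend.stub())
# The hook recorded pending work in session.info under well-known keys.
assert fake_session.info["warehouse.search.project_updates"] == set()
assert fake_session.info["warehouse.search.project_deletes"] == set()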
diff --git a/tests/unit/search/test_tasks.py b/tests/unit/search/test_tasks.py
index 3fbd34a24b79..67ee29305628 100644
--- a/tests/unit/search/test_tasks.py
+++ b/tests/unit/search/test_tasks.py
@@ -12,21 +12,14 @@

 import os

-from contextlib import contextmanager
-
-import celery
-import elasticsearch
 import packaging.version
 import pretend
 import pytest
-import redis

 from first import first

 import warehouse.search.tasks

-from warehouse.search.tasks import (
-    reindex, reindex_project, unindex_project, _project_docs
-)
+from warehouse.search.tasks import reindex, _project_docs

 from ...common.db.packaging import ProjectFactory, ReleaseFactory
@@ -61,37 +54,6 @@ def test_project_docs(db_session):
     ]

-
-def test_single_project_doc(db_session):
-    projects = [ProjectFactory.create() for _ in range(2)]
-    releases = {
-        p: sorted(
-            [ReleaseFactory.create(project=p) for _ in range(3)],
-            key=lambda r: packaging.version.parse(r.version),
-            reverse=True,
-        )
-        for p in projects
-    }
-
-    assert list(_project_docs(db_session, project_name=projects[1].name)) == [
-        {
-            "_id": p.normalized_name,
-            "_type": "project",
-            "_source": {
-                "created": p.created,
-                "name": p.name,
-                "normalized_name": p.normalized_name,
-                "version": [r.version for r in prs],
-                "latest_version": first(
-                    prs,
-                    key=lambda r: not r.is_prerelease,
-                ).version,
-            },
-        }
-        for p, prs in sorted(releases.items(), key=lambda x: x[0].name.lower())
-        if p.name == projects[1].name
-    ]
-
-
 class FakeESIndices:

     def __init__(self):
@@ -134,11 +96,6 @@ def __init__(self):
         self.indices = FakeESIndices()


-@contextmanager
-def _not_lock(*a, **kw):
-    yield True
-
-
 class TestReindex:

     def test_fails_when_raising(self, db_request, monkeypatch):
@@ -153,7 +110,6 @@ def project_docs(db):
             project_docs,
         )

-        task = pretend.stub()
         es_client = FakeESClient()

         db_request.registry.update(
@@ -163,10 +119,6 @@ def project_docs(db):
             },
         )

-        db_request.registry.settings = {
-            "celery.scheduler_url": "redis://redis:6379/0",
-        }
-
         class TestException(Exception):
             pass

@@ -175,17 +127,13 @@ def parallel_bulk(client, iterable):
             assert iterable is docs
             raise TestException

-        monkeypatch.setattr(
-            redis.StrictRedis, "from_url",
-            lambda *a, **kw: pretend.stub(lock=_not_lock))
-
         monkeypatch.setattr(
             warehouse.search.tasks, "parallel_bulk", parallel_bulk)

         monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n)

         with pytest.raises(TestException):
-            reindex(task, db_request)
+            reindex(db_request)

         assert es_client.indices.delete.calls == [
             pretend.call(index='warehouse-cbcbcbcbcb'),
@@ -193,29 +141,6 @@ def parallel_bulk(client, iterable):
         assert es_client.indices.put_settings.calls == []
         assert es_client.indices.forcemerge.calls == []

-    def test_retry_on_lock(self, db_request, monkeypatch):
-        task = pretend.stub(
-            retry=pretend.call_recorder(
-                pretend.raiser(celery.exceptions.Retry)
-            )
-        )
-
-        db_request.registry.settings = {
-            "celery.scheduler_url": "redis://redis:6379/0",
-        }
-
-        le = redis.exceptions.LockError()
-        monkeypatch.setattr(
-            redis.StrictRedis, "from_url",
-            lambda *a, **kw: pretend.stub(lock=pretend.raiser(le)))
-
-        with pytest.raises(celery.exceptions.Retry):
-            reindex(task, db_request)
-
-        assert task.retry.calls == [
-            pretend.call(countdown=60, exc=le)
-        ]
-
     def test_successfully_indexes_and_adds_new(self, db_request, monkeypatch):
         docs = pretend.stub()

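# test_retry_on_lock (removed above) covered the Redis-lock path that the old
# task body used; the lock itself disappears from warehouse/search/tasks.py
# later in this patch. For reference, a rough sketch of that pattern, assuming
# a reachable Redis at the dev URL these tests used:
import redis

r = redis.StrictRedis.from_url("redis://redis:6379/0")
try:
    with r.lock("search-index", timeout=15 * 60, blocking_timeout=30):
        pass  # hold the lock while rebuilding the index
except redis.exceptions.LockError as exc:
    pass  # the old bound task retried here: raise self.retry(countdown=60, exc=exc)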
["warehouse-aaaaaaaaaa"] @@ -316,21 +231,13 @@ def project_docs(db): }, ) - db_request.registry.settings = { - "celery.scheduler_url": "redis://redis:6379/0", - } - - monkeypatch.setattr( - redis.StrictRedis, "from_url", - lambda *a, **kw: pretend.stub(lock=_not_lock)) - parallel_bulk = pretend.call_recorder(lambda client, iterable: [None]) monkeypatch.setattr( warehouse.search.tasks, "parallel_bulk", parallel_bulk) monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n) - reindex(task, db_request) + reindex(db_request) assert parallel_bulk.calls == [pretend.call(es_client, docs)] assert es_client.indices.create.calls == [ @@ -366,202 +273,3 @@ def project_docs(db): assert es_client.indices.forcemerge.calls == [ pretend.call(index='warehouse-cbcbcbcbcb') ] - - -class TestPartialReindex: - - def test_reindex_fails_when_raising(self, db_request, monkeypatch): - docs = pretend.stub() - - def project_docs(db, project_name=None): - return docs - - monkeypatch.setattr( - warehouse.search.tasks, - "_project_docs", - project_docs, - ) - - task = pretend.stub() - es_client = FakeESClient() - - db_request.registry.update( - { - "elasticsearch.client": es_client, - "elasticsearch.index": "warehouse", - }, - ) - - db_request.registry.settings = { - "celery.scheduler_url": "redis://redis:6379/0", - } - - class TestException(Exception): - pass - - def parallel_bulk(client, iterable): - assert client is es_client - assert iterable is docs - raise TestException - - monkeypatch.setattr( - redis.StrictRedis, "from_url", - lambda *a, **kw: pretend.stub(lock=_not_lock)) - - monkeypatch.setattr( - warehouse.search.tasks, "parallel_bulk", parallel_bulk) - - with pytest.raises(TestException): - reindex_project(task, db_request, 'foo') - - assert es_client.indices.put_settings.calls == [] - assert es_client.indices.forcemerge.calls == [] - - def test_unindex_fails_when_raising(self, db_request, monkeypatch): - class TestException(Exception): - pass - - task = pretend.stub() - es_client = FakeESClient() - es_client.delete = pretend.raiser(TestException) - - db_request.registry.update( - { - "elasticsearch.client": es_client, - "elasticsearch.index": "warehouse", - }, - ) - - db_request.registry.settings = { - "celery.scheduler_url": "redis://redis:6379/0", - } - - monkeypatch.setattr( - redis.StrictRedis, "from_url", - lambda *a, **kw: pretend.stub(lock=_not_lock)) - - with pytest.raises(TestException): - unindex_project(task, db_request, 'foo') - - def test_unindex_retry_on_lock(self, db_request, monkeypatch): - task = pretend.stub( - retry=pretend.call_recorder( - pretend.raiser(celery.exceptions.Retry) - ) - ) - - db_request.registry.settings = { - "celery.scheduler_url": "redis://redis:6379/0", - } - - le = redis.exceptions.LockError() - monkeypatch.setattr( - redis.StrictRedis, "from_url", - lambda *a, **kw: pretend.stub(lock=pretend.raiser(le))) - - with pytest.raises(celery.exceptions.Retry): - unindex_project(task, db_request, "foo") - - assert task.retry.calls == [ - pretend.call(countdown=60, exc=le) - ] - - def test_reindex_retry_on_lock(self, db_request, monkeypatch): - task = pretend.stub( - retry=pretend.call_recorder( - pretend.raiser(celery.exceptions.Retry) - ) - ) - - db_request.registry.settings = { - "celery.scheduler_url": "redis://redis:6379/0", - } - - le = redis.exceptions.LockError() - monkeypatch.setattr( - redis.StrictRedis, "from_url", - lambda *a, **kw: pretend.stub(lock=pretend.raiser(le))) - - with pytest.raises(celery.exceptions.Retry): - reindex_project(task, db_request, "foo") - 
diff --git a/warehouse/search/__init__.py b/warehouse/search/__init__.py
index 4ed120e07520..97da477f815a 100644
--- a/warehouse/search/__init__.py
+++ b/warehouse/search/__init__.py
@@ -18,49 +18,7 @@
 from celery.schedules import crontab
 from elasticsearch_dsl import serializer

-from warehouse import db
 from warehouse.search.utils import get_index
-from warehouse.packaging.models import Project, Release
-
-
-@db.listens_for(db.Session, "after_flush")
-def store_projects_for_project_reindex(config, session, flush_context):
-    # We'll (ab)use the session.info dictionary to store a list of pending
-    # Project updates to the session.
-    projects_to_update = session.info.setdefault(
-        "warehouse.search.project_updates", set())
-    projects_to_delete = session.info.setdefault(
-        "warehouse.search.project_deletes", set())
-
-    # Go through each new, changed, and deleted object and attempt to store
-    # a Project to reindex for when the session has been committed.
-    for obj in (session.new | session.dirty):
-        if obj.__class__ == Project:
-            projects_to_update.add(obj)
-        if obj.__class__ == Release:
-            projects_to_update.add(obj.project)
-
-    for obj in (session.deleted):
-        if obj.__class__ == Project:
-            projects_to_delete.add(obj)
-        if obj.__class__ == Release:
-            projects_to_update.add(obj.project)
-
-
-@db.listens_for(db.Session, "after_commit")
-def execute_project_reindex(config, session):
-    projects_to_update = session.info.pop(
-        "warehouse.search.project_updates", set())
-    projects_to_delete = session.info.pop(
-        "warehouse.search.project_deletes", set())
-
-    from warehouse.search.tasks import reindex_project, unindex_project
-
-    for project in projects_to_update:
-        config.task(reindex_project).delay(project.normalized_name)
-
-    for project in projects_to_delete:
-        config.task(unindex_project).delay(project.normalized_name)


 def es(request):
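# With the after_flush/after_commit listeners gone, nothing reindexes a single
# project on write anymore; the module keeps importing crontab (see the context
# lines above) for the periodic full reindex. A sketch of how such scheduling
# is typically wired up in this package's includeme; the function, directive,
# and schedule shown here are assumptions, not part of this patch:
from celery.schedules import crontab

from warehouse.search.tasks import reindex


def includeme(config):
    # Run the full reindex on a fixed daily schedule via Celery beat.
    config.add_periodic_task(crontab(minute=0, hour=6), reindex)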
diff --git a/warehouse/search/tasks.py b/warehouse/search/tasks.py
index 742442b00665..f027f5ba706e 100644
--- a/warehouse/search/tasks.py
+++ b/warehouse/search/tasks.py
@@ -13,9 +13,6 @@
 import binascii
 import os

-import elasticsearch
-import redis
-
 from elasticsearch.helpers import parallel_bulk
 from sqlalchemy import and_, func
 from sqlalchemy.orm import aliased
@@ -28,7 +25,7 @@ from warehouse.utils.db import windowed_query


-def _project_docs(db, project_name=None):
+def _project_docs(db):

     releases_list = (
         db.query(Release.name, Release.version)
@@ -38,13 +35,9 @@ def _project_docs(db, project_name=None):
             Release._pypi_ordering.desc(),
         )
         .distinct(Release.name)
+        .subquery("release_list")
     )

-    if project_name:
-        releases_list = releases_list.filter(Release.name == project_name)
-
-    releases_list = releases_list.subquery()
-
     r = aliased(Release, name="r")

     all_versions = (
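# _project_docs feeds parallel_bulk in the reindex task below, yielding one
# bulk action per project. Going by test_project_docs earlier in this patch,
# each action is shaped roughly like this (values are placeholders):
doc = {
    "_id": "normalized-name",
    "_type": "project",
    "_source": {
        "created": "...",              # project creation timestamp
        "name": "...",
        "normalized_name": "...",
        "version": ["2.0", "1.0"],     # all versions, newest first
        "latest_version": "2.0",       # first non-prerelease version
    },
}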
@@ -100,128 +93,75 @@ def _project_docs(db, project_name=None):
         yield p.to_dict(include_meta=True)


-@tasks.task(bind=True, ignore_result=True, acks_late=True)
-def reindex(self, request):
+@tasks.task(ignore_result=True, acks_late=True)
+def reindex(request):
     """
     Recreate the Search Index.
     """
-    r = redis.StrictRedis.from_url(
-        request.registry.settings["celery.scheduler_url"])
-    try:
-        with r.lock('search-index', timeout=15 * 60, blocking_timeout=30):
-            client = request.registry["elasticsearch.client"]
-            number_of_replicas = request.registry.get(
-                "elasticsearch.replicas", 0)
-            refresh_interval = request.registry.get(
-                "elasticsearch.interval", "1s")
-
-            # We use a randomly named index so that we can do a zero downtime
-            # reindex. Essentially we'll use a randomly named index which we
-            # will use until all of the data has been reindexed, at which point
-            # we'll point an alias at our randomly named index, and then delete
-            # the old randomly named index.
-
-            # Create the new index and associate all of our doc types with it.
-            index_base = request.registry["elasticsearch.index"]
-            random_token = binascii.hexlify(os.urandom(5)).decode("ascii")
-            new_index_name = "{}-{}".format(index_base, random_token)
-            doc_types = request.registry.get("search.doc_types", set())
-            shards = request.registry.get("elasticsearch.shards", 1)
-
-            # Create the new index with zero replicas and index refreshes
-            # disabled while we are bulk indexing.
-            new_index = get_index(
-                new_index_name,
-                doc_types,
-                using=client,
-                shards=shards,
-                replicas=0,
-                interval="-1",
-            )
-            new_index.create(wait_for_active_shards=shards)
-
-            # From this point on, if any error occurs, we want to be able to
-            # delete our in progress index.
-            try:
-                request.db.execute("SET statement_timeout = '600s'")
-
-                for _ in parallel_bulk(client, _project_docs(request.db)):
-                    pass
-            except:  # noqa
-                new_index.delete()
-                raise
-            finally:
-                request.db.rollback()
-                request.db.close()
-
-            # Now that we've finished indexing all of our data we can optimize
-            # it and update the replicas and refresh intervals.
-            client.indices.forcemerge(index=new_index_name)
-            client.indices.put_settings(
-                index=new_index_name,
-                body={
-                    "index": {
-                        "number_of_replicas": number_of_replicas,
-                        "refresh_interval": refresh_interval,
-                    }
-                }
-            )
-
-            # Point the alias at our new randomly named index and delete the
-            # old index.
-            if client.indices.exists_alias(name=index_base):
-                to_delete = set()
-                actions = []
-                for name in client.indices.get_alias(name=index_base):
-                    to_delete.add(name)
-                    actions.append(
-                        {"remove": {"index": name, "alias": index_base}})
-                actions.append(
-                    {"add": {"index": new_index_name, "alias": index_base}})
-                client.indices.update_aliases({"actions": actions})
-                client.indices.delete(",".join(to_delete))
-            else:
-                client.indices.put_alias(name=index_base, index=new_index_name)
-    except redis.exceptions.LockError as exc:
-        raise self.retry(countdown=60, exc=exc)
-
-
-@tasks.task(bind=True, ignore_result=True, acks_late=True)
-def reindex_project(self, request, project_name):
-    r = redis.StrictRedis.from_url(
-        request.registry.settings["celery.scheduler_url"])
-    try:
-        with r.lock('search-index', timeout=15, blocking_timeout=1):
-            client = request.registry["elasticsearch.client"]
-            doc_types = request.registry.get("search.doc_types", set())
-            index_name = request.registry["elasticsearch.index"]
-            get_index(
-                index_name,
-                doc_types,
-                using=client,
-                shards=request.registry.get("elasticsearch.shards", 1),
-                replicas=request.registry.get("elasticsearch.replicas", 0),
-            )
-
-            for _ in parallel_bulk(client,
-                                   _project_docs(request.db, project_name)):
-                pass
-    except redis.exceptions.LockError as exc:
-        raise self.retry(countdown=60, exc=exc)
-
-
-@tasks.task(bind=True, ignore_result=True, acks_late=True)
-def unindex_project(self, request, project_name):
-    r = redis.StrictRedis.from_url(
-        request.registry.settings["celery.scheduler_url"])
+    client = request.registry["elasticsearch.client"]
+    number_of_replicas = request.registry.get("elasticsearch.replicas", 0)
+    refresh_interval = request.registry.get("elasticsearch.interval", "1s")
+
+    # We use a randomly named index so that we can do a zero downtime reindex.
+    # Essentially we'll use a randomly named index which we will use until all
+    # of the data has been reindexed, at which point we'll point an alias at
+    # our randomly named index, and then delete the old randomly named index.
+
+    # Create the new index and associate all of our doc types with it.
+    index_base = request.registry["elasticsearch.index"]
+    random_token = binascii.hexlify(os.urandom(5)).decode("ascii")
+    new_index_name = "{}-{}".format(index_base, random_token)
+    doc_types = request.registry.get("search.doc_types", set())
+    shards = request.registry.get("elasticsearch.shards", 1)
+
+    # Create the new index with zero replicas and index refreshes disabled
+    # while we are bulk indexing.
+    new_index = get_index(
+        new_index_name,
+        doc_types,
+        using=client,
+        shards=shards,
+        replicas=0,
+        interval="-1",
+    )
+    new_index.create(wait_for_active_shards=shards)
+
+    # From this point on, if any error occurs, we want to be able to delete our
+    # in progress index.
     try:
-        with r.lock('search-index', timeout=15, blocking_timeout=1):
-            client = request.registry["elasticsearch.client"]
-            index_name = request.registry["elasticsearch.index"]
-            try:
-                client.delete(
-                    index=index_name, doc_type="project", id=project_name)
-            except elasticsearch.exceptions.NotFoundError:
-                pass
-    except redis.exceptions.LockError as exc:
-        raise self.retry(countdown=60, exc=exc)
+        request.db.execute("SET statement_timeout = '600s'")
+
+        for _ in parallel_bulk(client, _project_docs(request.db)):
+            pass
+    except:  # noqa
+        new_index.delete()
+        raise
+    finally:
+        request.db.rollback()
+        request.db.close()
+
+    # Now that we've finished indexing all of our data we can optimize it and
+    # update the replicas and refresh intervals.
+    client.indices.forcemerge(index=new_index_name)
+    client.indices.put_settings(
+        index=new_index_name,
+        body={
+            "index": {
+                "number_of_replicas": number_of_replicas,
+                "refresh_interval": refresh_interval,
+            }
+        }
+    )
+
+    # Point the alias at our new randomly named index and delete the old index.
+    if client.indices.exists_alias(name=index_base):
+        to_delete = set()
+        actions = []
+        for name in client.indices.get_alias(name=index_base):
+            to_delete.add(name)
+            actions.append({"remove": {"index": name, "alias": index_base}})
+        actions.append({"add": {"index": new_index_name, "alias": index_base}})
+        client.indices.update_aliases({"actions": actions})
+        client.indices.delete(",".join(to_delete))
+    else:
+        client.indices.put_alias(name=index_base, index=new_index_name)
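# The update_aliases block above is what keeps the reindex zero-downtime:
# searches keep hitting the "warehouse" alias while the new index is built, and
# the alias is then repointed in a single call. A minimal sketch of the same
# idea against a bare elasticsearch-py client; the index names are borrowed
# from the test fixtures, not from production:
from elasticsearch import Elasticsearch

client = Elasticsearch()
client.indices.update_aliases({
    "actions": [
        {"remove": {"index": "warehouse-aaaaaaaaaa", "alias": "warehouse"}},
        {"add": {"index": "warehouse-cbcbcbcbcb", "alias": "warehouse"}},
    ]
})
client.indices.delete(index="warehouse-aaaaaaaaaa")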