Incremental search index updates 2.0 #4328

Merged (5 commits, Jul 21, 2018)
Changes from 1 commit
53 changes: 53 additions & 0 deletions tests/unit/search/test_init.py
@@ -14,6 +14,59 @@

from warehouse import search

from ...common.db.packaging import ProjectFactory, ReleaseFactory


def test_store_projects(db_request):
    project0 = ProjectFactory.create()
    project1 = ProjectFactory.create()
    release1 = ReleaseFactory.create(project=project1)
    config = pretend.stub()
    session = pretend.stub(info={}, new={project0}, dirty=set(), deleted={release1})

    search.store_projects_for_project_reindex(config, session, pretend.stub())

    assert session.info["warehouse.search.project_updates"] == {project0, project1}
    assert session.info["warehouse.search.project_deletes"] == set()


def test_store_projects_unindex(db_request):
    project0 = ProjectFactory.create()
    project1 = ProjectFactory.create()
    config = pretend.stub()
    session = pretend.stub(info={}, new={project0}, dirty=set(), deleted={project1})

    search.store_projects_for_project_reindex(config, session, pretend.stub())

    assert session.info["warehouse.search.project_updates"] == {project0}
    assert session.info["warehouse.search.project_deletes"] == {project1}


def test_execute_reindex_success(app_config):
    _delay = pretend.call_recorder(lambda x: None)
    app_config.task = lambda x: pretend.stub(delay=_delay)
    session = pretend.stub(
        info={"warehouse.search.project_updates": {pretend.stub(normalized_name="foo")}}
    )

    search.execute_project_reindex(app_config, session)

    assert _delay.calls == [pretend.call("foo")]
    assert "warehouse.search.project_updates" not in session.info


def test_execute_unindex_success(app_config):
    _delay = pretend.call_recorder(lambda x: None)
    app_config.task = lambda x: pretend.stub(delay=_delay)
    session = pretend.stub(
        info={"warehouse.search.project_deletes": {pretend.stub(normalized_name="foo")}}
    )

    search.execute_project_reindex(app_config, session)

    assert _delay.calls == [pretend.call("foo")]
    assert "warehouse.search.project_deletes" not in session.info


def test_es(monkeypatch):
    search_obj = pretend.stub()
131 changes: 130 additions & 1 deletion tests/unit/search/test_tasks.py
@@ -12,14 +12,21 @@

import os

import elasticsearch
import packaging.version
import pretend
import pytest

from first import first

import warehouse.search.tasks
from warehouse.search.tasks import reindex, _project_docs
from warehouse.search.tasks import (
    reindex,
    reindex_project,
    unindex_project,
    _project_docs,
)


from ...common.db.packaging import ProjectFactory, ReleaseFactory

@@ -51,6 +58,34 @@ def test_project_docs(db_session):
    ]


def test_single_project_doc(db_session):
    projects = [ProjectFactory.create() for _ in range(2)]
    releases = {
        p: sorted(
            [ReleaseFactory.create(project=p) for _ in range(3)],
            key=lambda r: packaging.version.parse(r.version),
            reverse=True,
        )
        for p in projects
    }

    assert list(_project_docs(db_session, project_name=projects[1].name)) == [
        {
            "_id": p.normalized_name,
            "_type": "doc",
            "_source": {
                "created": p.created,
                "name": p.name,
                "normalized_name": p.normalized_name,
                "version": [r.version for r in prs],
                "latest_version": first(prs, key=lambda r: not r.is_prerelease).version,
            },
        }
        for p, prs in sorted(releases.items(), key=lambda x: x[0].name.lower())
        if p.name == projects[1].name
    ]


class FakeESIndices:
    def __init__(self):
        self.indices = {}
@@ -241,3 +276,97 @@ def project_docs(db):
            body={"index": {"number_of_replicas": 0, "refresh_interval": "1s"}},
        )
    ]


class TestPartialReindex:
    def test_reindex_fails_when_raising(self, db_request, monkeypatch):
        docs = pretend.stub()

        def project_docs(db, project_name=None):
            return docs

        monkeypatch.setattr(warehouse.search.tasks, "_project_docs", project_docs)

        es_client = FakeESClient()

        db_request.registry.update(
            {"elasticsearch.client": es_client, "elasticsearch.index": "warehouse"}
        )

        class TestException(Exception):
            pass

        def parallel_bulk(client, iterable):
            assert client is es_client
            assert iterable is docs
            raise TestException

        monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk)

        with pytest.raises(TestException):
            reindex_project(db_request, "foo")

        assert es_client.indices.put_settings.calls == []

    def test_unindex_fails_when_raising(self, db_request):
        class TestException(Exception):
            pass

        es_client = FakeESClient()
        es_client.delete = pretend.raiser(TestException)

        db_request.registry.update(
            {"elasticsearch.client": es_client, "elasticsearch.index": "warehouse"}
        )

        with pytest.raises(TestException):
            unindex_project(db_request, "foo")

    def test_unindex_accepts_defeat(self, db_request):
        es_client = FakeESClient()
        es_client.delete = pretend.call_recorder(
            pretend.raiser(elasticsearch.exceptions.NotFoundError)
        )

        db_request.registry.update(
            {"elasticsearch.client": es_client, "elasticsearch.index": "warehouse"}
        )

        unindex_project(db_request, "foo")

        assert es_client.delete.calls == [
            pretend.call(index="warehouse", doc_type="project", id="foo")
        ]

    def test_successfully_indexes(self, db_request, monkeypatch):
        docs = pretend.stub()

        def project_docs(db, project_name=None):
            return docs

        monkeypatch.setattr(warehouse.search.tasks, "_project_docs", project_docs)

        es_client = FakeESClient()
        es_client.indices.indices["warehouse-aaaaaaaaaa"] = None
        es_client.indices.aliases["warehouse"] = ["warehouse-aaaaaaaaaa"]
        db_engine = pretend.stub()

        db_request.registry.update(
            {
                "elasticsearch.client": es_client,
                "elasticsearch.index": "warehouse",
                "elasticsearch.shards": 42,
                "sqlalchemy.engine": db_engine,
            }
        )

        parallel_bulk = pretend.call_recorder(lambda client, iterable: [None])
        monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk)

        reindex_project(db_request, "foo")

        assert parallel_bulk.calls == [pretend.call(es_client, docs)]
        assert es_client.indices.create.calls == []
        assert es_client.indices.delete.calls == []
        assert es_client.indices.aliases == {"warehouse": ["warehouse-aaaaaaaaaa"]}
        assert es_client.indices.put_settings.calls == []
42 changes: 42 additions & 0 deletions warehouse/search/__init__.py
@@ -18,7 +18,49 @@
from celery.schedules import crontab
from elasticsearch_dsl import serializer

from warehouse import db
from warehouse.search.utils import get_index
from warehouse.packaging.models import Project, Release


@db.listens_for(db.Session, "after_flush")
def store_projects_for_project_reindex(config, session, flush_context):
    # We'll (ab)use the session.info dictionary to store a list of pending
    # Project updates to the session.
    projects_to_update = session.info.setdefault(
        "warehouse.search.project_updates", set()
    )
    projects_to_delete = session.info.setdefault(
        "warehouse.search.project_deletes", set()
    )

    # Go through each new, changed, and deleted object and attempt to store
    # a Project to reindex for when the session has been committed.
    for obj in session.new | session.dirty:
        if obj.__class__ == Project:
            projects_to_update.add(obj)
        if obj.__class__ == Release:
            projects_to_update.add(obj.project)

    for obj in session.deleted:
        if obj.__class__ == Project:
            projects_to_delete.add(obj)
        if obj.__class__ == Release:
            projects_to_update.add(obj.project)


@db.listens_for(db.Session, "after_commit")
def execute_project_reindex(config, session):
    projects_to_update = session.info.pop("warehouse.search.project_updates", set())
    projects_to_delete = session.info.pop("warehouse.search.project_deletes", set())

    from warehouse.search.tasks import reindex_project, unindex_project

    for project in projects_to_update:
        config.task(reindex_project).delay(project.normalized_name)

    for project in projects_to_delete:
        config.task(unindex_project).delay(project.normalized_name)


def es(request):
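The two listeners above cooperate across the flush/commit boundary: after_flush records which Projects were touched in session.info, and after_commit drains that record and enqueues one Celery task per project once the data is durable. Below is a minimal, self-contained sketch of that same pattern (assuming SQLAlchemy 1.4+); the Widget model, the "pending_updates" key, and the print standing in for config.task(...).delay(...) are illustrative placeholders, not Warehouse code.

import sqlalchemy as sa
from sqlalchemy import event, orm

Base = orm.declarative_base()


class Widget(Base):
    __tablename__ = "widgets"
    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String, nullable=False)


engine = sa.create_engine("sqlite://")
Base.metadata.create_all(engine)
Session = orm.sessionmaker(bind=engine)


@event.listens_for(Session, "after_flush")
def record_pending_updates(session, flush_context):
    # Stash the names of touched widgets in session.info so they survive
    # until the transaction actually commits.
    pending = session.info.setdefault("pending_updates", set())
    for obj in session.new | session.dirty:
        if isinstance(obj, Widget):
            pending.add(obj.name)


@event.listens_for(Session, "after_commit")
def fire_pending_updates(session):
    # Pop rather than read, so rolled-back or reused sessions never replay
    # stale work; this mirrors execute_project_reindex above.
    for name in session.info.pop("pending_updates", set()):
        print(f"would enqueue a reindex task for {name!r}")


session = Session()
session.add(Widget(name="foo"))
session.commit()  # prints: would enqueue a reindex task for 'foo'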
35 changes: 33 additions & 2 deletions warehouse/search/tasks.py
@@ -28,7 +28,7 @@
from warehouse.utils.db import windowed_query


def _project_docs(db):
def _project_docs(db, project_name=None):

    releases_list = (
        db.query(Release.name, Release.version)
@@ -38,9 +38,13 @@ def _project_docs(db):
            Release._pypi_ordering.desc(),
        )
        .distinct(Release.name)
        .subquery("release_list")
    )

    if project_name:
        releases_list = releases_list.filter(Release.name == project_name)

    releases_list = releases_list.subquery()

    r = aliased(Release, name="r")

    all_versions = (
@@ -182,3 +186,30 @@ def reindex(request):
        client.indices.delete(",".join(to_delete))
    else:
        client.indices.put_alias(name=index_base, index=new_index_name)


@tasks.task(ignore_result=True, acks_late=True)
def reindex_project(request, project_name):
    client = request.registry["elasticsearch.client"]
    doc_types = request.registry.get("search.doc_types", set())
    index_name = request.registry["elasticsearch.index"]
    get_index(
        index_name,
        doc_types,
        using=client,
        shards=request.registry.get("elasticsearch.shards", 1),
        replicas=request.registry.get("elasticsearch.replicas", 0),
    )

    for _ in parallel_bulk(client, _project_docs(request.db, project_name)):
        pass


@tasks.task(ignore_result=True, acks_late=True)
def unindex_project(request, project_name):
    client = request.registry["elasticsearch.client"]
    index_name = request.registry["elasticsearch.index"]
    try:
        client.delete(index=index_name, doc_type="project", id=project_name)
    except elasticsearch.exceptions.NotFoundError:
        pass
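reindex_project hands the per-project document stream to elasticsearch.helpers.parallel_bulk, which is lazy: nothing is sent until the generator it returns is consumed, hence the for _ in ...: pass loop. A rough sketch of that consumption pattern with one hand-built action follows; the host URL, index name, and document contents are placeholders, the "_type" field simply mirrors the 6.x-era document shape shown in the tests above, and the explicit "_index" key is added only so the standalone action is self-describing.

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

client = Elasticsearch("http://localhost:9200")  # placeholder host


def project_docs():
    # One bulk action per project, roughly the shape asserted in
    # test_single_project_doc, plus an explicit target index.
    yield {
        "_index": "warehouse",
        "_type": "doc",
        "_id": "example-project",
        "_source": {
            "name": "example-project",
            "normalized_name": "example-project",
            "version": ["1.0.0"],
            "latest_version": "1.0.0",
        },
    }


# parallel_bulk yields an (ok, info) tuple per action; iterating it is what
# actually performs the indexing.
for ok, info in parallel_bulk(client, project_docs()):
    if not ok:
        print("failed to index:", info)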