Skip to content

wrap search index mutation in a lock #3700

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 150 additions & 9 deletions tests/unit/search/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@

import os

from contextlib import contextmanager

import celery
import elasticsearch
import packaging.version
import pretend
import pytest
import redis

from first import first

Expand Down Expand Up @@ -130,6 +134,11 @@ def __init__(self):
self.indices = FakeESIndices()


@contextmanager
def _not_lock(*a, **kw):
yield True


class TestReindex:

def test_fails_when_raising(self, db_request, monkeypatch):
Expand All @@ -144,6 +153,7 @@ def project_docs(db):
project_docs,
)

task = pretend.stub()
es_client = FakeESClient()

db_request.registry.update(
Expand All @@ -153,6 +163,10 @@ def project_docs(db):
},
)

db_request.registry.settings = {
"celery.scheduler_url": "redis://redis:6379/0",
}

class TestException(Exception):
pass

Expand All @@ -161,20 +175,47 @@ def parallel_bulk(client, iterable):
assert iterable is docs
raise TestException

monkeypatch.setattr(
redis.StrictRedis, "from_url",
lambda *a, **kw: pretend.stub(lock=_not_lock))

monkeypatch.setattr(
warehouse.search.tasks, "parallel_bulk", parallel_bulk)

monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n)

with pytest.raises(TestException):
reindex(db_request)
reindex(task, db_request)

assert es_client.indices.delete.calls == [
pretend.call(index='warehouse-cbcbcbcbcb'),
]
assert es_client.indices.put_settings.calls == []
assert es_client.indices.forcemerge.calls == []

def test_retry_on_lock(self, db_request, monkeypatch):
    """When the redis lock cannot be acquired, reindex retries in 60s."""
    lock_error = redis.exceptions.LockError()

    # A task stub whose retry() raises, mirroring celery's real behavior.
    task = pretend.stub(
        retry=pretend.call_recorder(
            pretend.raiser(celery.exceptions.Retry)
        )
    )

    db_request.registry.settings = {
        "celery.scheduler_url": "redis://redis:6379/0",
    }

    monkeypatch.setattr(
        redis.StrictRedis, "from_url",
        lambda *a, **kw: pretend.stub(lock=pretend.raiser(lock_error)))

    with pytest.raises(celery.exceptions.Retry):
        reindex(task, db_request)

    assert task.retry.calls == [pretend.call(countdown=60, exc=lock_error)]

def test_successfully_indexes_and_adds_new(self, db_request, monkeypatch):

docs = pretend.stub()
Expand All @@ -188,6 +229,7 @@ def project_docs(db):
project_docs,
)

task = pretend.stub()
es_client = FakeESClient()

db_request.registry.update(
Expand All @@ -198,13 +240,21 @@ def project_docs(db):
}
)

db_request.registry.settings = {
"celery.scheduler_url": "redis://redis:6379/0",
}

monkeypatch.setattr(
redis.StrictRedis, "from_url",
lambda *a, **kw: pretend.stub(lock=_not_lock))

parallel_bulk = pretend.call_recorder(lambda client, iterable: [None])
monkeypatch.setattr(
warehouse.search.tasks, "parallel_bulk", parallel_bulk)

monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n)

reindex(db_request)
reindex(task, db_request)

assert parallel_bulk.calls == [pretend.call(es_client, docs)]
assert es_client.indices.create.calls == [
Expand Down Expand Up @@ -251,6 +301,7 @@ def project_docs(db):
project_docs,
)

task = pretend.stub()
es_client = FakeESClient()
es_client.indices.indices["warehouse-aaaaaaaaaa"] = None
es_client.indices.aliases["warehouse"] = ["warehouse-aaaaaaaaaa"]
Expand All @@ -265,13 +316,21 @@ def project_docs(db):
},
)

db_request.registry.settings = {
"celery.scheduler_url": "redis://redis:6379/0",
}

monkeypatch.setattr(
redis.StrictRedis, "from_url",
lambda *a, **kw: pretend.stub(lock=_not_lock))

parallel_bulk = pretend.call_recorder(lambda client, iterable: [None])
monkeypatch.setattr(
warehouse.search.tasks, "parallel_bulk", parallel_bulk)

monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n)

reindex(db_request)
reindex(task, db_request)

assert parallel_bulk.calls == [pretend.call(es_client, docs)]
assert es_client.indices.create.calls == [
Expand Down Expand Up @@ -323,6 +382,7 @@ def project_docs(db, project_name=None):
project_docs,
)

task = pretend.stub()
es_client = FakeESClient()

db_request.registry.update(
Expand All @@ -332,6 +392,10 @@ def project_docs(db, project_name=None):
},
)

db_request.registry.settings = {
"celery.scheduler_url": "redis://redis:6379/0",
}

class TestException(Exception):
pass

Expand All @@ -340,19 +404,24 @@ def parallel_bulk(client, iterable):
assert iterable is docs
raise TestException

monkeypatch.setattr(
redis.StrictRedis, "from_url",
lambda *a, **kw: pretend.stub(lock=_not_lock))

monkeypatch.setattr(
warehouse.search.tasks, "parallel_bulk", parallel_bulk)

with pytest.raises(TestException):
reindex_project(db_request, 'foo')
reindex_project(task, db_request, 'foo')

assert es_client.indices.put_settings.calls == []
assert es_client.indices.forcemerge.calls == []

def test_unindex_fails_when_raising(self, db_request):
def test_unindex_fails_when_raising(self, db_request, monkeypatch):
class TestException(Exception):
pass

task = pretend.stub()
es_client = FakeESClient()
es_client.delete = pretend.raiser(TestException)

Expand All @@ -363,10 +432,65 @@ class TestException(Exception):
},
)

db_request.registry.settings = {
"celery.scheduler_url": "redis://redis:6379/0",
}

monkeypatch.setattr(
redis.StrictRedis, "from_url",
lambda *a, **kw: pretend.stub(lock=_not_lock))

with pytest.raises(TestException):
unindex_project(db_request, 'foo')
unindex_project(task, db_request, 'foo')

def test_unindex_retry_on_lock(self, db_request, monkeypatch):
    """When the redis lock cannot be acquired, unindex_project retries in 60s."""
    lock_error = redis.exceptions.LockError()

    # A task stub whose retry() raises, mirroring celery's real behavior.
    task = pretend.stub(
        retry=pretend.call_recorder(
            pretend.raiser(celery.exceptions.Retry)
        )
    )

    db_request.registry.settings = {
        "celery.scheduler_url": "redis://redis:6379/0",
    }

    monkeypatch.setattr(
        redis.StrictRedis, "from_url",
        lambda *a, **kw: pretend.stub(lock=pretend.raiser(lock_error)))

    with pytest.raises(celery.exceptions.Retry):
        unindex_project(task, db_request, "foo")

    assert task.retry.calls == [pretend.call(countdown=60, exc=lock_error)]

def test_reindex_retry_on_lock(self, db_request, monkeypatch):
    """When the redis lock cannot be acquired, reindex_project retries in 60s.

    NOTE(review): the scraped diff interleaved an old-side line
    (``def test_unindex_accepts_defeat(...)``) into this method's body;
    this is the coherent method, matching its two sibling retry tests.
    """
    task = pretend.stub(
        retry=pretend.call_recorder(
            pretend.raiser(celery.exceptions.Retry)
        )
    )

    db_request.registry.settings = {
        "celery.scheduler_url": "redis://redis:6379/0",
    }

    le = redis.exceptions.LockError()
    # Make every StrictRedis.from_url(...) return a client whose lock()
    # immediately raises, simulating a lock already held elsewhere.
    monkeypatch.setattr(
        redis.StrictRedis, "from_url",
        lambda *a, **kw: pretend.stub(lock=pretend.raiser(le)))

    with pytest.raises(celery.exceptions.Retry):
        reindex_project(task, db_request, "foo")

    assert task.retry.calls == [
        pretend.call(countdown=60, exc=le)
    ]

def test_unindex_accepts_defeat(self, db_request, monkeypatch):
task = pretend.stub()
es_client = FakeESClient()
es_client.delete = pretend.call_recorder(
pretend.raiser(elasticsearch.exceptions.NotFoundError))
Expand All @@ -378,7 +502,15 @@ def test_unindex_accepts_defeat(self, db_request):
},
)

unindex_project(db_request, 'foo')
db_request.registry.settings = {
"celery.scheduler_url": "redis://redis:6379/0",
}

monkeypatch.setattr(
redis.StrictRedis, "from_url",
lambda *a, **kw: pretend.stub(lock=_not_lock))

unindex_project(task, db_request, 'foo')

assert es_client.delete.calls == [
pretend.call(index="warehouse", doc_type="project", id="foo")
Expand All @@ -396,6 +528,7 @@ def project_docs(db, project_name=None):
project_docs,
)

task = pretend.stub()
es_client = FakeESClient()
es_client.indices.indices["warehouse-aaaaaaaaaa"] = None
es_client.indices.aliases["warehouse"] = ["warehouse-aaaaaaaaaa"]
Expand All @@ -410,11 +543,19 @@ def project_docs(db, project_name=None):
},
)

db_request.registry.settings = {
"celery.scheduler_url": "redis://redis:6379/0",
}

monkeypatch.setattr(
redis.StrictRedis, "from_url",
lambda *a, **kw: pretend.stub(lock=_not_lock))

parallel_bulk = pretend.call_recorder(lambda client, iterable: [None])
monkeypatch.setattr(
warehouse.search.tasks, "parallel_bulk", parallel_bulk)

reindex_project(db_request, 'foo')
reindex_project(task, db_request, 'foo')

assert parallel_bulk.calls == [pretend.call(es_client, docs)]
assert es_client.indices.create.calls == []
Expand Down
Loading