Skip to content

Commit e01b506

Browse files
committed
use a lock to keep incrementals from running during full reindex
1 parent fed7f2d commit e01b506

File tree

2 files changed

+277
-115
lines changed

2 files changed

+277
-115
lines changed

tests/unit/search/test_tasks.py

Lines changed: 140 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212

1313
import os
1414

15+
import celery
1516
import elasticsearch
1617
import packaging.version
1718
import pretend
1819
import pytest
20+
import redis
1921

2022
from first import first
2123

@@ -25,6 +27,7 @@
2527
reindex_project,
2628
unindex_project,
2729
_project_docs,
30+
SearchLock,
2831
)
2932

3033

@@ -125,6 +128,34 @@ def __init__(self):
125128
self.indices = FakeESIndices()
126129

127130

131+
class NotLock:
    """A no-op stand-in for a redis lock: acquire/release always succeed.

    Tests monkeypatch ``redis.StrictRedis.from_url`` to return a stub whose
    ``lock`` attribute is this class, so search tasks can "take the lock"
    without a real redis server.
    """

    def __init__(self, *args, **kwargs):
        # Original used `def __init__(*a, **kw)` with no explicit `self`
        # (self was silently swallowed by *a); spell it out explicitly.
        # Accept and ignore whatever arguments redis's lock() would pass.
        pass

    def acquire(self):
        # Pretend the lock was obtained.
        return True

    def release(self):
        # Pretend the lock was released.
        return True
140+
141+
142+
class TestSearchLock:
    """Exercise SearchLock.__enter__ against a stubbed redis client."""

    def test_success(self):
        # When the underlying lock can be acquired, __enter__ completes.
        fake_lock = pretend.stub(acquire=pretend.call_recorder(lambda: True))
        redis_client = pretend.stub(lock=lambda *args, **kwargs: fake_lock)

        SearchLock(redis_client).__enter__()

        assert fake_lock.acquire.calls == [pretend.call()]

    def test_failure(self):
        # When acquisition fails, __enter__ raises a redis LockError.
        fake_lock = pretend.stub(acquire=pretend.call_recorder(lambda: False))
        redis_client = pretend.stub(lock=lambda *args, **kwargs: fake_lock)

        search_lock = SearchLock(redis_client)
        with pytest.raises(redis.exceptions.LockError):
            search_lock.__enter__()

        assert fake_lock.acquire.calls == [pretend.call()]
157+
158+
128159
class TestReindex:
129160
def test_fails_when_raising(self, db_request, monkeypatch):
130161
docs = pretend.stub()
@@ -134,10 +165,14 @@ def project_docs(db):
134165

135166
monkeypatch.setattr(warehouse.search.tasks, "_project_docs", project_docs)
136167

168+
task = pretend.stub()
137169
es_client = FakeESClient()
138170

139171
db_request.registry.update({"elasticsearch.index": "warehouse"})
140-
db_request.registry.settings = {"elasticsearch.url": "http://some.url"}
172+
db_request.registry.settings = {
173+
"elasticsearch.url": "http://some.url",
174+
"celery.scheduler_url": "redis://redis:6379/0",
175+
}
141176
monkeypatch.setattr(
142177
warehouse.search.tasks.elasticsearch,
143178
"Elasticsearch",
@@ -153,18 +188,41 @@ def parallel_bulk(client, iterable, index=None):
153188
assert index == "warehouse-cbcbcbcbcb"
154189
raise TestException
155190

191+
monkeypatch.setattr(
192+
redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock)
193+
)
194+
156195
monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk)
157196

158197
monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n)
159198

160199
with pytest.raises(TestException):
161-
reindex(db_request)
200+
reindex(task, db_request)
162201

163202
assert es_client.indices.delete.calls == [
164203
pretend.call(index="warehouse-cbcbcbcbcb")
165204
]
166205
assert es_client.indices.put_settings.calls == []
167206

207+
def test_retry_on_lock(self, db_request, monkeypatch):
    """reindex retries with countdown=60 when the search lock is held."""
    task = pretend.stub(
        retry=pretend.call_recorder(pretend.raiser(celery.exceptions.Retry))
    )

    db_request.registry.settings = {"celery.scheduler_url": "redis://redis:6379/0"}

    # Make every lock() attempt blow up with the same LockError instance,
    # so we can assert it is forwarded to task.retry as exc=.
    lock_error = redis.exceptions.LockError()
    stub_redis = pretend.stub(lock=pretend.raiser(lock_error))
    monkeypatch.setattr(redis.StrictRedis, "from_url", lambda *a, **kw: stub_redis)

    with pytest.raises(celery.exceptions.Retry):
        reindex(task, db_request)

    assert task.retry.calls == [pretend.call(countdown=60, exc=lock_error)]
225+
168226
def test_successfully_indexes_and_adds_new(self, db_request, monkeypatch):
169227

170228
docs = pretend.stub()
@@ -174,24 +232,31 @@ def project_docs(db):
174232

175233
monkeypatch.setattr(warehouse.search.tasks, "_project_docs", project_docs)
176234

235+
task = pretend.stub()
177236
es_client = FakeESClient()
178237

179238
db_request.registry.update(
180239
{"elasticsearch.index": "warehouse", "elasticsearch.shards": 42}
181240
)
182-
db_request.registry.settings = {"elasticsearch.url": "http://some.url"}
241+
db_request.registry.settings = {
242+
"elasticsearch.url": "http://some.url",
243+
"celery.scheduler_url": "redis://redis:6379/0",
244+
}
183245
monkeypatch.setattr(
184246
warehouse.search.tasks.elasticsearch,
185247
"Elasticsearch",
186248
lambda *a, **kw: es_client,
187249
)
250+
monkeypatch.setattr(
251+
redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock)
252+
)
188253

189254
parallel_bulk = pretend.call_recorder(lambda client, iterable, index: [None])
190255
monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk)
191256

192257
monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n)
193258

194-
reindex(db_request)
259+
reindex(task, db_request)
195260

196261
assert parallel_bulk.calls == [
197262
pretend.call(es_client, docs, index="warehouse-cbcbcbcbcb")
@@ -220,6 +285,7 @@ def project_docs(db):
220285

221286
def test_successfully_indexes_and_replaces(self, db_request, monkeypatch):
222287
docs = pretend.stub()
288+
task = pretend.stub()
223289

224290
def project_docs(db):
225291
return docs
@@ -238,19 +304,25 @@ def project_docs(db):
238304
"sqlalchemy.engine": db_engine,
239305
}
240306
)
241-
db_request.registry.settings = {"elasticsearch.url": "http://some.url"}
307+
db_request.registry.settings = {
308+
"elasticsearch.url": "http://some.url",
309+
"celery.scheduler_url": "redis://redis:6379/0",
310+
}
242311
monkeypatch.setattr(
243312
warehouse.search.tasks.elasticsearch,
244313
"Elasticsearch",
245314
lambda *a, **kw: es_client,
246315
)
316+
monkeypatch.setattr(
317+
redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock)
318+
)
247319

248320
parallel_bulk = pretend.call_recorder(lambda client, iterable, index: [None])
249321
monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk)
250322

251323
monkeypatch.setattr(os, "urandom", lambda n: b"\xcb" * n)
252324

253-
reindex(db_request)
325+
reindex(task, db_request)
254326

255327
assert parallel_bulk.calls == [
256328
pretend.call(es_client, docs, index="warehouse-cbcbcbcbcb")
@@ -281,6 +353,7 @@ def project_docs(db):
281353
class TestPartialReindex:
282354
def test_reindex_fails_when_raising(self, db_request, monkeypatch):
283355
docs = pretend.stub()
356+
task = pretend.stub()
284357

285358
def project_docs(db, project_name=None):
286359
return docs
@@ -302,44 +375,96 @@ def parallel_bulk(client, iterable, index=None):
302375
raise TestException
303376

304377
monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk)
378+
monkeypatch.setattr(
379+
redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock)
380+
)
305381

306382
with pytest.raises(TestException):
307-
reindex_project(db_request, "foo")
383+
reindex_project(task, db_request, "foo")
308384

309385
assert es_client.indices.put_settings.calls == []
310386

311-
def test_unindex_fails_when_raising(self, db_request):
387+
def test_unindex_fails_when_raising(self, db_request, monkeypatch):
    """Unexpected ES errors during unindex propagate to the caller."""
    task = pretend.stub()

    class TestException(Exception):
        pass

    fake_es = FakeESClient()
    fake_es.delete = pretend.raiser(TestException)
    # Lock acquisition is stubbed to always succeed via NotLock.
    monkeypatch.setattr(
        redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock)
    )

    db_request.registry.update(
        {"elasticsearch.client": fake_es, "elasticsearch.index": "warehouse"}
    )

    with pytest.raises(TestException):
        unindex_project(task, db_request, "foo")
405+
406+
def test_unindex_accepts_defeat(self, db_request, monkeypatch):
    """A NotFoundError from ES is swallowed: unindexing a missing doc is OK."""
    task = pretend.stub()

    fake_es = FakeESClient()
    fake_es.delete = pretend.call_recorder(
        pretend.raiser(elasticsearch.exceptions.NotFoundError)
    )
    # Lock acquisition is stubbed to always succeed via NotLock.
    monkeypatch.setattr(
        redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock)
    )

    db_request.registry.update(
        {"elasticsearch.client": fake_es, "elasticsearch.index": "warehouse"}
    )

    # Must not raise despite the NotFoundError from delete().
    unindex_project(task, db_request, "foo")

    assert fake_es.delete.calls == [
        pretend.call(index="warehouse", doc_type="doc", id="foo")
    ]
340426

427+
def test_unindex_retry_on_lock(self, db_request, monkeypatch):
    """unindex_project retries with countdown=60 when the search lock is held."""
    task = pretend.stub(
        retry=pretend.call_recorder(pretend.raiser(celery.exceptions.Retry))
    )

    db_request.registry.settings = {"celery.scheduler_url": "redis://redis:6379/0"}

    # Same LockError instance everywhere so we can assert exc= identity.
    lock_error = redis.exceptions.LockError()
    stub_redis = pretend.stub(lock=pretend.raiser(lock_error))
    monkeypatch.setattr(redis.StrictRedis, "from_url", lambda *a, **kw: stub_redis)

    with pytest.raises(celery.exceptions.Retry):
        unindex_project(task, db_request, "foo")

    assert task.retry.calls == [pretend.call(countdown=60, exc=lock_error)]
445+
446+
def test_reindex_retry_on_lock(self, db_request, monkeypatch):
    """reindex_project retries with countdown=60 when the search lock is held."""
    task = pretend.stub(
        retry=pretend.call_recorder(pretend.raiser(celery.exceptions.Retry))
    )

    db_request.registry.settings = {"celery.scheduler_url": "redis://redis:6379/0"}

    # Same LockError instance everywhere so we can assert exc= identity.
    lock_error = redis.exceptions.LockError()
    stub_redis = pretend.stub(lock=pretend.raiser(lock_error))
    monkeypatch.setattr(redis.StrictRedis, "from_url", lambda *a, **kw: stub_redis)

    with pytest.raises(celery.exceptions.Retry):
        reindex_project(task, db_request, "foo")

    assert task.retry.calls == [pretend.call(countdown=60, exc=lock_error)]
464+
341465
def test_successfully_indexes(self, db_request, monkeypatch):
342466
docs = pretend.stub()
467+
task = pretend.stub()
343468

344469
def project_docs(db, project_name=None):
345470
return docs
@@ -364,8 +489,11 @@ def project_docs(db, project_name=None):
364489
lambda client, iterable, index=None: [None]
365490
)
366491
monkeypatch.setattr(warehouse.search.tasks, "parallel_bulk", parallel_bulk)
492+
monkeypatch.setattr(
493+
redis.StrictRedis, "from_url", lambda *a, **kw: pretend.stub(lock=NotLock)
494+
)
367495

368-
reindex_project(db_request, "foo")
496+
reindex_project(task, db_request, "foo")
369497

370498
assert parallel_bulk.calls == [pretend.call(es_client, docs, index="warehouse")]
371499
assert es_client.indices.create.calls == []

0 commit comments

Comments
 (0)