Skip to content

Commit dfebc4e

Browse files
committed
elasticsearch: use Configurator insead of globals, hook into transaction
1 parent 44771cd commit dfebc4e

File tree

2 files changed

+42
-32
lines changed

2 files changed

+42
-32
lines changed

warehouse/packaging/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,4 @@ def includeme(config):
3636
"project",
3737
"project/{obj.project.normalized_name}",
3838
)
39+
config.include('.elasticsearch')

warehouse/packaging/elasticsearch.py

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
"""
2+
Indexes sqlalchemy changes to elasticsearch. It does not handle
3+
db cascades so they should be avoided.
4+
"""
5+
6+
import transaction
17
from sqlalchemy import event
28
from elasticsearch import Elasticsearch
39
from pyramid.threadlocal import get_current_registry
@@ -6,43 +12,46 @@
612
from warehouse.packaging.models import Release
713

814

9-
release_options = dict(index="warehouse",
10-
doc_type="release")
11-
12-
13-
def get_elasticsearch():
14-
"""Configures Elasticsearch and returns the object"""
15-
es_url = get_current_registry().settings["elasticsearch.url"]
16-
return Elasticsearch([es_url])
15+
release_defaults = dict(index="warehouse",
16+
doc_type="release")
1717

18-
19-
@event.listens_for(Release, 'after_insert')
20-
@event.listens_for(Release, 'after_update')
21-
def release_insert_update(mapper, connection, target):
22-
"""Signal insert/update events for Release model"""
23-
es = get_elasticsearch()
18+
def handle_insert(elasticsearch_url, target):
19+
es = Elasticsearch([es_url])
2420
es.index(id=target.name,
2521
body={"name": target.name,
2622
"version": target.version,
2723
"description": target.description,
2824
"summary": target.summary,
2925
"license": target.license,
3026
"download_url": target.download_url},
31-
**release_options)
32-
33-
34-
@event.listens_for(Release, 'before_delete')
35-
def release_delete(mapper, connection, target):
36-
"""Signal idelete event for Release model"""
37-
es = get_elasticsearch()
38-
es.delete(id=target.id, **release_options)
39-
40-
41-
@event.listens_for(_Session, 'after_bulk_update')
42-
def release_after_bulk_update(update_context):
43-
pass # TODO: implement
44-
45-
46-
@event.listens_for(_Session, 'after_bulk_delete')
47-
def release_after_bulk_delete(update_context):
48-
pass # TODO: implement
27+
**release_defaults)
28+
29+
def handle_delete(elasticsearch_url, target):
30+
es = Elasticsearch([es_url])
31+
es.delete(id=target.id, **release_defaults)
32+
33+
def includeme(config):
34+
"""Register elasticsearch callbacks"""
35+
elasticsearch_url = config.registry.settings["elasticsearch.url"]
36+
37+
@event.listens_for(Release, 'after_insert')
38+
@event.listens_for(Release, 'after_update')
39+
def release_insert_update(mapper, connection, target):
40+
"""Signal insert/update events for Release model"""
41+
tx = transaction.get()
42+
tx.addAfterCommitHook(handle_insert, args=(elasticsearch_url, target))
43+
44+
@event.listens_for(Release, 'before_delete')
45+
def release_delete(mapper, connection, target):
46+
"""Signal idelete event for Release model"""
47+
tx = transaction.get()
48+
tx.addAfterCommitHook(handle_delete, args=(elasticsearch_url, target))
49+
50+
@event.listens_for(_Session, 'after_bulk_update')
51+
def release_after_bulk_update(update_context):
52+
import pdb;pdb.set_trace()
53+
54+
@event.listens_for(_Session, 'before_bulk_delete')
55+
def release_after_bulk_delete(update_context):
56+
"""Get affected ids before they are deleted"""
57+
import pdb;pdb.set_trace()

0 commit comments

Comments
 (0)