
Threaded bulk insert #1596


Merged: 9 commits merged into master on Mar 3, 2021

Conversation

jeffreylovitz (Contributor):

This PR moves the work of the GRAPH.BULK endpoint onto RedisGraph's writer thread and splits its processing and committing logic into separate stages, reducing the amount of time spent holding locks.

This enhancement has been requested by users and is expected to resolve an intermittent nightly issue in enterprise CI.
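
As a rough illustration of the staging described above, here is a minimal self-contained sketch (not RedisGraph code; every name in it is invented for the example): the expensive parsing happens on a worker thread without the lock, and the write lock is taken only for the short commit step.

#include <pthread.h>
#include <stdio.h>

static pthread_rwlock_t graph_lock = PTHREAD_RWLOCK_INITIALIZER;
static int committed_nodes = 0;

static void *bulk_worker(void *arg) {
    int batch = *(int *)arg;

    // Stage 1: process the payload (parsing, building entities); no lock held.
    int prepared = batch;

    // Stage 2: commit under the write lock, keeping the critical section short.
    pthread_rwlock_wrlock(&graph_lock);
    committed_nodes += prepared;
    pthread_rwlock_unlock(&graph_lock);
    return NULL;
}

int main(void) {
    int batch = 1000;
    pthread_t t;
    pthread_create(&t, NULL, bulk_worker, &batch);
    pthread_join(t, NULL);
    printf("committed %d nodes\n", committed_nodes);
    return 0;
}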

// stack-allocate an array to contain the attribute key IDs
// of each relationship type file
//
// TODO: not sure how many node tokens to expect, but for large numbers
jeffreylovitz (Contributor Author):

We have one node token for up to 500 megabytes of each label/reltype, so I'm not worried about stack smashing here.
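
For illustration only (this is not the loader's actual code), the pattern under discussion is a stack array sized by the number of relation-type files, which stays small because a single token covers up to roughly 500 MB of input:

#include <stdio.h>

// Hypothetical stand-in for the loader's per-relation-type bookkeeping.
static void process_relation_files(int relation_token_count) {
    // Stack-allocated VLA; relation_token_count is expected to stay small.
    int prop_key_ids[relation_token_count];
    for (int i = 0; i < relation_token_count; i++) {
        prop_key_ids[i] = -1;  // placeholder until the real key ID is resolved
    }
    printf("tracking %d relation-type files\n", relation_token_count);
}

int main(void) {
    process_relation_files(3);
    return 0;
}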

jeffreylovitz force-pushed the threaded-bulk-insert branch from e834378 to 8a2c775 on March 1, 2021 19:18
key = RedisModule_OpenKey(ctx, rs_graph_name, REDISMODULE_READ);
RedisModule_CloseKey(key);
if(key) {
// TODO: introduce an error in errors.h
jeffreylovitz (Contributor Author):

Adding a thread-local ErrorCtx seems like overkill for this endpoint.
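
A minimal sketch of the inline alternative being referred to (the error message and return path here are assumptions, not the merged code): reply with an error directly from the endpoint rather than routing it through a thread-local ErrorCtx.

key = RedisModule_OpenKey(ctx, rs_graph_name, REDISMODULE_READ);
RedisModule_CloseKey(key);
if(key) {
    // Key already exists: report it straight to the client.
    RedisModule_ReplyWithError(ctx, "graph name already in use");
    return REDISMODULE_ERR;
}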

Graph_AllocateNodes(gc->g, nodes_in_query + initial_node_count);

int rc = BulkInsert(ctx, gc, argv, argc);
// TODO: 'rc' ?
jeffreylovitz (Contributor Author):

It's a common shorthand for "return code". Do you prefer something different?

jeffreylovitz force-pushed the threaded-bulk-insert branch from 8a2c775 to 504169b on March 1, 2021 19:22
jeffreylovitz force-pushed the threaded-bulk-insert branch from 504169b to f6d9615 on March 1, 2021 20:11
swilly22 (Contributor) left a comment:

Overall this is looking good. I've done some refactoring and broke the test suite along the way; please go over my changes and see if you have any comments. Also, let me know if you need any help debugging the test failures. Thank you!

@@ -18,6 +20,12 @@
port = None
redis_graph = None

def run_bulk_loader(graphname):
swilly22 (Contributor):

Consider replacing all direct calls to runner.invoke with this function

jeffreylovitz (Contributor Author):

It seems superfluous to me to add logic to this function for handling a variable number of CSV node/edge files.

@@ -346,42 +354,21 @@ def test08_property_types(self):
# The graph should have the correct types for all properties
self.env.assertEquals(query_result.result_set, expected_result)

# Verify that numeric, boolean, and null types are properly handled
# def test09_utf8(self):
swilly22 (Contributor):

Why did you remove this test?

jeffreylovitz (Contributor Author):

This test is included in the bulk loader repo; I don't remember why it is commented out here.

def run_bulk_loader(graphname):
    runner = CliRunner()
    runner.invoke(bulk_insert, ['--port', port,
                                '--nodes', '/tmp/nodes.tmp',
swilly22 (Contributor):

It seems more suitable for this file to be passed in as an argument.
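
A hedged sketch of the parametrization being suggested (not the code that was merged; the import path and flag handling mirror the existing helper, and the defaults are illustrative): let callers pass the CSV paths so the helper works for any combination of node and relation files.

from click.testing import CliRunner
from bulk_insert import bulk_insert  # assumed import, mirroring the existing test

def run_bulk_loader(graphname, node_files=('/tmp/nodes.tmp',), relation_files=()):
    args = ['--port', port]  # relies on the module-level `port`, as the original helper does
    for path in node_files:
        args += ['--nodes', path]
    for path in relation_files:
        args += ['--relations', path]
    args.append(graphname)
    runner = CliRunner()
    return runner.invoke(bulk_insert, args)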

Comment on lines 367 to 374
# Instantiate a thread to run the bulk loader
threading.Thread(target=run_bulk_loader, args=(graphname,)).start()

t0 = time.time()
redis_con.ping()
t1 = time.time() - t0
# Verify that pinging the server takes less than 1 second during bulk insertion
self.env.assertLess(t1, 1)
swilly22 (Contributor):

I think we should continuously try pinging the server for X seconds, counting each successful PONG response; after that, wait for the thread to exit gracefully, and lastly validate that the number of PING requests equals the number of PONG responses.

jeffreylovitz (Contributor Author):

A blocked server will not result in a non-PONG response; it will simply delay the response until the lock is released.
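
For reference, a rough sketch of the counting approach suggested above (graphname, redis_con, and self.env come from the surrounding test; the 5-second window is an arbitrary choice): ping in a loop while the loader thread runs, then verify every PING produced a PONG.

import threading
import time

loader = threading.Thread(target=run_bulk_loader, args=(graphname,))
loader.start()

pings = pongs = 0
deadline = time.time() + 5  # arbitrary window; keep pinging while the loader runs
while time.time() < deadline and loader.is_alive():
    pings += 1
    if redis_con.ping():  # redis-py returns True when the server answers PONG
        pongs += 1

loader.join()  # wait for the bulk loader thread to exit gracefully
self.env.assertEquals(pings, pongs)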

Comment on lines 362 to 365
with open('/tmp/nodes.tmp', mode='w') as csv_file:
    out = csv.writer(csv_file)
    for i in range(1_000_000):
        out.writerow([prop_str])
swilly22 (Contributor):

Just making sure: will this create a CSV representing 1,000,000 nodes, all with the label "nodes", each with an ID attribute whose value is "Property value to be repeated 1 million generating a multi-megabyte CSV"? I guess a header row would make things a bit clearer.
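
A small sketch of the same fixture with the header row suggested above (the column name is illustrative, not taken from the merged test):

import csv

prop_str = "Property value to be repeated 1 million generating a multi-megabyte CSV"
with open('/tmp/nodes.tmp', mode='w') as csv_file:
    out = csv.writer(csv_file)
    out.writerow(['long_property'])  # header row naming the single column
    for i in range(1_000_000):
        out.writerow([prop_str])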

} else if(argc == 0) {
return BULK_OK;
// Process all relationship files
if(_BulkInsert_ProcessTokens(gc, relation_token_count, &argv,
swilly22 (Contributor):

It seems like _BulkInsert_ProcessTokens cannot fail.

jeffreylovitz (Contributor Author):

That is currently correct, yes.

jeffreylovitz force-pushed the threaded-bulk-insert branch 2 times, most recently from 4c48a7d to 0b6f816 on March 2, 2021 18:02
swilly22 (Contributor) left a comment:

A few questions.

@@ -180,32 +180,30 @@ static int _BulkInsert_ProcessFile(GraphContext *gc, const char *data,
}
}

if(prop_indices) rm_free(prop_indices);
rm_free(prop_indices);
swilly22 (Contributor):

_BulkInsert_ReadHeader can return NULL

Comment on lines 195 to 194
const char *data = RedisModule_StringPtrLen(*argv[i], &len);
const char *data = RedisModule_StringPtrLen(**argv, &len);
swilly22 (Contributor):

Can't we use *argv[i] ?

jeffreylovitz (Contributor Author):

No, this causes bizarre memory errors and crashes. I'm not certain why, as argv and argc are properly aligned so we should be able to use 0-n indexing, but this approach is necessary.
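
For what it's worth, a plausible explanation (shown here as a standalone illustration, not the module code) is operator precedence: if argv is a pointer to the current position in the argument array, passed by address so the callee can advance it, then *argv[i] parses as *(argv[i]) and indexes the wrong level of indirection, while (*argv)[i] and **argv stay in bounds.

#include <stdio.h>

int main(void) {
    const char *strings[] = { "nodes.csv", "edges.csv", "rels.csv" };
    const char **cursor = strings;   // walks the argument array
    const char ***argv = &cursor;    // passed by reference so it can be advanced

    printf("%s\n", **argv);          // element at the cursor: "nodes.csv"
    printf("%s\n", (*argv)[1]);      // indexed form that works: "edges.csv"
    // *argv[1] would mean *(argv[1]): it indexes past &cursor and reads garbage.
    return 0;
}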


gc = GraphContext_Retrieve(ctx, rs_graph_name, false, false);
gc = GraphContext_Retrieve(ctx, rs_graph_name, false, true);
swilly22 (Contributor):

We should create the key in _MGraph_Bulk_Begin

jeffreylovitz (Contributor Author):

If the key is empty, we will not instantiate a GraphContext without this argument.

jeffreylovitz force-pushed the threaded-bulk-insert branch from 0b6f816 to 7734766 on March 2, 2021 21:47
swilly22 merged commit 2d43f9d into master on Mar 3, 2021
swilly22 deleted the threaded-bulk-insert branch on March 3, 2021 12:28
jeffreylovitz added a commit that referenced this pull request Mar 10, 2021
* Update the bulk updater to execute on a thread

* Bulk loader endpoint locks for minimal time

* TODOs

* Use a separate thread pool for bulk operations

* Update test_thread_pools.cpp

* refactor bulk-insert

* Fix PR problems

* count number of pings during bulk-insert, only create graph context on BEGIN token

Co-authored-by: swilly22 <[email protected]>
Co-authored-by: Roi Lipman <[email protected]>
(cherry picked from commit 2d43f9d)
swilly22 added a commit that referenced this pull request Mar 15, 2021
* RedisGraph benchmark automation  (#1557)

* Added local and remote benchmark definition and automation

* [fix] Fixes per PR review. Added option to specify benchmark via BENCHMARH=<benchmark name>. Updated benchmark template

Co-authored-by: filipecosta90 <[email protected]>
(cherry picked from commit f6f1ab2)

* Updated benchmark UPDATE-BASELINE to be less restrictive in the latency KPI (#1577)

Given we're still experimenting with the benchmarks CI KPI validation, this PR increases the `OverallClientLatencies.Total.q50` to be lower than 2.0 ( before was 1.5 ) so that we can collect further data and adjust afterwards...

(cherry picked from commit 611a0f0)

* * log redisgraph version (#1567)

When pulling container image tagged as `latest` or `edge` I sometimes
don't know which version I'm running, and it would be much faster to
find out if the information was displayed at startup. This patch logs
this information.

Co-authored-by: Roi Lipman <[email protected]>
(cherry picked from commit fe2e7ce)

* [add] Triggering nightly CI benchmarks; Early return on CI benchmarks for forked PRs (#1579)

(cherry picked from commit a529c1e)

* use PRIu64 to format uint64_t (#1581)

(cherry picked from commit c0e00d5)

* [fix] Fixed missing github_actor on ci nightly benchmark automation (#1583)

(cherry picked from commit 8abad84)

* Fix idx assertion (#1580)

* Fix flawed assertion in index deletion logic

* Reduce KPI for updates_baseline benchmark

* Address PR comments

* Address PR comments

Co-authored-by: Roi Lipman <[email protected]>
(cherry picked from commit 6bad20a)

* Always report run-time errors as the sole reply (#1590)

* Always report run-time errors as the sole reply

* Update test_timeout.py

Co-authored-by: Roi Lipman <[email protected]>
(cherry picked from commit c9ba776)

* remove wrong assertion (#1591)

(cherry picked from commit 12ef8ac)

* Report 0 indices created on duplicate index creation (#1592)

(cherry picked from commit e00f2c8)

* Multi-platform build (#1587)

Multi-platform build

(cherry picked from commit 26ace7a)

* Multi-platform build, take 2 (#1598)

(cherry picked from commit acde693)

* Moved common benchmark automation code to redisbench-admin package. Improved benchmark specification file (#1597)

(cherry picked from commit ebea927)

* Added readies submodule (#1600)

* Added readies submodule

* fixes 1

(cherry picked from commit efbfeaf)

* Dockerfile: fixed artifacts copy (#1601)

(cherry picked from commit f722f2d)

* CircleCI: fixed version release (#1602)

(cherry picked from commit 9f218d6)

* CircleCI: release-related fix (#1604)

(cherry picked from commit 15cf291)

* remove redundant include (#1606)

(cherry picked from commit 7ea1c43)

* Threaded bulk insert (#1596)

* Update the bulk updater to execute on a thread

* Bulk loader endpoint locks for minimal time

* TODOs

* Use a separate thread pool for bulk operations

* Update test_thread_pools.cpp

* refactor bulk-insert

* Fix PR problems

* count number of pings during bulk-insert, only create graph context on BEGIN token

Co-authored-by: swilly22 <[email protected]>
Co-authored-by: Roi Lipman <[email protected]>
(cherry picked from commit 2d43f9d)

* Use system gcc in Ubuntu 16 (#1615)

(cherry picked from commit 0c10130)

* wrongly assumed add op had only 2 operands (#1618)

(cherry picked from commit 6b06095)

* Updated benchmark requirements version (#1616)

* Updated benchmark requirements version

* Update requirements.txt

(cherry picked from commit db080d4)

* Runtime timeouts (#1610)

* Add run-time configuration for default query timeouts

* Timeout for write queries that haven't committed changes

* define TIMEOUT_NO_TIMEOUT

* Refactor timeout logic

* Address PR comments

* Do not use timeouts for write queries

Co-authored-by: swilly22 <[email protected]>
Co-authored-by: Roi Lipman <[email protected]>
(cherry picked from commit 964b268)

* Fix typo in assertion

* bump version to 2.2.16

Co-authored-by: Roi Lipman <[email protected]>
Co-authored-by: filipe oliveira <[email protected]>
Co-authored-by: bc² <[email protected]>
Co-authored-by: Rafi Einstein <[email protected]>
jeffreylovitz added a commit that referenced this pull request Mar 15, 2021
* Update the bulk updater to execute on a thread

* Bulk loader endpoint locks for minimal time

* TODOs

* Use a separate thread pool for bulk operations

* Update test_thread_pools.cpp

* refactor bulk-insert

* Fix PR problems

* count number of pings during bulk-insert, only create graph context on BEGIN token

Co-authored-by: swilly22 <[email protected]>
Co-authored-by: Roi Lipman <[email protected]>
(cherry picked from commit 2d43f9d)
swilly22 added a commit that referenced this pull request Mar 17, 2021
* Threaded bulk insert (#1596)

* Update the bulk updater to execute on a thread

* Bulk loader endpoint locks for minimal time

* TODOs

* Use a separate thread pool for bulk operations

* Update test_thread_pools.cpp

* refactor bulk-insert

* Fix PR problems

* count number of pings during bulk-insert, only create graph context on BEGIN token

Co-authored-by: swilly22 <[email protected]>
Co-authored-by: Roi Lipman <[email protected]>
(cherry picked from commit 2d43f9d)

* set score to 1 for each document (#1607)

* set score to 1 for each document

* test fulltext search scoring

* Update proc_fulltext_query.c

* Add documentation

Co-authored-by: Jeffrey Lovitz <[email protected]>
(cherry picked from commit bd1fdca)

* wrongly assumed add op had only 2 operands (#1618)

(cherry picked from commit 6b06095)

* Updated benchmark requirements version (#1616)

* Updated benchmark requirements version

* Update requirements.txt

(cherry picked from commit db080d4)

* Runtime timeouts (#1610)

* Add run-time configuration for default query timeouts

* Timeout for write queries that haven't committed changes

* define TIMEOUT_NO_TIMEOUT

* Refactor timeout logic

* Address PR comments

* Do not use timeouts for write queries

Co-authored-by: swilly22 <[email protected]>
Co-authored-by: Roi Lipman <[email protected]>
(cherry picked from commit 964b268)

* Use master version of CircleCI config

* bump version to 2.4.2

Co-authored-by: Roi Lipman <[email protected]>
Co-authored-by: filipe oliveira <[email protected]>
pnxguide pushed a commit to CMU-SPEED/RedisGraph that referenced this pull request Mar 22, 2023
* Update the bulk updater to execute on a thread

* Bulk loader endpoint locks for minimal time

* TODOs

* Use a separate thread pool for bulk operations

* Update test_thread_pools.cpp

* refactor bulk-insert

* Fix PR problems

* count number of pings during bulk-insert, only create graph context on BEGIN token

Co-authored-by: swilly22 <[email protected]>
Co-authored-by: Roi Lipman <[email protected]>