Skip to content

Commit ea87640

Browse files
swilly22DvirDukhan
andauthored
Graph version (#1382)
* add graph version to graph context object * change graph version from string to uint32 * testing graph version * response with an error when graph version mismatch * release graph context on version mismatch * address PR comments * increase command arity to accomadate graph version argument * address PR comments Co-authored-by: DvirDukhan <[email protected]>
1 parent e73b8ef commit ea87640

File tree

10 files changed

+276
-48
lines changed

10 files changed

+276
-48
lines changed

src/commands/cmd_dispatcher.c

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,29 @@
44
* This file is available under the Redis Labs Source Available License Agreement
55
*/
66

7+
#include "../RG.h"
78
#include "commands.h"
89
#include "cmd_context.h"
910
#include "../RG.h"
1011
#include <assert.h>
1112
#include <strings.h>
1213

14+
#define GRAPH_VERSION_MISSING -1
15+
1316
// Command handler function pointer.
1417
typedef void(*Command_Handler)(void *args);
1518

1619
// Read configuration flags, returning REDIS_MODULE_ERR if flag parsing failed.
17-
static int _read_flags(RedisModuleString **argv, int argc, bool *compact, long long *timeout,
18-
char **errmsg) {
20+
static int _read_flags(RedisModuleString **argv, int argc, bool *compact,
21+
long long *timeout, uint *graph_version, char **errmsg) {
22+
1923
ASSERT(compact);
2024
ASSERT(timeout);
2125

2226
// set defaults
2327
*timeout = 0; // no timeout
2428
*compact = false; // verbose
29+
*graph_version = GRAPH_VERSION_MISSING;
2530

2631
// GRAPH.QUERY <GRAPH_KEY> <QUERY>
2732
// make sure we've got more than 3 arguments
@@ -37,6 +42,24 @@ static int _read_flags(RedisModuleString **argv, int argc, bool *compact, long l
3742
continue;
3843
}
3944

45+
if(!strcasecmp(arg, "version")) {
46+
long long v = GRAPH_VERSION_MISSING;
47+
int err = REDISMODULE_ERR;
48+
if(i < argc - 1) {
49+
i++; // Set the current argument to the version value.
50+
err = RedisModule_StringToLongLong(argv[i], &v);
51+
*graph_version = v;
52+
}
53+
54+
// Emit error on missing, negative, or non-numeric version values.
55+
if(err != REDISMODULE_OK || v < 0 || v > UINT_MAX) {
56+
asprintf(errmsg, "Failed to parse graph version value");
57+
return REDISMODULE_ERR;
58+
}
59+
60+
continue;
61+
}
62+
4063
// query timeout
4164
if(!strcasecmp(arg, "timeout")) {
4265
int err = REDISMODULE_ERR;
@@ -55,6 +78,20 @@ static int _read_flags(RedisModuleString **argv, int argc, bool *compact, long l
5578
return REDISMODULE_OK;
5679
}
5780

81+
// Returns false if client provided a graph version
82+
// which mismatch the current graph version
83+
static bool _verifyGraphVersion(GraphContext *gc, uint version) {
84+
// caller did not specify graph version
85+
if(version == GRAPH_VERSION_MISSING) return true;
86+
return (GraphContext_GetVersion(gc) == version);
87+
}
88+
89+
static void _rejectOnVersionMismatch(RedisModuleCtx *ctx, uint version) {
90+
RedisModule_ReplyWithArray(ctx, 2);
91+
RedisModule_ReplyWithError(ctx, "version mismatch");
92+
RedisModule_ReplyWithLongLong(ctx, version);
93+
}
94+
5895
// Return true if the command has a valid number of arguments.
5996
static inline bool _validate_command_arity(GRAPH_Commands cmd, int arity) {
6097
switch(cmd) {
@@ -63,7 +100,7 @@ static inline bool _validate_command_arity(GRAPH_Commands cmd, int arity) {
63100
case CMD_EXPLAIN:
64101
case CMD_PROFILE:
65102
// Expect a command, graph name, a query, and optional config flags.
66-
return arity >= 3 && arity <= 6;
103+
return arity >= 3 && arity <= 8;
67104
case CMD_SLOWLOG:
68105
// Expect just a command and graph name.
69106
return arity == 2;
@@ -103,17 +140,20 @@ static GRAPH_Commands determine_command(const char *cmd_name) {
103140
}
104141

105142
int CommandDispatch(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
106-
CommandCtx *context;
143+
char *errmsg;
144+
bool compact;
145+
uint version;
146+
long long timeout;
147+
CommandCtx *context = NULL;
107148

108149
RedisModuleString *graph_name = argv[1];
109150
RedisModuleString *query = (argc > 2) ? argv[2] : NULL;
110151
const char *command_name = RedisModule_StringPtrLen(argv[0], NULL);
111152
GRAPH_Commands cmd = determine_command(command_name);
153+
112154
// Parse additional query arguments.
113-
char *errmsg;
114-
bool compact;
115-
long long timeout;
116-
int res = _read_flags(argv, argc, &compact, &timeout, &errmsg);
155+
int res = _read_flags(argv, argc, &compact, &timeout, &version, &errmsg);
156+
117157
if(res == REDISMODULE_ERR) {
118158
// Emit error and exit if argument parsing failed.
119159
RedisModule_ReplyWithError(ctx, errmsg);
@@ -123,11 +163,19 @@ int CommandDispatch(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
123163
}
124164

125165
if(_validate_command_arity(cmd, argc) == false) return RedisModule_WrongArity(ctx);
166+
126167
Command_Handler handler = get_command_handler(cmd);
127168
GraphContext *gc = GraphContext_Retrieve(ctx, graph_name, true, true);
128169
// If the GraphContext is null, key access failed and an error has been emitted.
129170
if(!gc) return REDISMODULE_ERR;
130171

172+
// return incase caller provided a mismatched graph version
173+
if(!_verifyGraphVersion(gc, version)) {
174+
_rejectOnVersionMismatch(ctx, GraphContext_GetVersion(gc));
175+
GraphContext_Release(gc);
176+
return REDISMODULE_OK;
177+
}
178+
131179
/* Determin query execution context
132180
* queries issued within a LUA script or multi exec block must
133181
* run on Redis main thread, others can run on different threads. */

src/graph/graphcontext.c

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
#include <sys/param.h>
88
#include <pthread.h>
99
#include "graphcontext.h"
10+
#include "../RG.h"
1011
#include "../util/arr.h"
12+
#include "../util/uuid.h"
1113
#include "../query_ctx.h"
1214
#include "../redismodule.h"
1315
#include "../util/rmalloc.h"
@@ -25,6 +27,7 @@ extern RedisModuleType *GraphContextRedisModuleType;
2527

2628
// Forward declarations.
2729
static void _GraphContext_Free(void *arg);
30+
static void _GraphContext_UpdateVersion(GraphContext *gc, const char *str);
2831

2932
static inline void _GraphContext_IncreaseRefCount(GraphContext *gc) {
3033
__atomic_fetch_add(&gc->ref_count, 1, __ATOMIC_RELAXED);
@@ -53,6 +56,7 @@ GraphContext *GraphContext_New(const char *graph_name, size_t node_cap, size_t e
5356

5457
gc->ref_count = 0; // No refences.
5558
gc->index_count = 0; // No indicies.
59+
gc->version = 0; // Initial graph version.
5660

5761
// Initialize the graph's matrices and datablock storage
5862
gc->g = Graph_New(node_cap, edge_cap);
@@ -186,6 +190,32 @@ void GraphContext_Rename(GraphContext *gc, const char *name) {
186190
gc->graph_name = rm_strdup(name);
187191
}
188192

193+
XXH32_hash_t GraphContext_GetVersion(const GraphContext *gc) {
194+
ASSERT(gc != NULL);
195+
196+
return gc->version;
197+
}
198+
199+
// Update graph context version
200+
static void _GraphContext_UpdateVersion(GraphContext *gc, const char *str) {
201+
ASSERT(gc != NULL);
202+
ASSERT(str != NULL);
203+
204+
/* Update graph version by hashing 'str' representing the current
205+
* addition to the graph schema: (Label, Relationship-type, Attribute)
206+
*
207+
* Using the current graph version as a seed, by doing so we avoid
208+
* hashing the entire graph schema on each change, while guaranteeing the
209+
* exact same version across a cluster: same graph version on both
210+
* primary and replica shards. */
211+
212+
XXH32_state_t *state = XXH32_createState();
213+
XXH32_reset(state, gc->version);
214+
XXH32_update(state, str, strlen(str));
215+
gc->version = XXH32_digest(state);
216+
XXH32_freeState(state);
217+
}
218+
189219
//------------------------------------------------------------------------------
190220
// Schema API
191221
//------------------------------------------------------------------------------
@@ -232,6 +262,9 @@ Schema *GraphContext_AddSchema(GraphContext *gc, const char *label, SchemaType t
232262
gc->relation_schemas = array_append(gc->relation_schemas, schema);
233263
}
234264

265+
// new schema added, update graph version
266+
_GraphContext_UpdateVersion(gc, label);
267+
235268
return schema;
236269
}
237270

@@ -279,6 +312,9 @@ Attribute_ID GraphContext_FindOrAddAttribute(GraphContext *gc, const char *attri
279312
attribute_id,
280313
NULL);
281314
gc->string_mapping = array_append(gc->string_mapping, rm_strdup(attribute));
315+
316+
// new attribute been added, update graph version
317+
_GraphContext_UpdateVersion(gc, attribute);
282318
}
283319
}
284320

src/graph/graphcontext.h

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,16 @@
1717
#include "../serializers/decode_context.h"
1818
#include "../util/cache/cache.h"
1919

20+
/* GraphContext holds refrences to various elements of a graph object
21+
* It is the value sitting behind a Redis graph key
22+
*
23+
* the graph context is versioned, the version value itself is meaningless
24+
* it is used as a "signature" for the graph schema: (labels, relationship-types
25+
* and attribute set) client libraries which cache the mapping between graph
26+
* schema elements and their internal IDs (see COMPACT reply formatter)
27+
* can use the graph version to understand if the schema was modified
28+
* and take action accordingly */
29+
2030
typedef struct {
2131
Graph *g; // Container for all matrices and entity properties
2232
int ref_count; // Number of active references.
@@ -31,9 +41,13 @@ typedef struct {
3141
GraphEncodeContext *encoding_context; // Encode context of the graph.
3242
GraphDecodeContext *decoding_context; // Decode context of the graph.
3343
Cache **cache_pool; // Pool of execution plan caches, one per thread.
44+
XXH32_hash_t version; // Graph version.
3445
} GraphContext;
3546

36-
/* GraphContext API */
47+
//------------------------------------------------------------------------------
48+
// GraphContext API
49+
//------------------------------------------------------------------------------
50+
3751
// Creates and initializes a graph context struct.
3852
GraphContext *GraphContext_New(const char *graph_name, size_t node_cap, size_t edge_cap);
3953
/* Retrive the graph context according to the graph name
@@ -48,7 +62,16 @@ void GraphContext_MarkWriter(RedisModuleCtx *ctx, GraphContext *gc);
4862
// Mark graph as deleted, reduce graph reference count by 1.
4963
void GraphContext_Delete(GraphContext *gc);
5064

51-
/* Schema API */
65+
// Rename a graph context.
66+
void GraphContext_Rename(GraphContext *gc, const char *name);
67+
68+
// Get graph context version
69+
XXH32_hash_t GraphContext_GetVersion(const GraphContext *gc);
70+
71+
//------------------------------------------------------------------------------
72+
// Schema API
73+
//------------------------------------------------------------------------------
74+
5275
// Retrieve number of schemas created for given type.
5376
unsigned short GraphContext_SchemaCount(const GraphContext *gc, SchemaType t);
5477
// Retrieve the specific schema for the provided ID
@@ -70,7 +93,10 @@ const char *GraphContext_GetAttributeString(GraphContext *gc, Attribute_ID id);
7093
// Retrieve an attribute ID given a string, or ATTRIBUTE_NOTFOUND if attribute doesn't exist.
7194
Attribute_ID GraphContext_GetAttributeID(GraphContext *gc, const char *str);
7295

73-
/* Index API */
96+
//------------------------------------------------------------------------------
97+
// Index API
98+
//------------------------------------------------------------------------------
99+
74100
bool GraphContext_HasIndices(GraphContext *gc);
75101
// Attempt to retrieve an index on the given label and attribute
76102
Index *GraphContext_GetIndex(const GraphContext *gc, const char *label, Attribute_ID *attribute_id,
@@ -93,10 +119,10 @@ GraphContext *GraphContext_GetRegisteredGraphContext(const char *graph_name);
93119
// Remove GraphContext from global array
94120
void GraphContext_RemoveFromRegistry(GraphContext *gc);
95121

96-
// Rename a graph context.
97-
void GraphContext_Rename(GraphContext *gc, const char *name);
122+
//------------------------------------------------------------------------------
123+
// Slowlog API
124+
//------------------------------------------------------------------------------
98125

99-
/* Slowlog API */
100126
SlowLog *GraphContext_GetSlowLog(const GraphContext *gc);
101127

102128
/* Cache API - Return cache associated with graph context and current thread id. */

src/query_ctx.c

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -208,45 +208,51 @@ bool QueryCtx_LockForCommit(void) {
208208

209209
}
210210

211-
void QueryCtx_UnlockCommit(OpBase *writer_op) {
212-
QueryCtx *ctx = _QueryCtx_GetCtx();
213-
// Check that the writer_op is entitled to release the lock.
214-
if(ctx->internal_exec_ctx.last_writer != writer_op) return;
215-
if(!ctx->internal_exec_ctx.locked_for_commit) return;
216-
RedisModuleCtx *redis_ctx = ctx->global_exec_ctx.redis_ctx;
211+
static void _QueryCtx_UnlockCommit(QueryCtx *ctx) {
217212
GraphContext *gc = ctx->gc;
218-
if(ResultSetStat_IndicateModification(ctx->internal_exec_ctx.result_set->stats))
213+
RedisModuleCtx *redis_ctx = ctx->global_exec_ctx.redis_ctx;
214+
215+
if(ResultSetStat_IndicateModification(ctx->internal_exec_ctx.result_set->stats)) {
219216
// Replicate only in case of changes.
220217
RedisModule_Replicate(redis_ctx, ctx->global_exec_ctx.command_name, "cc!", gc->graph_name,
221218
ctx->query_data.query);
219+
}
220+
222221
ctx->internal_exec_ctx.locked_for_commit = false;
223222
// Release graph R/W lock.
224223
Graph_ReleaseLock(gc->g);
224+
225225
// Close Key.
226226
RedisModule_CloseKey(ctx->internal_exec_ctx.key);
227+
227228
// Unlock GIL.
228229
_QueryCtx_ThreadSafeContextUnlock(ctx);
229230
}
230231

232+
void QueryCtx_UnlockCommit(OpBase *writer_op) {
233+
QueryCtx *ctx = _QueryCtx_GetCtx();
234+
235+
// check that the writer_op is entitled to release the lock.
236+
if(ctx->internal_exec_ctx.last_writer != writer_op) return;
237+
238+
// already unlocked?
239+
if(!ctx->internal_exec_ctx.locked_for_commit) return;
240+
241+
_QueryCtx_UnlockCommit(ctx);
242+
}
243+
231244
void QueryCtx_ForceUnlockCommit() {
232245
QueryCtx *ctx = _QueryCtx_GetCtx();
246+
247+
// already unlocked?
233248
if(!ctx->internal_exec_ctx.locked_for_commit) return;
249+
234250
RedisModuleCtx *redis_ctx = ctx->global_exec_ctx.redis_ctx;
235-
GraphContext *gc = ctx->gc;
236251
RedisModule_Log(redis_ctx, "warning",
237252
"RedisGraph used forced unlocking commit flow for the query %s",
238253
ctx->query_data.query);
239-
if(ResultSetStat_IndicateModification(ctx->internal_exec_ctx.result_set->stats))
240-
// Replicate only in case of changes.
241-
RedisModule_Replicate(redis_ctx, ctx->global_exec_ctx.command_name, "cc!", gc->graph_name,
242-
ctx->query_data.query);
243-
ctx->internal_exec_ctx.locked_for_commit = false;
244-
// Release graph R/W lock.
245-
Graph_ReleaseLock(gc->g);
246-
// Close Key.
247-
RedisModule_CloseKey(ctx->internal_exec_ctx.key);
248-
// Unlock GIL.
249-
_QueryCtx_ThreadSafeContextUnlock(ctx);
254+
255+
_QueryCtx_UnlockCommit(ctx);
250256
}
251257

252258
inline bool QueryCtx_EncounteredError(void) {

src/resultset/resultset.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
static void _ResultSet_ReplayStats(RedisModuleCtx *ctx, ResultSet *set) {
1616
char buff[512] = {0};
17-
size_t resultset_size = 2; /* query execution time and cached execution. */
17+
size_t resultset_size = 2; // execution time, cached
1818
int buflen;
1919

2020
if(set->stats.labels_added > 0) resultset_size++;

src/resultset/resultset_statistics.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ bool ResultSetStat_IndicateModification(ResultSetStatistics stats) {
1616
|| stats.indices_created > 0
1717
|| stats.indices_deleted > 0);
1818
}
19+

0 commit comments

Comments
 (0)