Skip to content

Adding RedisGraph support #1673

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions redis/commands/graph/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
from ..helpers import quote_string, random_string, stringify_param_value
from .commands import GraphCommands
from .edge import Edge # noqa
from .node import Node # noqa
from .path import Path # noqa


class Graph(GraphCommands):
"""
Graph, collection of nodes and edges.
"""

def __init__(self, client, name=random_string()):
"""
Create a new graph.
"""
self.NAME = name # Graph key
self.client = client
self.execute_command = client.execute_command

self.nodes = {}
self.edges = []
self._labels = [] # List of node labels.
self._properties = [] # List of properties.
self._relationshipTypes = [] # List of relation types.
self.version = 0 # Graph version

@property
def name(self):
return self.NAME

def _clear_schema(self):
self._labels = []
self._properties = []
self._relationshipTypes = []

def _refresh_schema(self):
self._clear_schema()
self._refresh_labels()
self._refresh_relations()
self._refresh_attributes()

def _refresh_labels(self):
lbls = self.labels()

# Unpack data.
self._labels = [None] * len(lbls)
for i, l in enumerate(lbls):
self._labels[i] = l[0]

def _refresh_relations(self):
rels = self.relationshipTypes()

# Unpack data.
self._relationshipTypes = [None] * len(rels)
for i, r in enumerate(rels):
self._relationshipTypes[i] = r[0]

def _refresh_attributes(self):
props = self.propertyKeys()

# Unpack data.
self._properties = [None] * len(props)
for i, p in enumerate(props):
self._properties[i] = p[0]

def get_label(self, idx):
"""
Returns a label by it's index

Args:

idx:
The index of the label
"""
try:
label = self._labels[idx]
except IndexError:
# Refresh labels.
self._refresh_labels()
label = self._labels[idx]
return label

def get_relation(self, idx):
"""
Returns a relationship type by it's index

Args:

idx:
The index of the relation
"""
try:
relationship_type = self._relationshipTypes[idx]
except IndexError:
# Refresh relationship types.
self._refresh_relations()
relationship_type = self._relationshipTypes[idx]
return relationship_type

def get_property(self, idx):
"""
Returns a property by it's index

Args:

idx:
The index of the property
"""
try:
propertie = self._properties[idx]
except IndexError:
# Refresh properties.
self._refresh_attributes()
propertie = self._properties[idx]
return propertie

def add_node(self, node):
"""
Adds a node to the graph.
"""
if node.alias is None:
node.alias = random_string()
self.nodes[node.alias] = node

def add_edge(self, edge):
"""
Adds an edge to the graph.
"""
if not (self.nodes[edge.src_node.alias] and self.nodes[edge.dest_node.alias]):
raise AssertionError("Both edge's end must be in the graph")

self.edges.append(edge)

def _build_params_header(self, params):
if not isinstance(params, dict):
raise TypeError("'params' must be a dict")
# Header starts with "CYPHER"
params_header = "CYPHER "
for key, value in params.items():
params_header += str(key) + "=" + stringify_param_value(value) + " "
return params_header

# Procedures.
def call_procedure(self, procedure, *args, read_only=False, **kwagrs):
args = [quote_string(arg) for arg in args]
q = f"CALL {procedure}({','.join(args)})"

y = kwagrs.get("y", None)
if y:
q += f" YIELD {','.join(y)}"

return self.query(q, read_only=read_only)

def labels(self):
return self.call_procedure("db.labels", read_only=True).result_set

def relationshipTypes(self):
return self.call_procedure("db.relationshipTypes", read_only=True).result_set

def propertyKeys(self):
return self.call_procedure("db.propertyKeys", read_only=True).result_set
200 changes: 200 additions & 0 deletions redis/commands/graph/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
from redis import DataError
from redis.exceptions import ResponseError

from .exceptions import VersionMismatchException
from .query_result import QueryResult


class GraphCommands:
def commit(self):
"""
Create entire graph.
For more information see `CREATE <https://oss.redis.com/redisgraph/master/commands/#create>`_. # noqa
"""
if len(self.nodes) == 0 and len(self.edges) == 0:
return None

query = "CREATE "
for _, node in self.nodes.items():
query += str(node) + ","

query += ",".join([str(edge) for edge in self.edges])

# Discard leading comma.
if query[-1] == ",":
query = query[:-1]

return self.query(query)

def query(self, q, params=None, timeout=None, read_only=False, profile=False):
"""
Executes a query against the graph.
For more information see `GRAPH.QUERY <https://oss.redis.com/redisgraph/master/commands/#graphquery>`_. # noqa

Args:

-------
q :
The query.
params : dict
Query parameters.
timeout : int
Maximum runtime for read queries in milliseconds.
read_only : bool
Executes a readonly query if set to True.
profile : bool
Return details on results produced by and time
spent in each operation.
"""

# maintain original 'q'
query = q

# handle query parameters
if params is not None:
query = self._build_params_header(params) + query

# construct query command
# ask for compact result-set format
# specify known graph version
if profile:
cmd = "GRAPH.PROFILE"
else:
cmd = "GRAPH.RO_QUERY" if read_only else "GRAPH.QUERY"
command = [cmd, self.name, query, "--compact"]

# include timeout is specified
if timeout:
if not isinstance(timeout, int):
raise Exception("Timeout argument must be a positive integer")
command += ["timeout", timeout]

# issue query
try:
response = self.execute_command(*command)
return QueryResult(self, response, profile)
except ResponseError as e:
if "wrong number of arguments" in str(e):
print(
"Note: RedisGraph Python requires server version 2.2.8 or above"
) # noqa
if "unknown command" in str(e) and read_only:
# `GRAPH.RO_QUERY` is unavailable in older versions.
return self.query(q, params, timeout, read_only=False)
raise e
except VersionMismatchException as e:
# client view over the graph schema is out of sync
# set client version and refresh local schema
self.version = e.version
self._refresh_schema()
# re-issue query
return self.query(q, params, timeout, read_only)

def merge(self, pattern):
"""
Merge pattern.
For more information see `MERGE <https://oss.redis.com/redisgraph/master/commands/#merge>`_. # noqa
"""
query = "MERGE "
query += str(pattern)

return self.query(query)

def delete(self):
"""
Deletes graph.
For more information see `DELETE <https://oss.redis.com/redisgraph/master/commands/#delete>`_. # noqa
"""
self._clear_schema()
return self.execute_command("GRAPH.DELETE", self.name)

# declared here, to override the built in redis.db.flush()
def flush(self):
"""
Commit the graph and reset the edges and the nodes to zero length.
"""
self.commit()
self.nodes = {}
self.edges = []

def explain(self, query, params=None):
"""
Get the execution plan for given query,
Returns an array of operations.
For more information see `GRAPH.EXPLAIN <https://oss.redis.com/redisgraph/master/commands/#graphexplain>`_. # noqa

Args:

-------
query:
The query that will be executed.
params: dict
Query parameters.
"""
if params is not None:
query = self._build_params_header(params) + query

plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
return "\n".join(plan)

def bulk(self, **kwargs):
"""Internal only. Not supported."""
raise NotImplementedError(
"GRAPH.BULK is internal only. "
"Use https://github.com/redisgraph/redisgraph-bulk-loader."
)

def profile(self, query):
"""
Execute a query and produce an execution plan augmented with metrics
for each operation's execution. Return a string representation of a
query execution plan, with details on results produced by and time
spent in each operation.
For more information see `GRAPH.PROFILE <https://oss.redis.com/redisgraph/master/commands/#graphprofile>`_. # noqa
"""
return self.query(query, profile=True)

def slowlog(self):
"""
Get a list containing up to 10 of the slowest queries issued
against the given graph ID.
For more information see `GRAPH.SLOWLOG <https://oss.redis.com/redisgraph/master/commands/#graphslowlog>`_. # noqa

Each item in the list has the following structure:
1. A unix timestamp at which the log entry was processed.
2. The issued command.
3. The issued query.
4. The amount of time needed for its execution, in milliseconds.
"""
return self.execute_command("GRAPH.SLOWLOG", self.name)

def config(self, name, value=None, set=False):
"""
Retrieve or update a RedisGraph configuration.
For more information see `GRAPH.CONFIG <https://oss.redis.com/redisgraph/master/commands/#graphconfig>`_. # noqa

Args:

name : str
The name of the configuration
value :
The value we want to ser (can be used only when `set` is on)
set : bool
Turn on to set a configuration. Default behavior is get.
"""
params = ["SET" if set else "GET", name]
if value is not None:
if set:
params.append(value)
else:
raise DataError(
"``value`` can be provided only when ``set`` is True"
) # noqa
return self.execute_command("GRAPH.CONFIG", *params)

def list_keys(self):
"""
Lists all graph keys in the keyspace.
For more information see `GRAPH.LIST <https://oss.redis.com/redisgraph/master/commands/#graphlist>`_. # noqa
"""
return self.execute_command("GRAPH.LIST")
Loading