diff --git a/graphdatascience/procedure_surface/api/articlerank_endpoints.py b/graphdatascience/procedure_surface/api/articlerank_endpoints.py index 9f2bf914b..5e6d76236 100644 --- a/graphdatascience/procedure_surface/api/articlerank_endpoints.py +++ b/graphdatascience/procedure_surface/api/articlerank_endpoints.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame from pydantic import BaseModel, ConfigDict @@ -258,18 +258,14 @@ def write( """ @abstractmethod - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: """ Returns an estimation of the memory consumption for that procedure. Parameters ---------- - G : Optional[Graph], default=None - The graph to estimate for - projection_config : Optional[dict[str, Any]], default=None - Configuration for graph projection + G : Union[Graph, dict[str, Any]] + The graph to run the algorithm on or a dictionary representing the graph. Returns ------- diff --git a/graphdatascience/procedure_surface/api/articulationpoints_endpoints.py b/graphdatascience/procedure_surface/api/articulationpoints_endpoints.py index 8e339c4d4..17336b473 100644 --- a/graphdatascience/procedure_surface/api/articulationpoints_endpoints.py +++ b/graphdatascience/procedure_surface/api/articulationpoints_endpoints.py @@ -1,11 +1,12 @@ from abc import ABC, abstractmethod -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame from pydantic import BaseModel, ConfigDict from pydantic.alias_generators import to_camel from ...graph.graph_object import Graph +from .estimation_result import EstimationResult class ArticulationPointsEndpoints(ABC): @@ -151,7 +152,6 @@ def write( concurrency: Optional[Any] = None, job_id: Optional[Any] = None, write_concurrency: Optional[Any] = None, - write_to_result_store: Optional[bool] = None, ) -> "ArticulationPointsWriteResult": """ Executes the ArticulationPoints algorithm and writes the results back to the Neo4j database. @@ -178,8 +178,6 @@ def write( An identifier for the job write_concurrency : Optional[Any], default=None The number of concurrent threads for writing - write_to_result_store : Optional[bool], default=None - Whether to write results to the result store Returns ------- @@ -187,6 +185,41 @@ def write( Algorithm metrics and statistics including the count of articulation points found """ + @abstractmethod + def estimate( + self, + G: Union[Graph, dict[str, Any]], + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + concurrency: Optional[Any] = None, + ) -> EstimationResult: + """ + Estimate the memory consumption of the Articulation Points algorithm. + + This method provides an estimate of the memory requirements for running the algorithm + on a given graph, helping with capacity planning and resource allocation. + + Parameters + ---------- + G : Union[Graph, dict[str, Any]] + The graph to be used in the estimation. Provided either as a Graph object or a configuration dictionary for the projection. + relationship_types : Optional[List[str]], default=None + The relationship types used to select relationships for this algorithm run. + If not specified, all relationship types are considered. 
+ node_labels : Optional[List[str]], default=None + The node labels used to select nodes for this algorithm run. + If not specified, all node labels are considered. + concurrency : Optional[Any], default=None + The number of concurrent threads used for the estimation. + If not specified, uses the default concurrency level. + + Returns + ------- + EstimationResult + An object containing the result of the estimation including memory requirements + """ + pass + class ArticulationPointsMutateResult(BaseModel): """Result of running ArticulationPoints algorithm with mutate mode.""" diff --git a/graphdatascience/procedure_surface/api/base_result.py b/graphdatascience/procedure_surface/api/base_result.py new file mode 100644 index 000000000..34d65e4c8 --- /dev/null +++ b/graphdatascience/procedure_surface/api/base_result.py @@ -0,0 +1,9 @@ +from typing import Any + +from pydantic import BaseModel +from pydantic.alias_generators import to_camel + + +class BaseResult(BaseModel, alias_generator=to_camel): + def __getitem__(self, item: str) -> Any: + return getattr(self, item) diff --git a/graphdatascience/procedure_surface/api/betweenness_endpoints.py b/graphdatascience/procedure_surface/api/betweenness_endpoints.py index 5afb18315..b2450d66d 100644 --- a/graphdatascience/procedure_surface/api/betweenness_endpoints.py +++ b/graphdatascience/procedure_surface/api/betweenness_endpoints.py @@ -1,11 +1,11 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame -from pydantic import BaseModel, ConfigDict -from pydantic.alias_generators import to_camel + +from graphdatascience.procedure_surface.api.base_result import BaseResult from ...graph.graph_object import Graph from .estimation_result import EstimationResult @@ -222,20 +222,14 @@ def write( """ @abstractmethod - def estimate( - self, - G: Optional[Graph] = None, - projection_config: Optional[dict[str, Any]] = None, - ) -> EstimationResult: + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: """ Estimate the memory consumption of an algorithm run. Parameters ---------- - G : Optional[Graph], optional - The graph to be used in the estimation - projection_config : Optional[dict[str, Any]], optional - Configuration dictionary for the projection. + G : Union[Graph, dict[str, Any]] + The graph to run the algorithm on or a dictionary representing the graph. 
Returns ------- @@ -244,11 +238,9 @@ def estimate( """ -class BetweennessMutateResult(BaseModel): +class BetweennessMutateResult(BaseResult): """Result of running Betweenness Centrality algorithm with mutate mode.""" - model_config = ConfigDict(alias_generator=to_camel) - node_properties_written: int pre_processing_millis: int compute_millis: int @@ -257,30 +249,20 @@ class BetweennessMutateResult(BaseModel): centrality_distribution: dict[str, Any] configuration: dict[str, Any] - def __getitem__(self, item: str) -> Any: - return getattr(self, item) - -class BetweennessStatsResult(BaseModel): +class BetweennessStatsResult(BaseResult): """Result of running Betweenness Centrality algorithm with stats mode.""" - model_config = ConfigDict(alias_generator=to_camel) - centrality_distribution: dict[str, Any] pre_processing_millis: int compute_millis: int post_processing_millis: int configuration: dict[str, Any] - def __getitem__(self, item: str) -> Any: - return getattr(self, item) - -class BetweennessWriteResult(BaseModel): +class BetweennessWriteResult(BaseResult): """Result of running Betweenness Centrality algorithm with write mode.""" - model_config = ConfigDict(alias_generator=to_camel) - node_properties_written: int pre_processing_millis: int compute_millis: int @@ -288,6 +270,3 @@ class BetweennessWriteResult(BaseModel): write_millis: int centrality_distribution: dict[str, Any] configuration: dict[str, Any] - - def __getitem__(self, item: str) -> Any: - return getattr(self, item) diff --git a/graphdatascience/procedure_surface/api/celf_endpoints.py b/graphdatascience/procedure_surface/api/celf_endpoints.py new file mode 100644 index 000000000..fc15a5d59 --- /dev/null +++ b/graphdatascience/procedure_surface/api/celf_endpoints.py @@ -0,0 +1,352 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any, List, Optional, Union + +from pandas import DataFrame + +from ...graph.graph_object import Graph +from .base_result import BaseResult +from .estimation_result import EstimationResult + + +class CelfEndpoints(ABC): + """ + Abstract base class defining the API for the Cost Effective Lazy Forward (CELF) algorithm. + + The CELF algorithm aims to find k nodes that maximize the expected spread of influence + in the network. It's an optimization of the greedy algorithm for influence maximization + that uses lazy evaluation to improve performance while maintaining the same approximation + guarantees. + + The algorithm works by iteratively selecting nodes that provide the maximum marginal + influence gain, using a lazy evaluation strategy to avoid redundant computations. + """ + + @abstractmethod + def mutate( + self, + G: Graph, + seed_set_size: int, + mutate_property: str, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + ) -> CelfMutateResult: + """ + Executes the CELF algorithm and writes the results to the in-memory graph as node properties. 
+ + Parameters + ---------- + G : Graph + The graph to run the algorithm on + seed_set_size : int + The number of nodes to select as the seed set for influence maximization + mutate_property : str + The property name to store the influence spread value for each selected node + propagation_probability : Optional[float], default=None + The probability that influence spreads from one node to another. + If not specified, uses the default value from the algorithm configuration. + monte_carlo_simulations : Optional[int], default=None + The number of Monte Carlo simulations used to estimate influence spread. + Higher values provide more accurate estimates but increase computation time. + random_seed : Optional[Any], default=None + Random seed for reproducible results. If not specified, uses a random seed. + relationship_types : Optional[List[str]], default=None + The relationship types used to select relationships for this algorithm run. + If not specified, all relationship types are considered. + node_labels : Optional[List[str]], default=None + The node labels used to select nodes for this algorithm run. + If not specified, all node labels are considered. + sudo : Optional[bool], default=None + Override memory estimation limits. Use with caution as this can lead to + memory issues if the estimation is significantly wrong. + log_progress : Optional[bool], default=None + Whether to log progress of the algorithm execution + username : Optional[str], default=None + The username to attribute the procedure run to for auditing purposes + concurrency : Optional[Any], default=None + The number of concurrent threads used for the algorithm execution. + If not specified, uses the default concurrency level. + job_id : Optional[Any], default=None + An identifier for the job that can be used for monitoring and cancellation + + Returns + ------- + CelfMutateResult + Algorithm metrics and statistics including the total influence spread + """ + pass + + @abstractmethod + def stats( + self, + G: Graph, + seed_set_size: int, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + ) -> CelfStatsResult: + """ + Executes the CELF algorithm and returns statistics without writing the result to Neo4j. + + Parameters + ---------- + G : Graph + The graph to run the algorithm on + seed_set_size : int + The number of nodes to select as the seed set for influence maximization + propagation_probability : Optional[float], default=None + The probability that influence spreads from one node to another. + If not specified, uses the default value from the algorithm configuration. + monte_carlo_simulations : Optional[int], default=None + The number of Monte Carlo simulations used to estimate influence spread. + Higher values provide more accurate estimates but increase computation time. + random_seed : Optional[Any], default=None + Random seed for reproducible results. If not specified, uses a random seed. + relationship_types : Optional[List[str]], default=None + The relationship types used to select relationships for this algorithm run. + If not specified, all relationship types are considered. 
+ node_labels : Optional[List[str]], default=None + The node labels used to select nodes for this algorithm run. + If not specified, all node labels are considered. + sudo : Optional[bool], default=None + Override memory estimation limits. Use with caution as this can lead to + memory issues if the estimation is significantly wrong. + log_progress : Optional[bool], default=None + Whether to log progress of the algorithm execution + username : Optional[str], default=None + The username to attribute the procedure run to for auditing purposes + concurrency : Optional[Any], default=None + The number of concurrent threads used for the algorithm execution. + If not specified, uses the default concurrency level. + job_id : Optional[Any], default=None + An identifier for the job that can be used for monitoring and cancellation + + Returns + ------- + CelfStatsResult + Algorithm statistics including the total influence spread + """ + pass + + @abstractmethod + def stream( + self, + G: Graph, + seed_set_size: int, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + ) -> DataFrame: + """ + Executes the CELF algorithm and returns a stream of results. + + Parameters + ---------- + G : Graph + The graph to run the algorithm on + seed_set_size : int + The number of nodes to select as the seed set for influence maximization + propagation_probability : Optional[float], default=None + The probability that influence spreads from one node to another. + If not specified, uses the default value from the algorithm configuration. + monte_carlo_simulations : Optional[int], default=None + The number of Monte Carlo simulations used to estimate influence spread. + Higher values provide more accurate estimates but increase computation time. + random_seed : Optional[Any], default=None + Random seed for reproducible results. If not specified, uses a random seed. + relationship_types : Optional[List[str]], default=None + The relationship types used to select relationships for this algorithm run. + If not specified, all relationship types are considered. + node_labels : Optional[List[str]], default=None + The node labels used to select nodes for this algorithm run. + If not specified, all node labels are considered. + sudo : Optional[bool], default=None + Override memory estimation limits. Use with caution as this can lead to + memory issues if the estimation is significantly wrong. + log_progress : Optional[bool], default=None + Whether to log progress of the algorithm execution + username : Optional[str], default=None + The username to attribute the procedure run to for auditing purposes + concurrency : Optional[Any], default=None + The number of concurrent threads used for the algorithm execution. + If not specified, uses the default concurrency level. + job_id : Optional[Any], default=None + An identifier for the job that can be used for monitoring and cancellation + + Returns + ------- + DataFrame + DataFrame with nodeId and spread columns containing CELF results. + Each row represents a selected node with its corresponding influence spread value. 
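+
+        Examples
+        --------
+        A minimal, illustrative sketch (assumes an existing ``endpoints`` instance,
+        for example a ``CelfArrowEndpoints``, and an already projected graph ``G``;
+        these names are placeholders, not part of this API)::
+
+            >>> influencers = endpoints.stream(G, seed_set_size=3)
+            >>> influencers.sort_values("spread", ascending=False).head()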
+ """ + pass + + @abstractmethod + def write( + self, + G: Graph, + seed_set_size: int, + write_property: str, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + write_concurrency: Optional[Any] = None, + ) -> CelfWriteResult: + """ + Executes the CELF algorithm and writes the results to the Neo4j database. + + Parameters + ---------- + G : Graph + The graph to run the algorithm on + seed_set_size : int + The number of nodes to select as the seed set for influence maximization + write_property : str + The property name to store the influence spread value for each selected node in the database + propagation_probability : Optional[float], default=None + The probability that influence spreads from one node to another. + If not specified, uses the default value from the algorithm configuration. + monte_carlo_simulations : Optional[int], default=None + The number of Monte Carlo simulations used to estimate influence spread. + Higher values provide more accurate estimates but increase computation time. + random_seed : Optional[Any], default=None + Random seed for reproducible results. If not specified, uses a random seed. + relationship_types : Optional[List[str]], default=None + The relationship types used to select relationships for this algorithm run. + If not specified, all relationship types are considered. + node_labels : Optional[List[str]], default=None + The node labels used to select nodes for this algorithm run. + If not specified, all node labels are considered. + sudo : Optional[bool], default=None + Override memory estimation limits. Use with caution as this can lead to + memory issues if the estimation is significantly wrong. + log_progress : Optional[bool], default=None + Whether to log progress of the algorithm execution + username : Optional[str], default=None + The username to attribute the procedure run to for auditing purposes + concurrency : Optional[Any], default=None + The number of concurrent threads used for the algorithm execution. + If not specified, uses the default concurrency level. + job_id : Optional[Any], default=None + An identifier for the job that can be used for monitoring and cancellation + write_concurrency : Optional[Any], default=None + The number of concurrent threads used during the write phase. + If not specified, uses the same value as concurrency. + + Returns + ------- + CelfWriteResult + Algorithm metrics and statistics including the total influence spread and write timing + """ + pass + + # TODO add for params for estimate in other endpoints as well (missed before) + @abstractmethod + def estimate( + self, + G: Union[Graph, dict[str, Any]], + seed_set_size: int, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + concurrency: Optional[Any] = None, + ) -> EstimationResult: + """ + Estimate the memory consumption of the CELF algorithm. + + This method provides an estimate of the memory requirements for running the algorithm + on a given graph, helping with capacity planning and resource allocation. 
+ + Parameters + ---------- + G : Union[Graph, dict[str, Any]] + The graph to be used in the estimation. Provided either as a Graph object or a configuration dictionary for the projection. + seed_set_size : int + The number of nodes to select as the seed set for influence maximization. + propagation_probability : Optional[float], default=None + The probability that influence spreads from one node to another. + If not specified, uses the default value from the algorithm configuration. + monte_carlo_simulations : Optional[int], default=None + The number of Monte Carlo simulations used to estimate influence spread. + Higher values provide more accurate estimates but increase computation time. + random_seed : Optional[Any], default=None + Random seed for reproducible results. If not specified, uses a random seed. + relationship_types : Optional[List[str]], default=None + The relationship types used to select relationships for this algorithm run. + If not specified, all relationship types are considered. + node_labels : Optional[List[str]], default=None + The node labels used to select nodes for this algorithm run. + If not specified, all node labels are considered. + concurrency : Optional[Any], default=None + The number of concurrent threads used for the estimation. + If not specified, uses the default concurrency level. + + Returns + ------- + EstimationResult + An object containing the result of the estimation including memory requirements + """ + pass + + +class CelfMutateResult(BaseResult): + """Result of running CELF algorithm with mutate mode.""" + + node_properties_written: int + mutate_millis: int + compute_millis: int + total_spread: float + node_count: int + configuration: dict[str, Any] + + +class CelfStatsResult(BaseResult): + """Result of running CELF algorithm with stats mode.""" + + compute_millis: int + total_spread: float + node_count: int + configuration: dict[str, Any] + + +class CelfWriteResult(BaseResult): + """Result of running CELF algorithm with write mode.""" + + node_properties_written: int + write_millis: int + compute_millis: int + total_spread: float + node_count: int + configuration: dict[str, Any] diff --git a/graphdatascience/procedure_surface/api/degree_endpoints.py b/graphdatascience/procedure_surface/api/degree_endpoints.py new file mode 100644 index 000000000..6d86fb519 --- /dev/null +++ b/graphdatascience/procedure_surface/api/degree_endpoints.py @@ -0,0 +1,309 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any, List, Optional, Union + +from pandas import DataFrame + +from ...graph.graph_object import Graph +from .base_result import BaseResult +from .estimation_result import EstimationResult + + +class DegreeEndpoints(ABC): + """ + Abstract base class defining the API for the Degree Centrality algorithm. + + Degree centrality measures the number of incoming and outgoing relationships from a node. + It's one of the simplest centrality measures, where a node's importance is determined by + the number of direct connections it has. 
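+
+    Examples
+    --------
+    A minimal, illustrative sketch (assumes an existing ``endpoints`` instance,
+    for example a ``DegreeArrowEndpoints``, and an already projected graph ``G``;
+    these names are placeholders, not part of this API)::
+
+        >>> scores = endpoints.stream(G)
+        >>> scores.nlargest(10, "score")  # columns: nodeId, score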
+ """ + + @abstractmethod + def mutate( + self, + G: Graph, + mutate_property: str, + orientation: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + relationship_weight_property: Optional[str] = None, + ) -> DegreeMutateResult: + """ + Executes the Degree Centrality algorithm and writes the results to the in-memory graph as node properties. + + Parameters + ---------- + G : Graph + The graph to run the algorithm on + mutate_property : str + The property name to store the degree centrality score for each node + orientation : Optional[Any], default=None + The orientation of relationships to consider. Can be 'NATURAL', 'REVERSE', or 'UNDIRECTED'. + 'NATURAL' (default) respects the direction of relationships as they are stored in the graph. + 'REVERSE' treats each relationship as if it were directed in the opposite direction. + 'UNDIRECTED' treats all relationships as undirected, effectively counting both directions. + relationship_types : Optional[List[str]], default=None + The relationship types used to select relationships for this algorithm run. + If not specified, all relationship types are considered. + node_labels : Optional[List[str]], default=None + The node labels used to select nodes for this algorithm run. + If not specified, all node labels are considered. + sudo : Optional[bool], default=None + Override memory estimation limits. Use with caution as this can lead to + memory issues if the estimation is significantly wrong. + log_progress : Optional[bool], default=None + Whether to log progress of the algorithm execution + username : Optional[str], default=None + The username to attribute the procedure run to for auditing purposes + concurrency : Optional[Any], default=None + The number of concurrent threads used for the algorithm execution. + If not specified, uses the default concurrency level. + job_id : Optional[Any], default=None + An identifier for the job that can be used for monitoring and cancellation + relationship_weight_property : Optional[str], default=None + The property name that contains relationship weights. If specified, + weighted degree centrality is computed where each relationship contributes + its weight to the total degree. + + Returns + ------- + DegreeMutateResult + Algorithm metrics and statistics including the centrality distribution + """ + pass + + @abstractmethod + def stats( + self, + G: Graph, + orientation: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + relationship_weight_property: Optional[str] = None, + ) -> DegreeStatsResult: + """ + Executes the Degree Centrality algorithm and returns statistics without writing the result to Neo4j. + + Parameters + ---------- + G : Graph + The graph to run the algorithm on + orientation : Optional[Any], default=None + The orientation of relationships to consider. Can be 'NATURAL', 'REVERSE', or 'UNDIRECTED'. + 'NATURAL' (default) respects the direction of relationships as they are stored in the graph. + 'REVERSE' treats each relationship as if it were directed in the opposite direction. 
+ 'UNDIRECTED' treats all relationships as undirected, effectively counting both directions. + relationship_types : Optional[List[str]], default=None + The relationship types used to select relationships for this algorithm run. + If not specified, all relationship types are considered. + node_labels : Optional[List[str]], default=None + The node labels used to select nodes for this algorithm run. + If not specified, all node labels are considered. + sudo : Optional[bool], default=None + Override memory estimation limits. Use with caution as this can lead to + memory issues if the estimation is significantly wrong. + log_progress : Optional[bool], default=None + Whether to log progress of the algorithm execution + username : Optional[str], default=None + The username to attribute the procedure run to for auditing purposes + concurrency : Optional[Any], default=None + The number of concurrent threads used for the algorithm execution. + If not specified, uses the default concurrency level. + job_id : Optional[Any], default=None + An identifier for the job that can be used for monitoring and cancellation + relationship_weight_property : Optional[str], default=None + The property name that contains relationship weights. If specified, + weighted degree centrality is computed where each relationship contributes + its weight to the total degree. + + Returns + ------- + DegreeStatsResult + Algorithm statistics including the centrality distribution + """ + pass + + @abstractmethod + def stream( + self, + G: Graph, + orientation: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + relationship_weight_property: Optional[str] = None, + ) -> DataFrame: + """ + Executes the Degree Centrality algorithm and returns a stream of results. + + Parameters + ---------- + G : Graph + The graph to run the algorithm on + orientation : Optional[Any], default=None + The orientation of relationships to consider. Can be 'NATURAL', 'REVERSE', or 'UNDIRECTED'. + 'NATURAL' (default) respects the direction of relationships as they are stored in the graph. + 'REVERSE' treats each relationship as if it were directed in the opposite direction. + 'UNDIRECTED' treats all relationships as undirected, effectively counting both directions. + relationship_types : Optional[List[str]], default=None + The relationship types used to select relationships for this algorithm run. + If not specified, all relationship types are considered. + node_labels : Optional[List[str]], default=None + The node labels used to select nodes for this algorithm run. + If not specified, all node labels are considered. + sudo : Optional[bool], default=None + Override memory estimation limits. Use with caution as this can lead to + memory issues if the estimation is significantly wrong. + log_progress : Optional[bool], default=None + Whether to log progress of the algorithm execution + username : Optional[str], default=None + The username to attribute the procedure run to for auditing purposes + concurrency : Optional[Any], default=None + The number of concurrent threads used for the algorithm execution. + If not specified, uses the default concurrency level. 
+ job_id : Optional[Any], default=None + An identifier for the job that can be used for monitoring and cancellation + relationship_weight_property : Optional[str], default=None + The property name that contains relationship weights. If specified, + weighted degree centrality is computed where each relationship contributes + its weight to the total degree. + + Returns + ------- + DataFrame + DataFrame with nodeId and score columns containing degree centrality results. + Each row represents a node with its corresponding degree centrality score. + """ + pass + + @abstractmethod + def write( + self, + G: Graph, + write_property: str, + orientation: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + relationship_weight_property: Optional[str] = None, + write_concurrency: Optional[Any] = None, + ) -> DegreeWriteResult: + """ + Executes the Degree Centrality algorithm and writes the results to the Neo4j database. + + Parameters + ---------- + G : Graph + The graph to run the algorithm on + write_property : str + The property name to store the degree centrality score for each node in the database + orientation : Optional[Any], default=None + The orientation of relationships to consider. Can be 'NATURAL', 'REVERSE', or 'UNDIRECTED'. + 'NATURAL' (default) respects the direction of relationships as they are stored in the graph. + 'REVERSE' treats each relationship as if it were directed in the opposite direction. + 'UNDIRECTED' treats all relationships as undirected, effectively counting both directions. + relationship_types : Optional[List[str]], default=None + The relationship types used to select relationships for this algorithm run. + If not specified, all relationship types are considered. + node_labels : Optional[List[str]], default=None + The node labels used to select nodes for this algorithm run. + If not specified, all node labels are considered. + sudo : Optional[bool], default=None + Override memory estimation limits. Use with caution as this can lead to + memory issues if the estimation is significantly wrong. + log_progress : Optional[bool], default=None + Whether to log progress of the algorithm execution + username : Optional[str], default=None + The username to attribute the procedure run to for auditing purposes + concurrency : Optional[Any], default=None + The number of concurrent threads used for the algorithm execution. + If not specified, uses the default concurrency level. + job_id : Optional[Any], default=None + An identifier for the job that can be used for monitoring and cancellation + relationship_weight_property : Optional[str], default=None + The property name that contains relationship weights. If specified, + weighted degree centrality is computed where each relationship contributes + its weight to the total degree. + write_concurrency : Optional[Any], default=None + The number of concurrent threads used during the write phase. + If not specified, uses the same value as concurrency. + + Returns + ------- + DegreeWriteResult + Algorithm metrics and statistics including the centrality distribution and write timing + """ + pass + + @abstractmethod + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + """ + Estimate the memory consumption of the Degree Centrality algorithm. 
+ + This method provides an estimate of the memory requirements for running the algorithm + on a given graph, helping with capacity planning and resource allocation. + + Parameters + ---------- + G : Union[Graph, dict[str, Any]] + The graph to run the algorithm on or a dictionary representing the graph. + + Returns + ------- + EstimationResult + An object containing the result of the estimation including memory requirements + """ + pass + + +class DegreeMutateResult(BaseResult): + """Result of running Degree Centrality algorithm with mutate mode.""" + + node_properties_written: int + pre_processing_millis: int + compute_millis: int + post_processing_millis: int + mutate_millis: int + centrality_distribution: dict[str, Any] + configuration: dict[str, Any] + + +class DegreeStatsResult(BaseResult): + """Result of running Degree Centrality algorithm with stats mode.""" + + centrality_distribution: dict[str, Any] + pre_processing_millis: int + compute_millis: int + post_processing_millis: int + configuration: dict[str, Any] + + +class DegreeWriteResult(BaseResult): + """Result of running Degree Centrality algorithm with write mode.""" + + node_properties_written: int + pre_processing_millis: int + compute_millis: int + post_processing_millis: int + write_millis: int + centrality_distribution: dict[str, Any] + configuration: dict[str, Any] diff --git a/graphdatascience/procedure_surface/api/k1coloring_endpoints.py b/graphdatascience/procedure_surface/api/k1coloring_endpoints.py index 21f3f1a99..57119980a 100644 --- a/graphdatascience/procedure_surface/api/k1coloring_endpoints.py +++ b/graphdatascience/procedure_surface/api/k1coloring_endpoints.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame from pydantic import BaseModel, ConfigDict @@ -209,8 +209,6 @@ def write( An identifier for the job write_concurrency : Optional[Any], default=None The number of concurrent threads during the write phase - write_to_result_store : Optional[bool], default=None - Whether to write to the result store min_community_size : Optional[int], default=None Only community ids of communities with a size greater than or equal to the given value are written to Neo4j @@ -222,20 +220,14 @@ def write( pass @abstractmethod - def estimate( - self, - G: Optional[Graph] = None, - projection_config: Optional[dict[str, Any]] = None, - ) -> EstimationResult: + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: """ Estimate the memory consumption of an algorithm run. Parameters ---------- - G : Optional[Graph], optional - The graph to be used in the estimation - projection_config : Optional[dict[str, Any]], optional - Configuration dictionary for the projection. + G : Union[Graph, dict[str, Any]] + The graph to run the algorithm on or a dictionary representing the graph. 
Returns ------- diff --git a/graphdatascience/procedure_surface/api/kcore_endpoints.py b/graphdatascience/procedure_surface/api/kcore_endpoints.py index 81b34f497..62b85de8e 100644 --- a/graphdatascience/procedure_surface/api/kcore_endpoints.py +++ b/graphdatascience/procedure_surface/api/kcore_endpoints.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame from pydantic import BaseModel, ConfigDict @@ -196,20 +196,14 @@ def write( pass @abstractmethod - def estimate( - self, - G: Optional[Graph] = None, - projection_config: Optional[dict[str, Any]] = None, - ) -> EstimationResult: + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: """ Estimate the memory consumption of an algorithm run. Parameters ---------- - G : Optional[Graph], optional - The graph to be used in the estimation - projection_config : Optional[dict[str, Any]], optional - Configuration dictionary for the projection. + G : Union[Graph, dict[str, Any]] + The graph to run the algorithm on or a dictionary representing the graph. Returns ------- diff --git a/graphdatascience/procedure_surface/api/louvain_endpoints.py b/graphdatascience/procedure_surface/api/louvain_endpoints.py index 1514cb36c..7cb526894 100644 --- a/graphdatascience/procedure_surface/api/louvain_endpoints.py +++ b/graphdatascience/procedure_surface/api/louvain_endpoints.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame from pydantic import BaseModel, ConfigDict @@ -269,8 +269,6 @@ def write( The property name that contains weight write_concurrency : Optional[Any], default=None The number of concurrent threads during the write phase - write_to_result_store : Optional[bool], default=None - Whether to write results to the result store min_community_size : Optional[int], default=None Don't write communities with fewer nodes than this @@ -282,20 +280,14 @@ def write( pass @abstractmethod - def estimate( - self, - G: Optional[Graph] = None, - projection_config: Optional[dict[str, Any]] = None, - ) -> EstimationResult: + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: """ Estimate the memory consumption of an algorithm run. Parameters ---------- - G : Optional[Graph], optional - The graph to be used in the estimation - projection_config : Optional[dict[str, Any]], optional - Configuration dictionary for the projection. + G : Union[Graph, dict[str, Any]] + The graph to run the algorithm on or a dictionary representing the graph. 
Returns ------- diff --git a/graphdatascience/procedure_surface/api/pagerank_endpoints.py b/graphdatascience/procedure_surface/api/pagerank_endpoints.py index 85dc361b5..8a4555d3c 100644 --- a/graphdatascience/procedure_surface/api/pagerank_endpoints.py +++ b/graphdatascience/procedure_surface/api/pagerank_endpoints.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame from pydantic import BaseModel, ConfigDict @@ -262,18 +262,14 @@ def write( pass @abstractmethod - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: """ Estimates the memory requirements for running the PageRank algorithm. Parameters ---------- - G : Optional[Graph], default=None - The graph to estimate for - projection_config : Optional[dict[str, Any]], default=None - Configuration for graph projection + G : Union[Graph, dict[str, Any]] + The graph to run the algorithm on or a dictionary representing the graph. Returns ------- diff --git a/graphdatascience/procedure_surface/api/scc_endpoints.py b/graphdatascience/procedure_surface/api/scc_endpoints.py index 6b19889d4..fe3ccc5f9 100644 --- a/graphdatascience/procedure_surface/api/scc_endpoints.py +++ b/graphdatascience/procedure_surface/api/scc_endpoints.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame from pydantic import BaseModel, ConfigDict @@ -193,8 +193,6 @@ def write( Flag to decide whether component identifiers are mapped into a consecutive id space write_concurrency : Optional[Any], default=None The number of concurrent threads during the write phase - write_to_result_store : Optional[bool], default=None - Whether to write to the result store Returns ------- @@ -204,20 +202,14 @@ def write( pass @abstractmethod - def estimate( - self, - G: Optional[Graph] = None, - projection_config: Optional[dict[str, Any]] = None, - ) -> EstimationResult: + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: """ Estimate the memory consumption of an algorithm run. Parameters ---------- - G : Optional[Graph], optional - The graph to be used in the estimation - projection_config : Optional[dict[str, Any]], optional - Configuration dictionary for the projection. + G : Union[Graph, dict[str, Any]] + The graph to run the algorithm on or a dictionary representing the graph. 
Returns ------- diff --git a/graphdatascience/procedure_surface/api/wcc_endpoints.py b/graphdatascience/procedure_surface/api/wcc_endpoints.py index c88d3a625..c9bdc04f3 100644 --- a/graphdatascience/procedure_surface/api/wcc_endpoints.py +++ b/graphdatascience/procedure_surface/api/wcc_endpoints.py @@ -1,7 +1,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame from pydantic import BaseModel, ConfigDict @@ -244,20 +244,14 @@ def write( pass @abstractmethod - def estimate( - self, - G: Optional[Graph] = None, - projection_config: Optional[dict[str, Any]] = None, - ) -> EstimationResult: + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: """ Estimate the memory consumption of an algorithm run. Parameters ---------- - G : Optional[Graph], optional - The graph to be used in the estimation - projection_config : Optional[dict[str, Any]], optional - Configuration dictionary for the projection. + G : Union[Graph, dict[str, Any]] + The graph to run the algorithm on or a dictionary representing the graph. Returns ------- diff --git a/graphdatascience/procedure_surface/arrow/articlerank_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/articlerank_arrow_endpoints.py index 0a386b1ba..e48b162ec 100644 --- a/graphdatascience/procedure_surface/arrow/articlerank_arrow_endpoints.py +++ b/graphdatascience/procedure_surface/arrow/articlerank_arrow_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -174,7 +174,5 @@ def write( return ArticleRankWriteResult(**result) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - return self._node_property_endpoints.estimate("v2/centrality.articleRank.estimate", G, projection_config) + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return self._node_property_endpoints.estimate("v2/centrality.articleRank.estimate", G) diff --git a/graphdatascience/procedure_surface/arrow/articulationpoints_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/articulationpoints_arrow_endpoints.py index 063227c28..3bf5f05c2 100644 --- a/graphdatascience/procedure_surface/arrow/articulationpoints_arrow_endpoints.py +++ b/graphdatascience/procedure_surface/arrow/articulationpoints_arrow_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -107,7 +107,6 @@ def write( concurrency: Optional[Any] = None, job_id: Optional[Any] = None, write_concurrency: Optional[Any] = None, - write_to_result_store: Optional[bool] = None, ) -> ArticulationPointsWriteResult: config = self._node_property_endpoints.create_base_config( G, @@ -118,7 +117,6 @@ def write( relationship_types=relationship_types, sudo=sudo, username=username, - write_to_result_store=write_to_result_store, ) result = self._node_property_endpoints.run_job_and_write( @@ -128,6 +126,10 @@ def write( return ArticulationPointsWriteResult(**result) def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None + self, + G: Union[Graph, dict[str, Any]], + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + concurrency: Optional[Any] = None, ) -> EstimationResult: - return 
self._node_property_endpoints.estimate("v2/centrality.articulationPoints.estimate", G, projection_config) + return self._node_property_endpoints.estimate("v2/centrality.articulationPoints.estimate", G) diff --git a/graphdatascience/procedure_surface/arrow/betweenness_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/betweenness_arrow_endpoints.py index 87ffaacfd..a7fc499aa 100644 --- a/graphdatascience/procedure_surface/arrow/betweenness_arrow_endpoints.py +++ b/graphdatascience/procedure_surface/arrow/betweenness_arrow_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -135,7 +135,6 @@ def write( job_id: Optional[Any] = None, relationship_weight_property: Optional[str] = None, write_concurrency: Optional[Any] = None, - write_to_result_store: Optional[bool] = None, ) -> BetweennessWriteResult: config = self._node_property_endpoints.create_base_config( G, @@ -149,7 +148,6 @@ def write( sampling_seed=sampling_seed, sudo=sudo, username=username, - write_to_result_store=write_to_result_store, ) result = self._node_property_endpoints.run_job_and_write( @@ -158,7 +156,5 @@ def write( return BetweennessWriteResult(**result) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - return self._node_property_endpoints.estimate("v2/centrality.betweenness.estimate", G, projection_config) + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return self._node_property_endpoints.estimate("v2/centrality.betweenness.estimate", G) diff --git a/graphdatascience/procedure_surface/arrow/celf_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/celf_arrow_endpoints.py new file mode 100644 index 000000000..37bc23454 --- /dev/null +++ b/graphdatascience/procedure_surface/arrow/celf_arrow_endpoints.py @@ -0,0 +1,173 @@ +from typing import Any, List, Optional, Union + +from pandas import DataFrame + +from ...arrow_client.authenticated_flight_client import AuthenticatedArrowClient +from ...arrow_client.v2.write_back_client import WriteBackClient +from ...graph.graph_object import Graph +from ..api.celf_endpoints import CelfEndpoints, CelfMutateResult, CelfStatsResult, CelfWriteResult +from ..api.estimation_result import EstimationResult +from .node_property_endpoints import NodePropertyEndpoints + + +class CelfArrowEndpoints(CelfEndpoints): + def __init__(self, arrow_client: AuthenticatedArrowClient, write_back_client: Optional[WriteBackClient] = None): + self._node_property_endpoints = NodePropertyEndpoints(arrow_client, write_back_client) + + def mutate( + self, + G: Graph, + seed_set_size: int, + mutate_property: str, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[int] = None, + job_id: Optional[str] = None, + ) -> CelfMutateResult: + config = self._node_property_endpoints.create_base_config( + G, + concurrency=concurrency, + job_id=job_id, + log_progress=log_progress, + node_labels=node_labels, + relationship_types=relationship_types, + sudo=sudo, + seed_set_size=seed_set_size, + propagation_probability=propagation_probability, + monte_carlo_simulations=monte_carlo_simulations, + 
random_seed=random_seed, + ) + + result = self._node_property_endpoints.run_job_and_mutate("v2/centrality.celf", G, config, mutate_property) + + return CelfMutateResult(**result) + + def stats( + self, + G: Graph, + seed_set_size: int, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[int] = None, + job_id: Optional[str] = None, + ) -> CelfStatsResult: + config = self._node_property_endpoints.create_base_config( + G, + concurrency=concurrency, + job_id=job_id, + log_progress=log_progress, + node_labels=node_labels, + relationship_types=relationship_types, + sudo=sudo, + seed_set_size=seed_set_size, + propagation_probability=propagation_probability, + monte_carlo_simulations=monte_carlo_simulations, + random_seed=random_seed, + ) + + computation_result = self._node_property_endpoints.run_job_and_get_summary("v2/centrality.celf", G, config) + + return CelfStatsResult(**computation_result) + + def stream( + self, + G: Graph, + seed_set_size: int, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[int] = None, + job_id: Optional[str] = None, + ) -> DataFrame: + config = self._node_property_endpoints.create_base_config( + G, + concurrency=concurrency, + job_id=job_id, + log_progress=log_progress, + node_labels=node_labels, + relationship_types=relationship_types, + sudo=sudo, + seed_set_size=seed_set_size, + propagation_probability=propagation_probability, + monte_carlo_simulations=monte_carlo_simulations, + random_seed=random_seed, + ) + + return self._node_property_endpoints.run_job_and_stream("v2/centrality.celf", G, config) + + def write( + self, + G: Graph, + seed_set_size: int, + write_property: str, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[int] = None, + job_id: Optional[str] = None, + write_concurrency: Optional[int] = None, + ) -> CelfWriteResult: + config = self._node_property_endpoints.create_base_config( + G, + concurrency=concurrency, + job_id=job_id, + log_progress=log_progress, + node_labels=node_labels, + relationship_types=relationship_types, + sudo=sudo, + seed_set_size=seed_set_size, + propagation_probability=propagation_probability, + monte_carlo_simulations=monte_carlo_simulations, + random_seed=random_seed, + ) + + result = self._node_property_endpoints.run_job_and_write( + "v2/centrality.celf", G, config, write_concurrency=write_concurrency, concurrency=concurrency + ) + + return CelfWriteResult(**result) + + def estimate( + self, + G: Union[Graph, dict[str, Any]], + seed_set_size: int, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: 
Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + concurrency: Optional[Any] = None, + ) -> EstimationResult: + algo_config = self._node_property_endpoints.create_estimate_config( + seed_set_size=seed_set_size, + propagation_probability=propagation_probability, + monte_carlo_simulations=monte_carlo_simulations, + random_seed=random_seed, + relationship_types=relationship_types, + node_labels=node_labels, + concurrency=concurrency, + ) + + return self._node_property_endpoints.estimate("v2/centrality.celf.estimate", G, algo_config) diff --git a/graphdatascience/procedure_surface/arrow/degree_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/degree_arrow_endpoints.py new file mode 100644 index 000000000..ba257796e --- /dev/null +++ b/graphdatascience/procedure_surface/arrow/degree_arrow_endpoints.py @@ -0,0 +1,137 @@ +from typing import Any, List, Optional, Union + +from pandas import DataFrame + +from ...arrow_client.authenticated_flight_client import AuthenticatedArrowClient +from ...arrow_client.v2.write_back_client import WriteBackClient +from ...graph.graph_object import Graph +from ..api.degree_endpoints import DegreeEndpoints, DegreeMutateResult, DegreeStatsResult, DegreeWriteResult +from ..api.estimation_result import EstimationResult +from .node_property_endpoints import NodePropertyEndpoints + + +class DegreeArrowEndpoints(DegreeEndpoints): + def __init__(self, arrow_client: AuthenticatedArrowClient, write_back_client: Optional[WriteBackClient] = None): + self._node_property_endpoints = NodePropertyEndpoints(arrow_client, write_back_client) + + def mutate( + self, + G: Graph, + mutate_property: str, + orientation: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[int] = None, + job_id: Optional[str] = None, + relationship_weight_property: Optional[str] = None, + ) -> DegreeMutateResult: + config = self._node_property_endpoints.create_base_config( + G, + concurrency=concurrency, + job_id=job_id, + log_progress=log_progress, + node_labels=node_labels, + orientation=orientation, + relationship_types=relationship_types, + relationship_weight_property=relationship_weight_property, + sudo=sudo, + ) + + result = self._node_property_endpoints.run_job_and_mutate("v2/centrality.degree", G, config, mutate_property) + + return DegreeMutateResult(**result) + + def stats( + self, + G: Graph, + orientation: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[int] = None, + job_id: Optional[str] = None, + relationship_weight_property: Optional[str] = None, + ) -> DegreeStatsResult: + config = self._node_property_endpoints.create_base_config( + G, + concurrency=concurrency, + job_id=job_id, + log_progress=log_progress, + node_labels=node_labels, + orientation=orientation, + relationship_types=relationship_types, + relationship_weight_property=relationship_weight_property, + sudo=sudo, + ) + + computation_result = self._node_property_endpoints.run_job_and_get_summary("v2/centrality.degree", G, config) + + return DegreeStatsResult(**computation_result) + + def stream( + self, + G: Graph, + orientation: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: 
Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[int] = None, + job_id: Optional[str] = None, + relationship_weight_property: Optional[str] = None, + ) -> DataFrame: + config = self._node_property_endpoints.create_base_config( + G, + concurrency=concurrency, + job_id=job_id, + log_progress=log_progress, + node_labels=node_labels, + orientation=orientation, + relationship_types=relationship_types, + relationship_weight_property=relationship_weight_property, + sudo=sudo, + ) + + return self._node_property_endpoints.run_job_and_stream("v2/centrality.degree", G, config) + + def write( + self, + G: Graph, + write_property: str, + orientation: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[int] = None, + job_id: Optional[str] = None, + relationship_weight_property: Optional[str] = None, + write_concurrency: Optional[int] = None, + ) -> DegreeWriteResult: + config = self._node_property_endpoints.create_base_config( + G, + concurrency=concurrency, + job_id=job_id, + log_progress=log_progress, + node_labels=node_labels, + orientation=orientation, + relationship_types=relationship_types, + relationship_weight_property=relationship_weight_property, + sudo=sudo, + ) + + result = self._node_property_endpoints.run_job_and_write( + "v2/centrality.degree", G, config, write_concurrency, concurrency + ) + + return DegreeWriteResult(**result) + + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return self._node_property_endpoints.estimate("v2/centrality.degree.estimate", G) diff --git a/graphdatascience/procedure_surface/arrow/k1coloring_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/k1coloring_arrow_endpoints.py index e2652e82c..3ffe7dfc6 100644 --- a/graphdatascience/procedure_surface/arrow/k1coloring_arrow_endpoints.py +++ b/graphdatascience/procedure_surface/arrow/k1coloring_arrow_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -146,7 +146,5 @@ def write( return K1ColoringWriteResult(**result) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - return self._node_property_endpoints.estimate("v2/community.k1coloring.estimate", G, projection_config) + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return self._node_property_endpoints.estimate("v2/community.k1coloring.estimate", G) diff --git a/graphdatascience/procedure_surface/arrow/kcore_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/kcore_arrow_endpoints.py index 0cc763034..14ce6b60a 100644 --- a/graphdatascience/procedure_surface/arrow/kcore_arrow_endpoints.py +++ b/graphdatascience/procedure_surface/arrow/kcore_arrow_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -120,7 +120,5 @@ def write( ) return KCoreWriteResult(**result) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - return self._node_property_endpoints.estimate("v2/community.kcore.estimate", G, projection_config) + def estimate(self, G: Union[Graph, dict[str, 
Any]]) -> EstimationResult: + return self._node_property_endpoints.estimate("v2/community.kcore.estimate", G) diff --git a/graphdatascience/procedure_surface/arrow/louvain_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/louvain_arrow_endpoints.py index be7344f9c..27f3ddeb2 100644 --- a/graphdatascience/procedure_surface/arrow/louvain_arrow_endpoints.py +++ b/graphdatascience/procedure_surface/arrow/louvain_arrow_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -178,7 +178,5 @@ def write( return LouvainWriteResult(**result) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - return self._node_property_endpoints.estimate("v2/community.louvain.estimate", G, projection_config) + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return self._node_property_endpoints.estimate("v2/community.louvain.estimate", G) diff --git a/graphdatascience/procedure_surface/arrow/node_property_endpoints.py b/graphdatascience/procedure_surface/arrow/node_property_endpoints.py index 002624f73..f2002b3e2 100644 --- a/graphdatascience/procedure_surface/arrow/node_property_endpoints.py +++ b/graphdatascience/procedure_surface/arrow/node_property_endpoints.py @@ -1,5 +1,5 @@ import json -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union from pandas import DataFrame @@ -82,17 +82,26 @@ def create_base_config(self, G: Graph, **kwargs: Any) -> Dict[str, Any]: """Create base configuration with common parameters.""" return ConfigConverter.convert_to_gds_config(graph_name=G.name(), **kwargs) + def create_estimate_config(self, **kwargs: Any) -> Dict[str, Any]: + """Create configuration for estimation.""" + return ConfigConverter.convert_to_gds_config(**kwargs) + def estimate( - self, estimate_endpoint: str, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None + self, + estimate_endpoint: str, + G: Union[Graph, dict[str, Any]], + algo_config: Optional[dict[str, Any]] = None, ) -> EstimationResult: """Estimate memory requirements for the algorithm.""" - if G is not None: + if isinstance(G, Graph): payload = {"graphName": G.name()} - elif projection_config is not None: - payload = projection_config + elif isinstance(G, dict): + payload = G else: raise ValueError("Either graph_name or projection_config must be provided.") + payload.update(algo_config or {}) + res = self._arrow_client.do_action_with_retry(estimate_endpoint, json.dumps(payload).encode("utf-8")) return EstimationResult(**deserialize_single(res)) diff --git a/graphdatascience/procedure_surface/arrow/pagerank_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/pagerank_arrow_endpoints.py index eaa29deb0..d9ddcb4cc 100644 --- a/graphdatascience/procedure_surface/arrow/pagerank_arrow_endpoints.py +++ b/graphdatascience/procedure_surface/arrow/pagerank_arrow_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -164,7 +164,5 @@ def write( ) return PageRankWriteResult(**result) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - return self._node_property_endpoints.estimate("v2/centrality.pageRank.estimate", G, projection_config) + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return 
self._node_property_endpoints.estimate("v2/centrality.pageRank.estimate", G) diff --git a/graphdatascience/procedure_surface/arrow/scc_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/scc_arrow_endpoints.py index 127b19b8c..e3648132f 100644 --- a/graphdatascience/procedure_surface/arrow/scc_arrow_endpoints.py +++ b/graphdatascience/procedure_surface/arrow/scc_arrow_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -125,7 +125,5 @@ def write( return SccWriteResult(**result) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - return self._node_property_endpoints.estimate("v2/community.scc.estimate", G, projection_config) + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return self._node_property_endpoints.estimate("v2/community.scc.estimate", G) diff --git a/graphdatascience/procedure_surface/arrow/wcc_arrow_endpoints.py b/graphdatascience/procedure_surface/arrow/wcc_arrow_endpoints.py index 90190be1d..f066abb1f 100644 --- a/graphdatascience/procedure_surface/arrow/wcc_arrow_endpoints.py +++ b/graphdatascience/procedure_surface/arrow/wcc_arrow_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -153,7 +153,5 @@ def write( return WccWriteResult(**result) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - return self._node_property_endpoints.estimate("v2/community.wcc.estimate", G, projection_config) + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return self._node_property_endpoints.estimate("v2/community.wcc.estimate", G) diff --git a/graphdatascience/procedure_surface/cypher/articlerank_cypher_endpoints.py b/graphdatascience/procedure_surface/cypher/articlerank_cypher_endpoints.py index 8adca283c..838d1c8d4 100644 --- a/graphdatascience/procedure_surface/cypher/articlerank_cypher_endpoints.py +++ b/graphdatascience/procedure_surface/cypher/articlerank_cypher_endpoints.py @@ -1,7 +1,9 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame +from graphdatascience.procedure_surface.cypher.estimation_utils import estimate_algorithm + from ...call_parameters import CallParameters from ...graph.graph_object import Graph from ...query_runner.query_runner import QueryRunner @@ -192,23 +194,5 @@ def write( return ArticleRankWriteResult(**cypher_result.to_dict()) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - if G is None and projection_config is None: - raise ValueError("Either 'G' or 'projection_config' must be provided") - - if G is not None: - # Use graph name for estimation - params = CallParameters(graph_name=G.name(), config={}) - cypher_result = self._query_runner.call_procedure( - endpoint="gds.articleRank.stream.estimate", params=params - ).squeeze() - else: - # Use projection config for estimation - params = CallParameters(graph_name=projection_config, config={}) - cypher_result = self._query_runner.call_procedure( - endpoint="gds.articleRank.stream.estimate", params=params - ).squeeze() - - return EstimationResult(**cypher_result.to_dict()) + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return 
estimate_algorithm(endpoint="gds.articleRank.stats.estimate", query_runner=self._query_runner, G=G) diff --git a/graphdatascience/procedure_surface/cypher/articulationpoints_cypher_endpoints.py b/graphdatascience/procedure_surface/cypher/articulationpoints_cypher_endpoints.py index 8cab6fa16..0d5c397c5 100644 --- a/graphdatascience/procedure_surface/cypher/articulationpoints_cypher_endpoints.py +++ b/graphdatascience/procedure_surface/cypher/articulationpoints_cypher_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -126,7 +126,6 @@ def write( concurrency: Optional[Any] = None, job_id: Optional[Any] = None, write_concurrency: Optional[Any] = None, - write_to_result_store: Optional[bool] = None, ) -> ArticulationPointsWriteResult: config = ConfigConverter.convert_to_gds_config( write_property=write_property, @@ -138,7 +137,6 @@ def write( sudo=sudo, username=username, write_concurrency=write_concurrency, - write_to_result_store=write_to_result_store, ) # Run procedure and return results @@ -152,26 +150,21 @@ def write( return ArticulationPointsWriteResult(**cypher_result.to_dict()) def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None + self, + G: Union[Graph, dict[str, Any]], + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + concurrency: Optional[Any] = None, ) -> EstimationResult: - """ - Estimates the memory requirements for running the ArticulationPoints algorithm. - - Parameters - ---------- - G : Optional[Graph], default=None - The graph to estimate memory requirements for - projection_config : Optional[dict[str, Any]], default=None - Configuration for graph projection - - Returns - ------- - EstimationResult - Memory estimation results - """ + # Build algorithm configuration mirroring other algorithms (see CELF implementation) + algo_config = ConfigConverter.convert_to_gds_config( + relationship_types=relationship_types, + node_labels=node_labels, + concurrency=concurrency, + ) return estimate_algorithm( endpoint="gds.articulationPoints.stats.estimate", query_runner=self._query_runner, G=G, - projection_config=projection_config, + algo_config=algo_config, ) diff --git a/graphdatascience/procedure_surface/cypher/betweenness_cypher_endpoints.py b/graphdatascience/procedure_surface/cypher/betweenness_cypher_endpoints.py index af0970d55..43898fd35 100644 --- a/graphdatascience/procedure_surface/cypher/betweenness_cypher_endpoints.py +++ b/graphdatascience/procedure_surface/cypher/betweenness_cypher_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -143,7 +143,6 @@ def write( job_id: Optional[Any] = None, relationship_weight_property: Optional[str] = None, write_concurrency: Optional[Any] = None, - write_to_result_store: Optional[bool] = None, ) -> BetweennessWriteResult: config = ConfigConverter.convert_to_gds_config( write_property=write_property, @@ -158,7 +157,6 @@ def write( sudo=sudo, username=username, write_concurrency=write_concurrency, - write_to_result_store=write_to_result_store, ) params = CallParameters(graph_name=G.name(), config=config) @@ -168,12 +166,9 @@ def write( return BetweennessWriteResult(**result.to_dict()) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: + def estimate(self, G: Union[Graph, dict[str, 
Any]]) -> EstimationResult: return estimate_algorithm( endpoint="gds.betweenness.stats.estimate", query_runner=self._query_runner, G=G, - projection_config=projection_config, ) diff --git a/graphdatascience/procedure_surface/cypher/celf_cypher_endpoints.py b/graphdatascience/procedure_surface/cypher/celf_cypher_endpoints.py new file mode 100644 index 000000000..1b2d51a7c --- /dev/null +++ b/graphdatascience/procedure_surface/cypher/celf_cypher_endpoints.py @@ -0,0 +1,202 @@ +from typing import Any, List, Optional, Union + +from pandas import DataFrame + +from ...call_parameters import CallParameters +from ...graph.graph_object import Graph +from ...query_runner.query_runner import QueryRunner +from ..api.celf_endpoints import CelfEndpoints, CelfMutateResult, CelfStatsResult, CelfWriteResult +from ..api.estimation_result import EstimationResult +from ..utils.config_converter import ConfigConverter +from .estimation_utils import estimate_algorithm + + +class CelfCypherEndpoints(CelfEndpoints): + def __init__(self, query_runner: QueryRunner): + self._query_runner = query_runner + + def mutate( + self, + G: Graph, + seed_set_size: int, + mutate_property: str, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + ) -> CelfMutateResult: + config = ConfigConverter.convert_to_gds_config( + seedSetSize=seed_set_size, + mutateProperty=mutate_property, + propagationProbability=propagation_probability, + monteCarloSimulations=monte_carlo_simulations, + randomSeed=random_seed, + relationshipTypes=relationship_types, + nodeLabels=node_labels, + sudo=sudo, + logProgress=log_progress, + username=username, + concurrency=concurrency, + jobId=job_id, + ) + + params = CallParameters( + graph_name=G.name(), + config=config, + ) + params.ensure_job_id_in_config() + + result = self._query_runner.call_procedure( + endpoint="gds.influenceMaximization.celf.mutate", params=params + ).squeeze() + return CelfMutateResult(**result.to_dict()) + + def stats( + self, + G: Graph, + seed_set_size: int, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + ) -> CelfStatsResult: + config = ConfigConverter.convert_to_gds_config( + seedSetSize=seed_set_size, + propagationProbability=propagation_probability, + monteCarloSimulations=monte_carlo_simulations, + randomSeed=random_seed, + relationshipTypes=relationship_types, + nodeLabels=node_labels, + sudo=sudo, + logProgress=log_progress, + username=username, + concurrency=concurrency, + jobId=job_id, + ) + + params = CallParameters( + graph_name=G.name(), + config=config, + ) + params.ensure_job_id_in_config() + + result = self._query_runner.call_procedure( + endpoint="gds.influenceMaximization.celf.stats", params=params + ).squeeze() + return CelfStatsResult(**result.to_dict()) + + def stream( + self, + G: Graph, + seed_set_size: int, + propagation_probability: Optional[float] = None, + 
monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + ) -> DataFrame: + config = ConfigConverter.convert_to_gds_config( + seedSetSize=seed_set_size, + propagationProbability=propagation_probability, + monteCarloSimulations=monte_carlo_simulations, + randomSeed=random_seed, + relationshipTypes=relationship_types, + nodeLabels=node_labels, + sudo=sudo, + logProgress=log_progress, + username=username, + concurrency=concurrency, + jobId=job_id, + ) + + params = CallParameters( + graph_name=G.name(), + config=config, + ) + params.ensure_job_id_in_config() + + return self._query_runner.call_procedure(endpoint="gds.influenceMaximization.celf.stream", params=params) + + def write( + self, + G: Graph, + seed_set_size: int, + write_property: str, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + write_concurrency: Optional[Any] = None, + ) -> CelfWriteResult: + config = ConfigConverter.convert_to_gds_config( + seedSetSize=seed_set_size, + writeProperty=write_property, + propagationProbability=propagation_probability, + monteCarloSimulations=monte_carlo_simulations, + randomSeed=random_seed, + relationshipTypes=relationship_types, + nodeLabels=node_labels, + sudo=sudo, + logProgress=log_progress, + username=username, + concurrency=concurrency, + jobId=job_id, + writeConcurrency=write_concurrency, + ) + + params = CallParameters( + graph_name=G.name(), + config=config, + ) + params.ensure_job_id_in_config() + + result = self._query_runner.call_procedure( + endpoint="gds.influenceMaximization.celf.write", params=params + ).squeeze() + return CelfWriteResult(**result.to_dict()) + + def estimate( + self, + G: Union[Graph, dict[str, Any]], + seed_set_size: int, + propagation_probability: Optional[float] = None, + monte_carlo_simulations: Optional[int] = None, + random_seed: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + concurrency: Optional[Any] = None, + ) -> EstimationResult: + algo_config = ConfigConverter.convert_to_gds_config( + seedSetSize=seed_set_size, + propagationProbability=propagation_probability, + monteCarloSimulations=monte_carlo_simulations, + randomSeed=random_seed, + relationshipTypes=relationship_types, + nodeLabels=node_labels, + concurrency=concurrency, + ) + + return estimate_algorithm("gds.influenceMaximization.celf.stats.estimate", self._query_runner, G, algo_config) diff --git a/graphdatascience/procedure_surface/cypher/degree_cypher_endpoints.py b/graphdatascience/procedure_surface/cypher/degree_cypher_endpoints.py new file mode 100644 index 000000000..cfa5b1929 --- /dev/null +++ b/graphdatascience/procedure_surface/cypher/degree_cypher_endpoints.py @@ -0,0 +1,163 @@ +from typing import Any, List, Optional, Union + +from pandas import DataFrame + +from ...call_parameters import CallParameters +from ...graph.graph_object import Graph +from ...query_runner.query_runner 
import QueryRunner +from ..api.degree_endpoints import DegreeEndpoints, DegreeMutateResult, DegreeStatsResult, DegreeWriteResult +from ..api.estimation_result import EstimationResult +from ..utils.config_converter import ConfigConverter +from .estimation_utils import estimate_algorithm + + +class DegreeCypherEndpoints(DegreeEndpoints): + def __init__(self, query_runner: QueryRunner): + self._query_runner = query_runner + + def mutate( + self, + G: Graph, + mutate_property: str, + orientation: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + relationship_weight_property: Optional[str] = None, + ) -> DegreeMutateResult: + config = ConfigConverter.convert_to_gds_config( + mutateProperty=mutate_property, + orientation=orientation, + relationshipTypes=relationship_types, + nodeLabels=node_labels, + sudo=sudo, + logProgress=log_progress, + username=username, + concurrency=concurrency, + jobId=job_id, + relationshipWeightProperty=relationship_weight_property, + ) + + params = CallParameters( + graph_name=G.name(), + config=config, + ) + params.ensure_job_id_in_config() + + result = self._query_runner.call_procedure(endpoint="gds.degree.mutate", params=params).squeeze() + return DegreeMutateResult(**result.to_dict()) + + def stats( + self, + G: Graph, + orientation: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + relationship_weight_property: Optional[str] = None, + ) -> DegreeStatsResult: + config = ConfigConverter.convert_to_gds_config( + orientation=orientation, + relationshipTypes=relationship_types, + nodeLabels=node_labels, + sudo=sudo, + logProgress=log_progress, + username=username, + concurrency=concurrency, + jobId=job_id, + relationshipWeightProperty=relationship_weight_property, + ) + + params = CallParameters( + graph_name=G.name(), + config=config, + ) + params.ensure_job_id_in_config() + + result = self._query_runner.call_procedure(endpoint="gds.degree.stats", params=params).squeeze() + return DegreeStatsResult(**result.to_dict()) + + def stream( + self, + G: Graph, + orientation: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + relationship_weight_property: Optional[str] = None, + ) -> DataFrame: + config = ConfigConverter.convert_to_gds_config( + orientation=orientation, + relationshipTypes=relationship_types, + nodeLabels=node_labels, + sudo=sudo, + logProgress=log_progress, + username=username, + concurrency=concurrency, + jobId=job_id, + relationshipWeightProperty=relationship_weight_property, + ) + + params = CallParameters( + graph_name=G.name(), + config=config, + ) + params.ensure_job_id_in_config() + + return self._query_runner.call_procedure(endpoint="gds.degree.stream", params=params) + + def write( + self, + G: Graph, + write_property: str, + orientation: Optional[Any] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] 
= None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + relationship_weight_property: Optional[str] = None, + write_concurrency: Optional[Any] = None, + ) -> DegreeWriteResult: + config = ConfigConverter.convert_to_gds_config( + writeProperty=write_property, + orientation=orientation, + relationshipTypes=relationship_types, + nodeLabels=node_labels, + sudo=sudo, + logProgress=log_progress, + username=username, + concurrency=concurrency, + jobId=job_id, + relationshipWeightProperty=relationship_weight_property, + writeConcurrency=write_concurrency, + ) + + params = CallParameters( + graph_name=G.name(), + config=config, + ) + + result = self._query_runner.call_procedure(endpoint="gds.degree.write", params=params).squeeze() + return DegreeWriteResult(**result.to_dict()) + + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return estimate_algorithm( + endpoint="gds.degree.stats.estimate", + query_runner=self._query_runner, + G=G, + ) diff --git a/graphdatascience/procedure_surface/cypher/estimation_utils.py b/graphdatascience/procedure_surface/cypher/estimation_utils.py index 8063e3c7d..086495eeb 100644 --- a/graphdatascience/procedure_surface/cypher/estimation_utils.py +++ b/graphdatascience/procedure_surface/cypher/estimation_utils.py @@ -10,8 +10,8 @@ def estimate_algorithm( endpoint: str, query_runner: QueryRunner, - G: Optional[Graph] = None, - projection_config: Optional[dict[str, Any]] = None, + G: Union[Graph, dict[str, Any]], + algo_config: Optional[dict[str, Any]] = None, ) -> EstimationResult: """ Estimate the memory consumption of an algorithm run. @@ -29,6 +29,8 @@ def estimate_algorithm( The graph to be used in the estimation projection_config : Optional[dict[str, Any]], optional Configuration dictionary for the projection + algo_config : Optional[dict[str, Any]], optional + Additional algorithm-specific configuration parameters Returns ------- @@ -42,14 +44,14 @@ def estimate_algorithm( """ config: Union[dict[str, Any]] = OrderedDict() - if G is not None: + if isinstance(G, Graph): config["graphNameOrConfiguration"] = G.name() - elif projection_config is not None: - config["graphNameOrConfiguration"] = projection_config + elif isinstance(G, dict): + config["graphNameOrConfiguration"] = G else: - raise ValueError("Either graph_name or projection_config must be provided.") + raise ValueError(f"G must be either a Graph instance or a configuration dictionary. 
But was {type(G)}.") - config["algoConfig"] = {} + config["algoConfig"] = algo_config or {} params = CallParameters(**config) diff --git a/graphdatascience/procedure_surface/cypher/k1coloring_cypher_endpoints.py b/graphdatascience/procedure_surface/cypher/k1coloring_cypher_endpoints.py index 163585bac..23c28b156 100644 --- a/graphdatascience/procedure_surface/cypher/k1coloring_cypher_endpoints.py +++ b/graphdatascience/procedure_surface/cypher/k1coloring_cypher_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -162,12 +162,5 @@ def write( return K1ColoringWriteResult(**result.to_dict()) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - return estimate_algorithm( - endpoint="gds.k1coloring.stats.estimate", - query_runner=self._query_runner, - G=G, - projection_config=projection_config, - ) + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return estimate_algorithm(endpoint="gds.k1coloring.stats.estimate", query_runner=self._query_runner, G=G) diff --git a/graphdatascience/procedure_surface/cypher/kcore_cypher_endpoints.py b/graphdatascience/procedure_surface/cypher/kcore_cypher_endpoints.py index 4e0c5cdcc..0f55b5e07 100644 --- a/graphdatascience/procedure_surface/cypher/kcore_cypher_endpoints.py +++ b/graphdatascience/procedure_surface/cypher/kcore_cypher_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -141,12 +141,5 @@ def write( return KCoreWriteResult(**result.to_dict()) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - return estimate_algorithm( - endpoint="gds.kcore.stats.estimate", - query_runner=self._query_runner, - G=G, - projection_config=projection_config, - ) + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return estimate_algorithm(endpoint="gds.kcore.stats.estimate", query_runner=self._query_runner, G=G) diff --git a/graphdatascience/procedure_surface/cypher/louvain_cypher_endpoints.py b/graphdatascience/procedure_surface/cypher/louvain_cypher_endpoints.py index d9796c408..a178aacb3 100644 --- a/graphdatascience/procedure_surface/cypher/louvain_cypher_endpoints.py +++ b/graphdatascience/procedure_surface/cypher/louvain_cypher_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -197,12 +197,5 @@ def write( return LouvainWriteResult(**result.to_dict()) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - return estimate_algorithm( - endpoint="gds.louvain.stats.estimate", - query_runner=self._query_runner, - G=G, - projection_config=projection_config, - ) + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return estimate_algorithm(endpoint="gds.louvain.stats.estimate", query_runner=self._query_runner, G=G) diff --git a/graphdatascience/procedure_surface/cypher/pagerank_cypher_endpoints.py b/graphdatascience/procedure_surface/cypher/pagerank_cypher_endpoints.py index 5f878b401..731888170 100644 --- a/graphdatascience/procedure_surface/cypher/pagerank_cypher_endpoints.py +++ b/graphdatascience/procedure_surface/cypher/pagerank_cypher_endpoints.py @@ -1,4 +1,3 @@ -from collections 
import OrderedDict from typing import Any, List, Optional, Union from pandas import DataFrame @@ -9,6 +8,7 @@ from ..api.estimation_result import EstimationResult from ..api.pagerank_endpoints import PageRankEndpoints, PageRankMutateResult, PageRankStatsResult, PageRankWriteResult from ..utils.config_converter import ConfigConverter +from .estimation_utils import estimate_algorithm class PageRankCypherEndpoints(PageRankEndpoints): @@ -189,22 +189,5 @@ def write( return PageRankWriteResult(**result.to_dict()) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - config: Union[dict[str, Any]] = OrderedDict() - - if G is not None: - config["graphNameOrConfiguration"] = G.name() - elif projection_config is not None: - config["graphNameOrConfiguration"] = projection_config - else: - raise ValueError("Either graph_name or projection_config must be provided.") - - config["algoConfig"] = {} - - params = CallParameters(**config) - - result = self._query_runner.call_procedure(endpoint="gds.pageRank.stats.estimate", params=params).squeeze() - - return EstimationResult(**result.to_dict()) + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return estimate_algorithm(endpoint="gds.pageRank.stats.estimate", query_runner=self._query_runner, G=G) diff --git a/graphdatascience/procedure_surface/cypher/scc_cypher_endpoints.py b/graphdatascience/procedure_surface/cypher/scc_cypher_endpoints.py index 2b98ce6b0..cc2aa23d7 100644 --- a/graphdatascience/procedure_surface/cypher/scc_cypher_endpoints.py +++ b/graphdatascience/procedure_surface/cypher/scc_cypher_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -146,8 +146,7 @@ def write( return SccWriteResult(**result.to_dict()) def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None + self, + G: Union[Graph, dict[str, Any]], ) -> EstimationResult: - return estimate_algorithm( - endpoint="gds.scc.stats.estimate", query_runner=self._query_runner, G=G, projection_config=projection_config - ) + return estimate_algorithm(endpoint="gds.scc.stats.estimate", query_runner=self._query_runner, G=G) diff --git a/graphdatascience/procedure_surface/cypher/wcc_cypher_endpoints.py b/graphdatascience/procedure_surface/cypher/wcc_cypher_endpoints.py index 9e7f74bf9..badc73170 100644 --- a/graphdatascience/procedure_surface/cypher/wcc_cypher_endpoints.py +++ b/graphdatascience/procedure_surface/cypher/wcc_cypher_endpoints.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, List, Optional, Union from pandas import DataFrame @@ -173,9 +173,5 @@ def write( return WccWriteResult(**result.to_dict()) - def estimate( - self, G: Optional[Graph] = None, projection_config: Optional[dict[str, Any]] = None - ) -> EstimationResult: - return estimate_algorithm( - endpoint="gds.wcc.stats.estimate", query_runner=self._query_runner, G=G, projection_config=projection_config - ) + def estimate(self, G: Union[Graph, dict[str, Any]]) -> EstimationResult: + return estimate_algorithm(endpoint="gds.wcc.stats.estimate", query_runner=self._query_runner, G=G) diff --git a/graphdatascience/procedure_surface/utils/config_converter.py b/graphdatascience/procedure_surface/utils/config_converter.py index c6da60355..da33adcf1 100644 --- a/graphdatascience/procedure_surface/utils/config_converter.py +++ 
b/graphdatascience/procedure_surface/utils/config_converter.py @@ -16,6 +16,11 @@ def convert_to_gds_config(**kwargs: Optional[Any]) -> dict[str, Any]: def _convert_to_camel_case(name: str) -> str: """Convert a snake_case string to camelCase.""" parts = name.split("_") + + # skip if already converted + if len(parts) == 1: + return name + return "".join([word.capitalize() if i > 0 else word.lower() for i, word in enumerate(parts)]) @staticmethod diff --git a/graphdatascience/tests/integrationV2/procedure_surface/arrow/conftest.py b/graphdatascience/tests/integrationV2/procedure_surface/arrow/conftest.py index bb7ea42df..5fd6cfdb5 100644 --- a/graphdatascience/tests/integrationV2/procedure_surface/arrow/conftest.py +++ b/graphdatascience/tests/integrationV2/procedure_surface/arrow/conftest.py @@ -1,5 +1,5 @@ import os -import tempfile +from pathlib import Path from typing import Generator import pytest @@ -12,23 +12,22 @@ @pytest.fixture(scope="package") -def password_file() -> Generator[str, None, None]: +def password_dir(tmpdir_factory: pytest.TempdirFactory) -> Generator[Path, None, None]: """Create a temporary file and return its path.""" - temp_dir = tempfile.mkdtemp() - temp_file_path = os.path.join(temp_dir, "password") + tmp_dir = tmpdir_factory.mktemp("passwords") + temp_file_path = os.path.join(tmp_dir, "password") with open(temp_file_path, "w") as f: f.write("password") - yield temp_dir + yield tmp_dir - # Clean up the file and directory + # Clean up the file os.unlink(temp_file_path) - os.rmdir(temp_dir) @pytest.fixture(scope="package") -def session_container(password_file: str) -> Generator[DockerContainer, None, None]: +def session_container(password_dir: Path) -> Generator[DockerContainer, None, None]: session_image = os.getenv("GDS_SESSION_IMAGE") if session_image is None: @@ -43,7 +42,7 @@ def session_container(password_file: str) -> Generator[DockerContainer, None, No .with_env("PAGE_CACHE_SIZE", "100M") .with_exposed_ports(8491) .with_network_aliases(["gds-session"]) - .with_volume_mapping(password_file, "/passwords") + .with_volume_mapping(password_dir, "/passwords") ) with session_container as session_container: diff --git a/graphdatascience/tests/integrationV2/procedure_surface/arrow/test_celf_arrow_endpoints.py b/graphdatascience/tests/integrationV2/procedure_surface/arrow/test_celf_arrow_endpoints.py new file mode 100644 index 000000000..2129fe637 --- /dev/null +++ b/graphdatascience/tests/integrationV2/procedure_surface/arrow/test_celf_arrow_endpoints.py @@ -0,0 +1,87 @@ +import json +from typing import Generator + +import pytest + +from graphdatascience import Graph +from graphdatascience.arrow_client.authenticated_flight_client import AuthenticatedArrowClient +from graphdatascience.procedure_surface.arrow.celf_arrow_endpoints import CelfArrowEndpoints +from graphdatascience.tests.integrationV2.procedure_surface.arrow.graph_creation_helper import create_graph + + +@pytest.fixture +def sample_graph(arrow_client: AuthenticatedArrowClient) -> Generator[Graph, None, None]: + gdl = """ + (a: Node {id: 0}) + (b: Node {id: 1}) + (c: Node {id: 2}) + (d: Node {id: 3}) + (e: Node {id: 4}) + (a)-[:REL]->(b) + (a)-[:REL]->(c) + (b)-[:REL]->(d) + (c)-[:REL]->(e) + (d)-[:REL]->(e) + """ + + yield create_graph(arrow_client, "g", gdl) + arrow_client.do_action("v2/graph.drop", json.dumps({"graphName": "g"}).encode("utf-8")) + + +@pytest.fixture +def celf_endpoints(arrow_client: AuthenticatedArrowClient) -> Generator[CelfArrowEndpoints, None, None]: + yield 
CelfArrowEndpoints(arrow_client) + + +def test_celf_stats(celf_endpoints: CelfArrowEndpoints, sample_graph: Graph) -> None: + """Test CELF stats operation.""" + result = celf_endpoints.stats(G=sample_graph, seed_set_size=2) + + assert result.compute_millis >= 0 + assert result.total_spread >= 0.0 + assert result.node_count == 5 + assert isinstance(result.configuration, dict) + + +def test_celf_stream(celf_endpoints: CelfArrowEndpoints, sample_graph: Graph) -> None: + """Test CELF stream operation.""" + result_df = celf_endpoints.stream(G=sample_graph, seed_set_size=2) + + assert set(result_df.columns) == {"nodeId", "spread"} + assert len(result_df) == 5 # same as node count + assert all(result_df["spread"] >= 0) + + +def test_celf_mutate(celf_endpoints: CelfArrowEndpoints, sample_graph: Graph) -> None: + """Test CELF mutate operation.""" + result = celf_endpoints.mutate(G=sample_graph, seed_set_size=2, mutate_property="celf_spread") + + assert result.node_properties_written == 5 # All nodes get properties (influence spread values) + assert result.compute_millis >= 0 + assert result.mutate_millis >= 0 + assert result.total_spread >= 0.0 + assert result.node_count == 5 + assert isinstance(result.configuration, dict) + + +def test_celf_write_without_write_back_client(celf_endpoints: CelfArrowEndpoints, sample_graph: Graph) -> None: + """Test CELF write operation raises exception when write_back_client is None.""" + with pytest.raises(Exception, match="Write back client is not initialized"): + celf_endpoints.write( + G=sample_graph, + seed_set_size=2, + write_property="celf_spread", + ) + + +def test_celf_estimate(celf_endpoints: CelfArrowEndpoints, sample_graph: Graph) -> None: + """Test CELF memory estimation.""" + result = celf_endpoints.estimate(G=sample_graph, seed_set_size=2) + + assert result.node_count == 5 + assert result.relationship_count >= 0 + assert "Bytes" in result.required_memory or "KiB" in result.required_memory or "MiB" in result.required_memory + assert result.bytes_min > 0 + assert result.bytes_max > 0 + assert result.heap_percentage_min > 0 + assert result.heap_percentage_max > 0 diff --git a/graphdatascience/tests/integrationV2/procedure_surface/arrow/test_degree_arrow_endpoints.py b/graphdatascience/tests/integrationV2/procedure_surface/arrow/test_degree_arrow_endpoints.py new file mode 100644 index 000000000..3894209b5 --- /dev/null +++ b/graphdatascience/tests/integrationV2/procedure_surface/arrow/test_degree_arrow_endpoints.py @@ -0,0 +1,74 @@ +import json +from typing import Generator + +import pytest + +from graphdatascience import Graph +from graphdatascience.arrow_client.authenticated_flight_client import AuthenticatedArrowClient +from graphdatascience.procedure_surface.arrow.degree_arrow_endpoints import DegreeArrowEndpoints +from graphdatascience.tests.integrationV2.procedure_surface.arrow.graph_creation_helper import create_graph + + +@pytest.fixture +def sample_graph(arrow_client: AuthenticatedArrowClient) -> Generator[Graph, None, None]: + gdl = """ + (a: Node {id: 0}) + (b: Node {id: 1}) + (c: Node {id: 2}) + (a)-[:REL]->(c) + (b)-[:REL]->(c) + """ + + yield create_graph(arrow_client, "g", gdl) + arrow_client.do_action("v2/graph.drop", json.dumps({"graphName": "g"}).encode("utf-8")) + + +@pytest.fixture +def degree_endpoints(arrow_client: AuthenticatedArrowClient) -> Generator[DegreeArrowEndpoints, None, None]: + yield DegreeArrowEndpoints(arrow_client) + + +def test_degree_stats(degree_endpoints: DegreeArrowEndpoints, sample_graph: Graph) -> None: + 
"""Test Degree stats operation.""" + result = degree_endpoints.stats(G=sample_graph) + + assert result.compute_millis >= 0 + assert result.pre_processing_millis >= 0 + assert result.post_processing_millis >= 0 + assert "p50" in result.centrality_distribution + assert isinstance(result.configuration, dict) + + +def test_degree_stream(degree_endpoints: DegreeArrowEndpoints, sample_graph: Graph) -> None: + """Test Degree stream operation.""" + result_df = degree_endpoints.stream(G=sample_graph) + + assert "nodeId" in result_df.columns + assert "score" in result_df.columns + assert len(result_df) == 3 # We have 3 nodes + assert all(result_df["score"] >= 0) # Degree scores should be non-negative + + +def test_degree_mutate(degree_endpoints: DegreeArrowEndpoints, sample_graph: Graph) -> None: + """Test Degree mutate operation.""" + result = degree_endpoints.mutate(G=sample_graph, mutate_property="degree") + + assert result.node_properties_written == 3 + assert result.compute_millis >= 0 + assert result.pre_processing_millis >= 0 + assert result.post_processing_millis >= 0 + assert result.mutate_millis >= 0 + assert "p50" in result.centrality_distribution + assert isinstance(result.configuration, dict) + + +def test_degree_estimate(degree_endpoints: DegreeArrowEndpoints, sample_graph: Graph) -> None: + result = degree_endpoints.estimate(sample_graph) + + assert result.node_count == 3 + assert result.relationship_count == 2 + assert "Bytes" in result.required_memory + assert result.bytes_min > 0 + assert result.bytes_max > 0 + assert result.heap_percentage_min > 0 + assert result.heap_percentage_max > 0 diff --git a/graphdatascience/tests/integrationV2/procedure_surface/cypher/test_celf_cypher_endpoints.py b/graphdatascience/tests/integrationV2/procedure_surface/cypher/test_celf_cypher_endpoints.py new file mode 100644 index 000000000..582404dcd --- /dev/null +++ b/graphdatascience/tests/integrationV2/procedure_surface/cypher/test_celf_cypher_endpoints.py @@ -0,0 +1,104 @@ +from typing import Generator + +import pytest + +from graphdatascience import Graph, QueryRunner +from graphdatascience.procedure_surface.cypher.celf_cypher_endpoints import CelfCypherEndpoints + + +@pytest.fixture +def sample_graph(query_runner: QueryRunner) -> Generator[Graph, None, None]: + create_statement = """ + CREATE + (a: Node {id: 0}), + (b: Node {id: 1}), + (c: Node {id: 2}), + (d: Node {id: 3}), + (e: Node {id: 4}), + (a)-[:REL]->(b), + (a)-[:REL]->(c), + (b)-[:REL]->(d), + (c)-[:REL]->(e), + (d)-[:REL]->(e) + """ + + query_runner.run_cypher(create_statement) + + query_runner.run_cypher(""" + MATCH (n) + OPTIONAL MATCH (n)-[r]->(m) + WITH gds.graph.project('g', n, m, {}) AS G + RETURN G + """) + + yield Graph("g", query_runner) + + query_runner.run_cypher("CALL gds.graph.drop('g')") + query_runner.run_cypher("MATCH (n) DETACH DELETE n") + + +@pytest.fixture +def celf_endpoints(query_runner: QueryRunner) -> Generator[CelfCypherEndpoints, None, None]: + yield CelfCypherEndpoints(query_runner) + + +def test_celf_stats(celf_endpoints: CelfCypherEndpoints, sample_graph: Graph) -> None: + """Test CELF stats operation.""" + result = celf_endpoints.stats(G=sample_graph, seed_set_size=2) + + assert result.compute_millis >= 0 + assert result.total_spread >= 0.0 + assert result.node_count == 5 + assert isinstance(result.configuration, dict) + assert result.configuration.get("seedSetSize") == 2 + + +def test_celf_stream(celf_endpoints: CelfCypherEndpoints, sample_graph: Graph) -> None: + """Test CELF stream operation.""" + 
result_df = celf_endpoints.stream(G=sample_graph, seed_set_size=2) + + assert "nodeId" in result_df.columns + assert "spread" in result_df.columns + assert len(result_df) == 2 # We requested 2 nodes in seed set + assert all(result_df["spread"] >= 0) # Spread values should be non-negative + + +def test_celf_mutate(celf_endpoints: CelfCypherEndpoints, sample_graph: Graph) -> None: + """Test CELF mutate operation.""" + result = celf_endpoints.mutate(G=sample_graph, seed_set_size=2, mutate_property="celf_spread") + + assert result.node_properties_written == 5 # All nodes get properties (influence spread values) + assert result.compute_millis >= 0 + assert result.mutate_millis >= 0 + assert result.total_spread >= 0.0 + assert result.node_count == 5 + assert isinstance(result.configuration, dict) + assert result.configuration.get("mutateProperty") == "celf_spread" + assert result.configuration.get("seedSetSize") == 2 + + +def test_celf_write(celf_endpoints: CelfCypherEndpoints, sample_graph: Graph) -> None: + """Test CELF write operation.""" + result = celf_endpoints.write(G=sample_graph, seed_set_size=2, write_property="celf_spread") + + assert result.node_properties_written == 5 # All nodes get properties (influence spread values) + assert result.compute_millis >= 0 + assert result.write_millis >= 0 + assert result.total_spread >= 0.0 + assert result.node_count == 5 + assert isinstance(result.configuration, dict) + assert result.configuration.get("writeProperty") == "celf_spread" + assert result.configuration.get("seedSetSize") == 2 + + +def test_celf_estimate(celf_endpoints: CelfCypherEndpoints, sample_graph: Graph) -> None: + """Test CELF memory estimation.""" + result = celf_endpoints.estimate(G=sample_graph, seed_set_size=2) + + assert result.node_count == 5 + assert result.relationship_count == 5 + assert "Bytes" in result.required_memory + assert result.bytes_min > 0 + assert result.bytes_max > 0 + assert result.heap_percentage_min > 0 + assert result.heap_percentage_max > 0 diff --git a/graphdatascience/tests/integrationV2/procedure_surface/cypher/test_degree_cypher_endpoints.py b/graphdatascience/tests/integrationV2/procedure_surface/cypher/test_degree_cypher_endpoints.py new file mode 100644 index 000000000..2cdd560a7 --- /dev/null +++ b/graphdatascience/tests/integrationV2/procedure_surface/cypher/test_degree_cypher_endpoints.py @@ -0,0 +1,97 @@ +from typing import Generator + +import pytest + +from graphdatascience import Graph, QueryRunner +from graphdatascience.procedure_surface.cypher.degree_cypher_endpoints import DegreeCypherEndpoints + + +@pytest.fixture +def sample_graph(query_runner: QueryRunner) -> Generator[Graph, None, None]: + create_statement = """ + CREATE + (a: Node {id: 0}), + (b: Node {id: 1}), + (c: Node {id: 2}), + (a)-[:REL]->(c), + (b)-[:REL]->(c) + """ + + query_runner.run_cypher(create_statement) + + query_runner.run_cypher(""" + MATCH (n) + OPTIONAL MATCH (n)-[r]->(m) + WITH gds.graph.project('g', n, m, {}) AS G + RETURN G + """) + + yield Graph("g", query_runner) + + query_runner.run_cypher("CALL gds.graph.drop('g')") + query_runner.run_cypher("MATCH (n) DETACH DELETE n") + + +@pytest.fixture +def degree_endpoints(query_runner: QueryRunner) -> Generator[DegreeCypherEndpoints, None, None]: + yield DegreeCypherEndpoints(query_runner) + + +def test_degree_stats(degree_endpoints: DegreeCypherEndpoints, sample_graph: Graph) -> None: + """Test Degree stats operation.""" + result = degree_endpoints.stats(G=sample_graph) + + assert result.compute_millis >= 0 + 
assert result.pre_processing_millis >= 0 + assert result.post_processing_millis >= 0 + assert "p50" in result.centrality_distribution + assert isinstance(result.configuration, dict) + + +def test_degree_stream(degree_endpoints: DegreeCypherEndpoints, sample_graph: Graph) -> None: + """Test Degree stream operation.""" + result = degree_endpoints.stream(G=sample_graph) + + assert len(result) == 3 # We have 3 nodes + assert "nodeId" in result.columns + assert "score" in result.columns + + +def test_degree_mutate(degree_endpoints: DegreeCypherEndpoints, sample_graph: Graph) -> None: + """Test Degree mutate operation.""" + result = degree_endpoints.mutate(G=sample_graph, mutate_property="degree") + + assert result.node_properties_written == 3 + assert result.compute_millis >= 0 + assert result.pre_processing_millis >= 0 + assert result.post_processing_millis >= 0 + assert result.mutate_millis >= 0 + assert "p50" in result.centrality_distribution + assert isinstance(result.configuration, dict) + assert result.configuration.get("mutateProperty") == "degree" + + +def test_degree_write(degree_endpoints: DegreeCypherEndpoints, sample_graph: Graph) -> None: + """Test Degree write operation.""" + result = degree_endpoints.write(G=sample_graph, write_property="degree") + + assert result.node_properties_written == 3 + assert result.compute_millis >= 0 + assert result.pre_processing_millis >= 0 + assert result.post_processing_millis >= 0 + assert result.write_millis >= 0 + assert "p50" in result.centrality_distribution + assert isinstance(result.configuration, dict) + assert result.configuration.get("writeProperty") == "degree" + + +def test_degree_estimate(degree_endpoints: DegreeCypherEndpoints, sample_graph: Graph) -> None: + result = degree_endpoints.estimate(sample_graph) + + assert result.node_count == 3 + assert result.relationship_count == 2 + assert "Bytes" in result.required_memory + assert result.bytes_min > 0 + assert result.bytes_max > 0 + assert result.heap_percentage_min > 0 + assert result.heap_percentage_max > 0 diff --git a/graphdatascience/tests/unit/procedure_surface/cypher/conftests.py b/graphdatascience/tests/unit/procedure_surface/cypher/conftests.py new file mode 100644 index 000000000..7350fc1c3 --- /dev/null +++ b/graphdatascience/tests/unit/procedure_surface/cypher/conftests.py @@ -0,0 +1,15 @@ +from typing import Any + + +def estimate_mock_result() -> dict[str, Any]: + return { + "nodeCount": 100, + "relationshipCount": 200, + "requiredMemory": "1024 Bytes", + "bytesMin": 1024, + "bytesMax": 2048, + "heapPercentageMin": 1.0, + "heapPercentageMax": 2.0, + "treeView": "1024 KiB", + "mapView": {}, + } diff --git a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_articlerank_cypher_endpoints.py b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_articlerank_cypher_endpoints.py index 96b76a4ff..c9b62b4f5 100644 --- a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_articlerank_cypher_endpoints.py +++ b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_articlerank_cypher_endpoints.py @@ -9,6 +9,7 @@ ) from graphdatascience.procedure_surface.cypher.articlerank_cypher_endpoints import ArticleRankCypherEndpoints from graphdatascience.tests.unit.conftest import DEFAULT_SERVER_VERSION, CollectingQueryRunner +from graphdatascience.tests.unit.procedure_surface.cypher.conftests import estimate_mock_result @pytest.fixture @@ -193,29 +194,12 @@ def test_write_basic(graph: Graph) -> None: def test_estimate_with_graph_name(graph: Graph) -> 
None: - result = { - "nodeCount": 100, - "relationshipCount": 200, - "requiredMemory": "1024 Bytes", - "bytesMin": 1024, - "bytesMax": 2048, - "heapPercentageMin": 1.0, - "heapPercentageMax": 2.0, - } - query_runner = CollectingQueryRunner( - DEFAULT_SERVER_VERSION, {"articleRank.stream.estimate": pd.DataFrame([result])} + DEFAULT_SERVER_VERSION, {"articleRank.stats.estimate": pd.DataFrame([estimate_mock_result()])} ) - ArticleRankCypherEndpoints(query_runner).estimate(graph) + result = ArticleRankCypherEndpoints(query_runner).estimate(graph) assert len(query_runner.queries) == 1 - assert "gds.articleRank.stream.estimate" in query_runner.queries[0] - - -def test_estimate_raises_value_error_when_no_arguments() -> None: - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {}) - articlerank_endpoints = ArticleRankCypherEndpoints(query_runner) - - with pytest.raises(ValueError, match="Either 'G' or 'projection_config' must be provided"): - articlerank_endpoints.estimate() + assert "gds.articleRank.stats.estimate" in query_runner.queries[0] + assert result.node_count == 100 diff --git a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_articulationpoints_cypher_endpoints.py b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_articulationpoints_cypher_endpoints.py index f30f469e6..1eb2a4f25 100644 --- a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_articulationpoints_cypher_endpoints.py +++ b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_articulationpoints_cypher_endpoints.py @@ -11,6 +11,7 @@ ArticulationPointsCypherEndpoints, ) from graphdatascience.tests.unit.conftest import DEFAULT_SERVER_VERSION, CollectingQueryRunner +from graphdatascience.tests.unit.procedure_surface.cypher.conftests import estimate_mock_result @pytest.fixture @@ -199,7 +200,6 @@ def test_write_with_optional_params( concurrency=4, job_id="test-job", write_concurrency=2, - write_to_result_store=True, ) assert len(query_runner.queries) == 1 @@ -216,5 +216,31 @@ def test_write_with_optional_params( "concurrency": 4, "jobId": "test-job", "writeConcurrency": 2, - "writeToResultStore": True, } + + +def test_estimate_with_graph_name(graph: Graph) -> None: + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"articulationPoints.stats.estimate": pd.DataFrame([estimate_mock_result()])} + ) + + ArticulationPointsCypherEndpoints(query_runner).estimate(graph) + + assert len(query_runner.queries) == 1 + assert "gds.articulationPoints.stats.estimate" in query_runner.queries[0] + + +def test_estimate_with_projection_config(query_runner: CollectingQueryRunner) -> None: + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"articulationPoints.stats.estimate": pd.DataFrame([estimate_mock_result()])} + ) + + projection_config = { + "nodeProjection": "*", + "relationshipProjection": "*", + } + + ArticulationPointsCypherEndpoints(query_runner).estimate(projection_config, relationship_types=["REL"]) + + assert len(query_runner.queries) == 1 + assert "gds.articulationPoints.stats.estimate" in query_runner.queries[0] diff --git a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_betweenness_cypher_endpoints.py b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_betweenness_cypher_endpoints.py index 9a5f1ef6a..f1bef917b 100644 --- a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_betweenness_cypher_endpoints.py +++ b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_betweenness_cypher_endpoints.py 
@@ -9,6 +9,7 @@ ) from graphdatascience.procedure_surface.cypher.betweenness_cypher_endpoints import BetweennessCypherEndpoints from graphdatascience.tests.unit.conftest import DEFAULT_SERVER_VERSION, CollectingQueryRunner +from graphdatascience.tests.unit.procedure_surface.cypher.conftests import estimate_mock_result @pytest.fixture @@ -293,7 +294,6 @@ def test_write_with_optional_params(graph: Graph) -> None: job_id="test-job", relationship_weight_property="weight", write_concurrency=4, - write_to_result_store=True, ) assert len(query_runner.queries) == 1 @@ -313,60 +313,31 @@ def test_write_with_optional_params(graph: Graph) -> None: "jobId": "test-job", "relationshipWeightProperty": "weight", "writeConcurrency": 4, - "writeToResultStore": True, } def test_estimate_with_graph_name(graph: Graph) -> None: - result = { - "requiredMemory": "1024 KiB", - "treeView": "1024 KiB", - "mapView": "1024 KiB", - "bytesMin": 1024, - "bytesMax": 2048, - "nodeCount": 100, - "relationshipCount": 200, - "heapPercentageMin": 0.1, - "heapPercentageMax": 0.2, - } - - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"betweenness.stats.estimate": pd.DataFrame([result])}) + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"betweenness.stats.estimate": pd.DataFrame([estimate_mock_result()])} + ) BetweennessCypherEndpoints(query_runner).estimate(graph) assert len(query_runner.queries) == 1 assert "gds.betweenness.stats.estimate" in query_runner.queries[0] params = query_runner.params[0] - assert params["graph_name"] == "test_graph" + assert params["graphNameOrConfiguration"] == "test_graph" def test_estimate_with_projection_config() -> None: - result = { - "requiredMemory": "1024 KiB", - "treeView": "1024 KiB", - "mapView": "1024 KiB", - "bytesMin": 1024, - "bytesMax": 2048, - "nodeCount": 100, - "relationshipCount": 200, - "heapPercentageMin": 0.1, - "heapPercentageMax": 0.2, - } - - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"betweenness.stats.estimate": pd.DataFrame([result])}) + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"betweenness.stats.estimate": pd.DataFrame([estimate_mock_result()])} + ) projection_config = {"nodeProjection": "Node", "relationshipProjection": "REL"} - BetweennessCypherEndpoints(query_runner).estimate(projection_config=projection_config) + BetweennessCypherEndpoints(query_runner).estimate(G=projection_config) assert len(query_runner.queries) == 1 assert "gds.betweenness.stats.estimate" in query_runner.queries[0] params = query_runner.params[0] - assert params["nodeProjection"] == "Node" - assert params["relationshipProjection"] == "REL" - - -def test_estimate_raises_value_error_when_no_arguments() -> None: - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION) - - with pytest.raises(ValueError): - BetweennessCypherEndpoints(query_runner).estimate() + assert params["graphNameOrConfiguration"] == projection_config diff --git a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_celf_cypher_endpoints.py b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_celf_cypher_endpoints.py new file mode 100644 index 000000000..bd588d81e --- /dev/null +++ b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_celf_cypher_endpoints.py @@ -0,0 +1,213 @@ +import pandas as pd +import pytest + +from graphdatascience.graph.graph_object import Graph +from graphdatascience.procedure_surface.api.celf_endpoints import ( + CelfMutateResult, + CelfStatsResult, + CelfWriteResult, +) +from 
graphdatascience.procedure_surface.cypher.celf_cypher_endpoints import CelfCypherEndpoints +from graphdatascience.tests.unit.conftest import DEFAULT_SERVER_VERSION, CollectingQueryRunner +from graphdatascience.tests.unit.procedure_surface.cypher.conftests import estimate_mock_result + + +@pytest.fixture +def query_runner() -> CollectingQueryRunner: + return CollectingQueryRunner(DEFAULT_SERVER_VERSION, {}) + + +@pytest.fixture +def celf_endpoints(query_runner: CollectingQueryRunner) -> CelfCypherEndpoints: + return CelfCypherEndpoints(query_runner) + + +@pytest.fixture +def graph(query_runner: CollectingQueryRunner) -> Graph: + return Graph("test_graph", query_runner) + + +def test_mutate_basic(graph: Graph) -> None: + result = { + "nodePropertiesWritten": 5, + "mutateMillis": 42, + "computeMillis": 20, + "totalSpread": 15.5, + "nodeCount": 100, + "configuration": {"bar": 1337}, + } + + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"influenceMaximization.celf.mutate": pd.DataFrame([result])} + ) + + result_obj = CelfCypherEndpoints(query_runner).mutate(graph, 5, "celf_influence") + + assert len(query_runner.queries) == 1 + assert "gds.influenceMaximization.celf.mutate" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + config = params["config"] + assert config["seedSetSize"] == 5 + assert config["mutateProperty"] == "celf_influence" + assert "jobId" in config + + assert isinstance(result_obj, CelfMutateResult) + assert result_obj.node_properties_written == 5 + assert result_obj.mutate_millis == 42 + assert result_obj.compute_millis == 20 + assert result_obj.total_spread == 15.5 + assert result_obj.node_count == 100 + assert result_obj.configuration == {"bar": 1337} + + +def test_mutate_with_optional_params(graph: Graph) -> None: + result = { + "nodePropertiesWritten": 3, + "mutateMillis": 35, + "computeMillis": 18, + "totalSpread": 12.3, + "nodeCount": 50, + "configuration": {"foo": 42}, + } + + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"influenceMaximization.celf.mutate": pd.DataFrame([result])} + ) + + CelfCypherEndpoints(query_runner).mutate( + graph, + 3, + "celf_influence", + propagation_probability=0.1, + monte_carlo_simulations=100, + random_seed=42, + relationship_types=["REL"], + node_labels=["Person"], + sudo=True, + log_progress=True, + username="neo4j", + concurrency=4, + job_id="test-job", + ) + + assert len(query_runner.queries) == 1 + assert "gds.influenceMaximization.celf.mutate" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + assert params["config"] == { + "seedSetSize": 3, + "mutateProperty": "celf_influence", + "propagationProbability": 0.1, + "monteCarloSimulations": 100, + "randomSeed": 42, + "relationshipTypes": ["REL"], + "nodeLabels": ["Person"], + "sudo": True, + "logProgress": True, + "username": "neo4j", + "concurrency": 4, + "jobId": "test-job", + } + + +def test_stats_basic(graph: Graph) -> None: + result = { + "computeMillis": 20, + "totalSpread": 15.5, + "nodeCount": 100, + "configuration": {"bar": 1337}, + } + + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"influenceMaximization.celf.stats": pd.DataFrame([result])} + ) + + result_obj = CelfCypherEndpoints(query_runner).stats(graph, 5) + + assert len(query_runner.queries) == 1 + assert "gds.influenceMaximization.celf.stats" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + 
config = params["config"]
+    assert config["seedSetSize"] == 5
+    assert "jobId" in config
+
+    assert isinstance(result_obj, CelfStatsResult)
+    assert result_obj.compute_millis == 20
+    assert result_obj.total_spread == 15.5
+    assert result_obj.node_count == 100
+    assert result_obj.configuration == {"bar": 1337}
+
+
+def test_stream_basic(celf_endpoints: CelfCypherEndpoints, graph: Graph, query_runner: CollectingQueryRunner) -> None:
+    celf_endpoints.stream(graph, 3)
+
+    assert len(query_runner.queries) == 1
+    assert "gds.influenceMaximization.celf.stream" in query_runner.queries[0]
+    params = query_runner.params[0]
+    assert params["graph_name"] == "test_graph"
+    config = params["config"]
+    assert config["seedSetSize"] == 3
+    assert "jobId" in config
+
+
+def test_write_basic(graph: Graph) -> None:
+    result = {
+        "nodePropertiesWritten": 5,
+        "writeMillis": 42,
+        "computeMillis": 20,
+        "totalSpread": 15.5,
+        "nodeCount": 100,
+        "configuration": {"bar": 1337},
+    }
+
+    query_runner = CollectingQueryRunner(
+        DEFAULT_SERVER_VERSION, {"influenceMaximization.celf.write": pd.DataFrame([result])}
+    )
+
+    result_obj = CelfCypherEndpoints(query_runner).write(graph, 5, "celf_influence")
+
+    assert len(query_runner.queries) == 1
+    assert "gds.influenceMaximization.celf.write" in query_runner.queries[0]
+    params = query_runner.params[0]
+    assert params["graph_name"] == "test_graph"
+    config = params["config"]
+    assert config["seedSetSize"] == 5
+    assert config["writeProperty"] == "celf_influence"
+    assert "jobId" in config
+
+    assert isinstance(result_obj, CelfWriteResult)
+    assert result_obj.node_properties_written == 5
+    assert result_obj.write_millis == 42
+    assert result_obj.compute_millis == 20
+    assert result_obj.total_spread == 15.5
+    assert result_obj.node_count == 100
+    assert result_obj.configuration == {"bar": 1337}
+
+
+def test_estimate_with_graph_name(graph: Graph) -> None:
+    query_runner = CollectingQueryRunner(
+        DEFAULT_SERVER_VERSION, {"influenceMaximization.celf.stats.estimate": pd.DataFrame([estimate_mock_result()])}
+    )
+
+    CelfCypherEndpoints(query_runner).estimate(graph, 5)
+
+    assert len(query_runner.queries) == 1
+    assert "gds.influenceMaximization.celf.stats.estimate" in query_runner.queries[0]
+
+
+def test_estimate_with_projection_config() -> None:
+    query_runner = CollectingQueryRunner(
+        DEFAULT_SERVER_VERSION, {"influenceMaximization.celf.stats.estimate": pd.DataFrame([estimate_mock_result()])}
+    )
+
+    projection_config = {
+        "nodeProjection": "*",
+        "relationshipProjection": "*",
+    }
+
+    CelfCypherEndpoints(query_runner).estimate(projection_config, 3, propagation_probability=0.1)
+
+    assert len(query_runner.queries) == 1
+    assert "gds.influenceMaximization.celf.stats.estimate" in query_runner.queries[0]
diff --git a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_degree_cypher_endpoints.py b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_degree_cypher_endpoints.py
new file mode 100644
index 000000000..26bfb1338
--- /dev/null
+++ b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_degree_cypher_endpoints.py
@@ -0,0 +1,233 @@
+import pandas as pd
+import pytest
+
+from graphdatascience.graph.graph_object import Graph
+from graphdatascience.procedure_surface.api.degree_endpoints import (
+    DegreeMutateResult,
+    DegreeStatsResult,
+    DegreeWriteResult,
+)
+from graphdatascience.procedure_surface.cypher.degree_cypher_endpoints import DegreeCypherEndpoints
+from graphdatascience.tests.unit.conftest 
import DEFAULT_SERVER_VERSION, CollectingQueryRunner +from graphdatascience.tests.unit.procedure_surface.cypher.conftests import estimate_mock_result + + +@pytest.fixture +def query_runner() -> CollectingQueryRunner: + return CollectingQueryRunner(DEFAULT_SERVER_VERSION) + + +@pytest.fixture +def degree_endpoints(query_runner: CollectingQueryRunner) -> DegreeCypherEndpoints: + return DegreeCypherEndpoints(query_runner) + + +@pytest.fixture +def graph(query_runner: CollectingQueryRunner) -> Graph: + return Graph("test_graph", query_runner) + + +def test_mutate_basic(graph: Graph) -> None: + result = { + "nodePropertiesWritten": 5, + "mutateMillis": 42, + "centralityDistribution": {"min": 1.0, "max": 5.0, "mean": 2.5}, + "preProcessingMillis": 10, + "computeMillis": 20, + "postProcessingMillis": 12, + "configuration": {"mutateProperty": "degree"}, + } + + query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"degree.mutate": pd.DataFrame([result])}) + + result_obj = DegreeCypherEndpoints(query_runner).mutate(graph, "degree") + + assert len(query_runner.queries) == 1 + assert "gds.degree.mutate" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + config = params["config"] + assert config["mutateProperty"] == "degree" + assert "jobId" in config + + assert isinstance(result_obj, DegreeMutateResult) + assert result_obj.node_properties_written == 5 + assert result_obj.mutate_millis == 42 + + +def test_mutate_with_optional_params(graph: Graph) -> None: + result = { + "nodePropertiesWritten": 3, + "mutateMillis": 25, + "centralityDistribution": {"min": 1.0, "max": 3.0, "mean": 2.0}, + "preProcessingMillis": 5, + "computeMillis": 15, + "postProcessingMillis": 5, + "configuration": {"mutateProperty": "degree", "orientation": "UNDIRECTED"}, + } + + query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"degree.mutate": pd.DataFrame([result])}) + + result_obj = DegreeCypherEndpoints(query_runner).mutate( + graph, + "degree", + orientation="UNDIRECTED", + relationship_types=["KNOWS"], + node_labels=["Person"], + concurrency=4, + log_progress=True, + relationship_weight_property="weight", + ) + + params = query_runner.params[0] + config = params["config"] + assert config["mutateProperty"] == "degree" + assert config["orientation"] == "UNDIRECTED" + assert config["relationshipTypes"] == ["KNOWS"] + assert config["nodeLabels"] == ["Person"] + assert config["concurrency"] == 4 + assert config["logProgress"] is True + assert config["relationshipWeightProperty"] == "weight" + + assert isinstance(result_obj, DegreeMutateResult) + assert result_obj.node_properties_written == 3 + assert result_obj.mutate_millis == 25 + + +def test_stats_basic(graph: Graph) -> None: + result = { + "centralityDistribution": {"min": 1.0, "max": 5.0, "mean": 2.5}, + "preProcessingMillis": 10, + "computeMillis": 20, + "postProcessingMillis": 12, + "configuration": {"orientation": "NATURAL"}, + } + + query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"degree.stats": pd.DataFrame([result])}) + + result_obj = DegreeCypherEndpoints(query_runner).stats(graph) + + assert len(query_runner.queries) == 1 + assert "gds.degree.stats" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + + assert isinstance(result_obj, DegreeStatsResult) + assert result_obj.centrality_distribution == {"min": 1.0, "max": 5.0, "mean": 2.5} + assert result_obj.pre_processing_millis == 10 + assert result_obj.compute_millis == 20 + 
assert result_obj.post_processing_millis == 12 + + +def test_stream_basic(graph: Graph) -> None: + result_df = pd.DataFrame({"nodeId": [0, 1, 2], "score": [2.0, 3.0, 1.0]}) + + query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"degree.stream": result_df}) + + result = DegreeCypherEndpoints(query_runner).stream(graph) + + assert len(query_runner.queries) == 1 + assert "gds.degree.stream" in query_runner.queries[0] + + assert result.equals(result_df) + + +def test_stream_with_optional_params(graph: Graph) -> None: + result_df = pd.DataFrame({"nodeId": [0, 1], "score": [4.0, 2.0]}) + + query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"degree.stream": result_df}) + + result = DegreeCypherEndpoints(query_runner).stream( + graph, + orientation="UNDIRECTED", + relationship_types=["CONNECTED"], + node_labels=["Node"], + concurrency=2, + relationship_weight_property="strength", + ) + + params = query_runner.params[0] + config = params["config"] + assert config["orientation"] == "UNDIRECTED" + assert config["relationshipTypes"] == ["CONNECTED"] + assert config["nodeLabels"] == ["Node"] + assert config["concurrency"] == 2 + assert config["relationshipWeightProperty"] == "strength" + + assert len(result) == 2 + + +def test_write_basic(graph: Graph) -> None: + result = { + "nodePropertiesWritten": 5, + "writeMillis": 42, + "centralityDistribution": {"min": 1.0, "max": 5.0, "mean": 2.5}, + "preProcessingMillis": 10, + "computeMillis": 20, + "postProcessingMillis": 12, + "configuration": {"writeProperty": "degree"}, + } + + query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"degree.write": pd.DataFrame([result])}) + + result_obj = DegreeCypherEndpoints(query_runner).write(graph, "degree") + + assert len(query_runner.queries) == 1 + assert "gds.degree.write" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + config = params["config"] + assert config["writeProperty"] == "degree" + + assert isinstance(result_obj, DegreeWriteResult) + assert result_obj.node_properties_written == 5 + assert result_obj.write_millis == 42 + + +def test_write_with_optional_params(graph: Graph) -> None: + result = { + "nodePropertiesWritten": 3, + "writeMillis": 25, + "centralityDistribution": {"min": 1.0, "max": 3.0, "mean": 2.0}, + "preProcessingMillis": 5, + "computeMillis": 15, + "postProcessingMillis": 5, + "configuration": {"writeProperty": "degree", "orientation": "REVERSE"}, + } + + query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"degree.write": pd.DataFrame([result])}) + + result_obj = DegreeCypherEndpoints(query_runner).write( + graph, + "degree", + orientation="REVERSE", + write_concurrency=8, + sudo=True, + username="test_user", + ) + + params = query_runner.params[0] + config = params["config"] + assert config["writeProperty"] == "degree" + assert config["orientation"] == "REVERSE" + assert config["writeConcurrency"] == 8 + assert config["sudo"] is True + assert config["username"] == "test_user" + + assert isinstance(result_obj, DegreeWriteResult) + assert result_obj.node_properties_written == 3 + assert result_obj.write_millis == 25 + + +def test_estimate(graph: Graph) -> None: + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"gds.degree.stats.estimate": pd.DataFrame([estimate_mock_result()])} + ) + + estimate_result = DegreeCypherEndpoints(query_runner).estimate(graph) + + assert len(query_runner.queries) == 1 + assert "gds.degree.stats.estimate" in query_runner.queries[0] + params = 
query_runner.params[0] + assert params["graphNameOrConfiguration"] == "test_graph" + assert estimate_result.node_count == 100 diff --git a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_k1coloring_cypher_endpoints.py b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_k1coloring_cypher_endpoints.py index e78b3b714..a80b08b55 100644 --- a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_k1coloring_cypher_endpoints.py +++ b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_k1coloring_cypher_endpoints.py @@ -9,6 +9,7 @@ ) from graphdatascience.procedure_surface.cypher.k1coloring_cypher_endpoints import K1ColoringCypherEndpoints from graphdatascience.tests.unit.conftest import DEFAULT_SERVER_VERSION, CollectingQueryRunner +from graphdatascience.tests.unit.procedure_surface.cypher.conftests import estimate_mock_result @pytest.fixture @@ -311,25 +312,14 @@ def test_write_with_optional_params(graph: Graph) -> None: "concurrency": 4, "jobId": "test-job", "writeConcurrency": 2, - "writeToResultStore": True, "minCommunitySize": 2, } def test_estimate_with_graph_name(graph: Graph) -> None: - result = { - "nodeCount": 100, - "relationshipCount": 200, - "requiredMemory": "1 GB", - "treeView": "test", - "mapView": {"foo": "bar"}, - "bytesMin": 100, - "bytesMax": 200, - "heapPercentageMin": 10.0, - "heapPercentageMax": 20.0, - } - - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"k1coloring.stats.estimate": pd.DataFrame([result])}) + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"k1coloring.stats.estimate": pd.DataFrame([estimate_mock_result()])} + ) K1ColoringCypherEndpoints(query_runner).estimate(G=graph) @@ -341,31 +331,14 @@ def test_estimate_with_graph_name(graph: Graph) -> None: def test_estimate_with_projection_config() -> None: - result = { - "nodeCount": 100, - "relationshipCount": 200, - "requiredMemory": "1 GB", - "treeView": "test", - "mapView": {"foo": "bar"}, - "bytesMin": 100, - "bytesMax": 200, - "heapPercentageMin": 10.0, - "heapPercentageMax": 20.0, - } - - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"k1coloring.stats.estimate": pd.DataFrame([result])}) + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"k1coloring.stats.estimate": pd.DataFrame([estimate_mock_result()])} + ) - K1ColoringCypherEndpoints(query_runner).estimate(projection_config={"foo": "bar"}) + K1ColoringCypherEndpoints(query_runner).estimate(G={"foo": "bar"}) assert len(query_runner.queries) == 1 assert "gds.k1coloring.stats.estimate" in query_runner.queries[0] params = query_runner.params[0] assert params["graphNameOrConfiguration"] == {"foo": "bar"} assert params["algoConfig"] == {} - - -def test_estimate_raises_value_error_when_no_arguments() -> None: - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION) - - with pytest.raises(ValueError, match="Either graph_name or projection_config must be provided."): - K1ColoringCypherEndpoints(query_runner).estimate() diff --git a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_louvain_cypher_endpoints.py b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_louvain_cypher_endpoints.py index 84c117b16..097e26b72 100644 --- a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_louvain_cypher_endpoints.py +++ b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_louvain_cypher_endpoints.py @@ -9,6 +9,7 @@ ) from graphdatascience.procedure_surface.cypher.louvain_cypher_endpoints import 
LouvainCypherEndpoints from graphdatascience.tests.unit.conftest import DEFAULT_SERVER_VERSION, CollectingQueryRunner +from graphdatascience.tests.unit.procedure_surface.cypher.conftests import estimate_mock_result @pytest.fixture @@ -155,36 +156,15 @@ def test_write_basic(graph: Graph) -> None: def test_estimate_with_graph_name(graph: Graph) -> None: - result = { - "nodeCount": 6, - "relationshipCount": 6, - "requiredMemory": "500MB", - "bytesMin": 1024, - "bytesMax": 2048, - "heapPercentageMin": 0.1, - "heapPercentageMax": 0.2, - "treeView": "exampleTree", - "mapView": {"exampleKey": "exampleValue"}, - } - - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"louvain.stats.estimate": pd.DataFrame([result])}) + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"louvain.stats.estimate": pd.DataFrame([estimate_mock_result()])} + ) estimate = LouvainCypherEndpoints(query_runner).estimate(G=graph) - assert estimate.node_count == 6 - assert estimate.relationship_count == 6 - assert estimate.required_memory == "500MB" - assert estimate.bytes_min == 1024 - assert estimate.bytes_max == 2048 + assert estimate.node_count == 100 assert len(query_runner.queries) == 1 assert "gds.louvain.stats.estimate" in query_runner.queries[0] params = query_runner.params[0] assert params["graphNameOrConfiguration"] == "test_graph" - - -def test_estimate_raises_value_error_when_no_arguments() -> None: - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION) - - with pytest.raises(ValueError, match="Either graph_name or projection_config must be provided."): - LouvainCypherEndpoints(query_runner).estimate() diff --git a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_pagerank_cypher_endpoints.py b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_pagerank_cypher_endpoints.py index 17b4c97e4..71a164e33 100644 --- a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_pagerank_cypher_endpoints.py +++ b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_pagerank_cypher_endpoints.py @@ -9,6 +9,7 @@ ) from graphdatascience.procedure_surface.cypher.pagerank_cypher_endpoints import PageRankCypherEndpoints from graphdatascience.tests.unit.conftest import DEFAULT_SERVER_VERSION, CollectingQueryRunner +from graphdatascience.tests.unit.procedure_surface.cypher.conftests import estimate_mock_result @pytest.fixture @@ -350,61 +351,26 @@ def test_write_with_optional_params(graph: Graph) -> None: def test_estimate_with_graph_name(graph: Graph) -> None: - result = { - "nodeCount": 100, - "relationshipCount": 200, - "requiredMemory": "500MB", - "treeView": "Tree", - "mapView": {"key": "value"}, - "bytesMin": 1024, - "bytesMax": 2048, - "heapPercentageMin": 0.1, - "heapPercentageMax": 0.2, - } - - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"pageRank.stats.estimate": pd.DataFrame([result])}) + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"pageRank.stats.estimate": pd.DataFrame([estimate_mock_result()])} + ) estimate = PageRankCypherEndpoints(query_runner).estimate(G=graph) assert estimate.node_count == 100 - assert estimate.relationship_count == 200 - assert estimate.required_memory == "500MB" - assert estimate.bytes_min == 1024 - assert estimate.bytes_max == 2048 assert len(query_runner.queries) == 1 assert "gds.pageRank.stats.estimate" in query_runner.queries[0] def test_estimate_with_projection_config() -> None: - result = { - "nodeCount": 100, - "relationshipCount": 200, - "requiredMemory": "500MB", - 
"treeView": "Tree", - "mapView": {"key": "value"}, - "bytesMin": 1024, - "bytesMax": 2048, - "heapPercentageMin": 0.1, - "heapPercentageMax": 0.2, - } - - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"pageRank.stats.estimate": pd.DataFrame([result])}) + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"pageRank.stats.estimate": pd.DataFrame([estimate_mock_result()])} + ) - estimate = PageRankCypherEndpoints(query_runner).estimate(projection_config={"foo": "bar"}) + estimate = PageRankCypherEndpoints(query_runner).estimate(G={"foo": "bar"}) assert estimate.node_count == 100 - assert estimate.relationship_count == 200 - assert estimate.required_memory == "500MB" - assert estimate.bytes_min == 1024 - assert estimate.bytes_max == 2048 assert len(query_runner.queries) == 1 assert "gds.pageRank.stats.estimate" in query_runner.queries[0] - - -def test_estimate_raises_value_error_when_no_arguments() -> None: - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION) - - with pytest.raises(ValueError, match="Either graph_name or projection_config must be provided."): - PageRankCypherEndpoints(query_runner).estimate() diff --git a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_scc_cypher_endpoints.py b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_scc_cypher_endpoints.py index d628e1822..b1ce48dee 100644 --- a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_scc_cypher_endpoints.py +++ b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_scc_cypher_endpoints.py @@ -5,6 +5,7 @@ from graphdatascience.procedure_surface.api.scc_endpoints import SccMutateResult, SccStatsResult, SccWriteResult from graphdatascience.procedure_surface.cypher.scc_cypher_endpoints import SccCypherEndpoints from graphdatascience.tests.unit.conftest import DEFAULT_SERVER_VERSION, CollectingQueryRunner +from graphdatascience.tests.unit.procedure_surface.cypher.conftests import estimate_mock_result @pytest.fixture @@ -255,33 +256,15 @@ def test_write_with_optional_params(graph: Graph) -> None: "jobId": "test-job", "consecutiveIds": True, "writeConcurrency": 4, - "writeToResultStore": True, } def test_estimate_with_graph(graph: Graph) -> None: - result = { - "nodeCount": 100, - "relationshipCount": 200, - "requiredMemory": "1MB", - "treeView": "tree", - "mapView": {"foo": "bar"}, - "bytesMin": 1000, - "bytesMax": 2000, - "heapPercentageMin": 0.1, - "heapPercentageMax": 0.2, - } - - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"scc.stats.estimate": pd.DataFrame([result])}) + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"scc.stats.estimate": pd.DataFrame([estimate_mock_result()])} + ) estimate = SccCypherEndpoints(query_runner).estimate(G=graph) assert estimate.node_count == 100 assert estimate.relationship_count == 200 - - -def test_estimate_without_params() -> None: - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION) - - with pytest.raises(ValueError, match="Either graph_name or projection_config must be provided."): - SccCypherEndpoints(query_runner).estimate() diff --git a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_wcc_cypher_endpoints.py b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_wcc_cypher_endpoints.py index 28fe99ebe..d6f715ffe 100644 --- a/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_wcc_cypher_endpoints.py +++ b/graphdatascience/tests/unit/procedure_surface/cypher/test_unit_wcc_cypher_endpoints.py @@ -5,6 +5,7 @@ from 
graphdatascience.procedure_surface.api.wcc_endpoints import WccMutateResult, WccStatsResult, WccWriteResult from graphdatascience.procedure_surface.cypher.wcc_cypher_endpoints import WccCypherEndpoints from graphdatascience.tests.unit.conftest import DEFAULT_SERVER_VERSION, CollectingQueryRunner +from graphdatascience.tests.unit.procedure_surface.cypher.conftests import estimate_mock_result @pytest.fixture @@ -323,27 +324,13 @@ def test_write_with_optional_params(graph: Graph) -> None: def test_estimate_with_graph_name(graph: Graph) -> None: - result = { - "nodeCount": 100, - "relationshipCount": 200, - "requiredMemory": "500MB", - "treeView": "Tree", - "mapView": {"key": "value"}, - "bytesMin": 1024, - "bytesMax": 2048, - "heapPercentageMin": 0.1, - "heapPercentageMax": 0.2, - } - - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"wcc.stats.estimate": pd.DataFrame([result])}) + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"wcc.stats.estimate": pd.DataFrame([estimate_mock_result()])} + ) estimate = WccCypherEndpoints(query_runner).estimate(G=graph) assert estimate.node_count == 100 - assert estimate.relationship_count == 200 - assert estimate.required_memory == "500MB" - assert estimate.bytes_min == 1024 - assert estimate.bytes_max == 2048 assert len(query_runner.queries) == 1 assert "gds.wcc.stats.estimate" in query_runner.queries[0] @@ -352,36 +339,15 @@ def test_estimate_with_graph_name(graph: Graph) -> None: def test_estimate_with_projection_config() -> None: - result = { - "nodeCount": 100, - "relationshipCount": 200, - "requiredMemory": "500MB", - "treeView": "Tree", - "mapView": {"key": "value"}, - "bytesMin": 1024, - "bytesMax": 2048, - "heapPercentageMin": 0.1, - "heapPercentageMax": 0.2, - } - - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION, {"wcc.stats.estimate": pd.DataFrame([result])}) + query_runner = CollectingQueryRunner( + DEFAULT_SERVER_VERSION, {"wcc.stats.estimate": pd.DataFrame([estimate_mock_result()])} + ) - estimate = WccCypherEndpoints(query_runner).estimate(projection_config={"foo": "bar"}) + estimate = WccCypherEndpoints(query_runner).estimate(G={"foo": "bar"}) assert estimate.node_count == 100 - assert estimate.relationship_count == 200 - assert estimate.required_memory == "500MB" - assert estimate.bytes_min == 1024 - assert estimate.bytes_max == 2048 assert len(query_runner.queries) == 1 assert "gds.wcc.stats.estimate" in query_runner.queries[0] params = query_runner.params[0] - assert params["config"] == {"foo": "bar"} - - -def test_estimate_raises_value_error_when_no_arguments() -> None: - query_runner = CollectingQueryRunner(DEFAULT_SERVER_VERSION) - - with pytest.raises(ValueError, match="Either graph_name or projection_config must be provided."): - WccCypherEndpoints(query_runner).estimate() + assert params["graphNameOrConfiguration"] == {"foo": "bar"} diff --git a/tox.ini b/tox.ini index 60b4363e2..69bf89876 100644 --- a/tox.ini +++ b/tox.ini @@ -114,3 +114,4 @@ passenv = BUILD_NUMBER commands = pytest graphdatascience/tests/integrationV2 --include-integration-v2 + pytest graphdatascience/tests/unit/procedure_surface