From 030efc8d0439b5656a691cd73145799cbb7029cf Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Thu, 19 May 2022 18:35:41 -0700 Subject: [PATCH] CLN: Move test_parallel to asv benchmark --- asv_bench/benchmarks/gil.py | 82 +++++++++++++++++++++++++------------ pandas/_testing/__init__.py | 50 ---------------------- 2 files changed, 56 insertions(+), 76 deletions(-) diff --git a/asv_bench/benchmarks/gil.py b/asv_bench/benchmarks/gil.py index af2efe56c2530..31654a5c75617 100644 --- a/asv_bench/benchmarks/gil.py +++ b/asv_bench/benchmarks/gil.py @@ -1,3 +1,6 @@ +from functools import wraps +import threading + import numpy as np from pandas import ( @@ -30,21 +33,57 @@ from pandas._libs import algos except ImportError: from pandas import algos -try: - from pandas._testing import test_parallel # noqa: PDF014 - have_real_test_parallel = True -except ImportError: - have_real_test_parallel = False - def test_parallel(num_threads=1): - def wrapper(fname): - return fname +from .pandas_vb_common import BaseIO # isort:skip - return wrapper +def test_parallel(num_threads=2, kwargs_list=None): + """ + Decorator to run the same function multiple times in parallel. -from .pandas_vb_common import BaseIO # isort:skip + Parameters + ---------- + num_threads : int, optional + The number of times the function is run in parallel. + kwargs_list : list of dicts, optional + The list of kwargs to update original + function kwargs on different threads. + + Notes + ----- + This decorator does not pass the return value of the decorated function. + + Original from scikit-image: + + https://github.com/scikit-image/scikit-image/pull/1519 + + """ + assert num_threads > 0 + has_kwargs_list = kwargs_list is not None + if has_kwargs_list: + assert len(kwargs_list) == num_threads + + def wrapper(func): + @wraps(func) + def inner(*args, **kwargs): + if has_kwargs_list: + update_kwargs = lambda i: dict(kwargs, **kwargs_list[i]) + else: + update_kwargs = lambda i: kwargs + threads = [] + for i in range(num_threads): + updated_kwargs = update_kwargs(i) + thread = threading.Thread(target=func, args=args, kwargs=updated_kwargs) + threads.append(thread) + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + return inner + + return wrapper class ParallelGroupbyMethods: @@ -53,8 +92,7 @@ class ParallelGroupbyMethods: param_names = ["threads", "method"] def setup(self, threads, method): - if not have_real_test_parallel: - raise NotImplementedError + N = 10**6 ngroups = 10**3 df = DataFrame( @@ -86,8 +124,7 @@ class ParallelGroups: param_names = ["threads"] def setup(self, threads): - if not have_real_test_parallel: - raise NotImplementedError + size = 2**22 ngroups = 10**3 data = Series(np.random.randint(0, ngroups, size=size)) @@ -108,8 +145,7 @@ class ParallelTake1D: param_names = ["dtype"] def setup(self, dtype): - if not have_real_test_parallel: - raise NotImplementedError + N = 10**6 df = DataFrame({"col": np.arange(N, dtype=dtype)}) indexer = np.arange(100, len(df) - 100) @@ -131,8 +167,7 @@ class ParallelKth: repeat = 5 def setup(self): - if not have_real_test_parallel: - raise NotImplementedError + N = 10**7 k = 5 * 10**5 kwargs_list = [{"arr": np.random.randn(N)}, {"arr": np.random.randn(N)}] @@ -149,8 +184,7 @@ def time_kth_smallest(self): class ParallelDatetimeFields: def setup(self): - if not have_real_test_parallel: - raise NotImplementedError + N = 10**6 self.dti = date_range("1900-01-01", periods=N, freq="T") self.period = self.dti.to_period("D") @@ -204,8 +238,7 @@ class ParallelRolling: param_names = ["method"] def setup(self, method): - if not have_real_test_parallel: - raise NotImplementedError + win = 100 arr = np.random.rand(100000) if hasattr(DataFrame, "rolling"): @@ -248,8 +281,7 @@ class ParallelReadCSV(BaseIO): param_names = ["dtype"] def setup(self, dtype): - if not have_real_test_parallel: - raise NotImplementedError + rows = 10000 cols = 50 data = { @@ -284,8 +316,6 @@ class ParallelFactorize: param_names = ["threads"] def setup(self, threads): - if not have_real_test_parallel: - raise NotImplementedError strings = tm.makeStringIndex(100000) diff --git a/pandas/_testing/__init__.py b/pandas/_testing/__init__.py index 603c2f081a31a..53e003e2ed7dd 100644 --- a/pandas/_testing/__init__.py +++ b/pandas/_testing/__init__.py @@ -3,7 +3,6 @@ import collections from datetime import datetime from decimal import Decimal -from functools import wraps import operator import os import re @@ -749,55 +748,6 @@ def makeMissingDataframe(density=0.9, random_state=None): return df -def test_parallel(num_threads=2, kwargs_list=None): - """ - Decorator to run the same function multiple times in parallel. - - Parameters - ---------- - num_threads : int, optional - The number of times the function is run in parallel. - kwargs_list : list of dicts, optional - The list of kwargs to update original - function kwargs on different threads. - - Notes - ----- - This decorator does not pass the return value of the decorated function. - - Original from scikit-image: - - https://github.com/scikit-image/scikit-image/pull/1519 - - """ - assert num_threads > 0 - has_kwargs_list = kwargs_list is not None - if has_kwargs_list: - assert len(kwargs_list) == num_threads - import threading - - def wrapper(func): - @wraps(func) - def inner(*args, **kwargs): - if has_kwargs_list: - update_kwargs = lambda i: dict(kwargs, **kwargs_list[i]) - else: - update_kwargs = lambda i: kwargs - threads = [] - for i in range(num_threads): - updated_kwargs = update_kwargs(i) - thread = threading.Thread(target=func, args=args, kwargs=updated_kwargs) - threads.append(thread) - for thread in threads: - thread.start() - for thread in threads: - thread.join() - - return inner - - return wrapper - - class SubclassedSeries(Series): _metadata = ["testattr", "name"]