
Commit 76c27a9

Merge branch 'main' into main_chelsealin_stroffset
2 parents: 2fc31e0 + 991bb0a

File tree: 9 files changed (+1261, -1177 lines)


bigframes/functions/_remote_function_client.py (+476 lines; large diff not rendered by default)

bigframes/functions/_remote_function_session.py (+546 lines; large diff not rendered by default)

bigframes/functions/_utils.py (+214 lines, new file)
@@ -0,0 +1,214 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import hashlib
import inspect
from typing import cast, List, NamedTuple, Optional, Sequence, Set

import cloudpickle
import google.api_core.exceptions
from google.cloud import bigquery, functions_v2
import ibis.expr.datatypes.core
import numpy
import pandas
import pyarrow

import bigframes.core.compile.ibis_types

# Naming convention for the remote function artifacts
_BIGFRAMES_REMOTE_FUNCTION_PREFIX = "bigframes"
_BQ_FUNCTION_NAME_SEPERATOR = "_"
_GCF_FUNCTION_NAME_SEPERATOR = "-"

# Protocol version 4 is available in python version 3.4 and above
# https://docs.python.org/3/library/pickle.html#data-stream-format
_pickle_protocol_version = 4

def get_remote_function_locations(bq_location):
    """Get BQ location and cloud functions region given a BQ client."""
    # TODO(shobs, b/274647164): Find the best way to determine default location.
    # For now let's assume that if no BQ location is set in the client then it
    # defaults to US multi region
    bq_location = bq_location.lower() if bq_location else "us"

    # Cloud function should be in the same region as the bigquery remote function
    cloud_function_region = bq_location

    # BigQuery has multi region but cloud functions does not.
    # Any region in the multi region that supports cloud functions should work
    # https://cloud.google.com/functions/docs/locations
    if bq_location == "us":
        cloud_function_region = "us-central1"
    elif bq_location == "eu":
        cloud_function_region = "europe-west1"

    return bq_location, cloud_function_region

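For instance, the BigQuery "US" multi-region maps to the "us-central1" Cloud Functions region while single regions pass through unchanged; a quick illustration of the mapping above:

    get_remote_function_locations("US")          # -> ("us", "us-central1")
    get_remote_function_locations("EU")          # -> ("eu", "europe-west1")
    get_remote_function_locations("asia-east1")  # -> ("asia-east1", "asia-east1")
    get_remote_function_locations(None)          # -> ("us", "us-central1")
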
def _get_updated_package_requirements(
    package_requirements=None, is_row_processor=False
):
    requirements = [f"cloudpickle=={cloudpickle.__version__}"]
    if is_row_processor:
        # bigframes remote function will send an entire row of data as json,
        # which would be converted to a pandas series and processed
        # Ensure numpy versions match to avoid unpickling problems. See
        # internal issue b/347934471.
        requirements.append(f"numpy=={numpy.__version__}")
        requirements.append(f"pandas=={pandas.__version__}")
        requirements.append(f"pyarrow=={pyarrow.__version__}")

    if package_requirements:
        requirements.extend(package_requirements)

    requirements = sorted(requirements)
    return requirements

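A sketch of the resulting pinning; the cloudpickle version shown is illustrative, the real value is whatever is installed in the client environment:

    _get_updated_package_requirements(["xgboost"])
    # -> ["cloudpickle==2.2.1", "xgboost"]  (sorted; version is environment-dependent)

    _get_updated_package_requirements(is_row_processor=True)
    # additionally pins numpy, pandas and pyarrow to the client's installed versions
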
def _clean_up_by_session_id(
    bqclient: bigquery.Client,
    gcfclient: functions_v2.FunctionServiceClient,
    dataset: bigquery.DatasetReference,
    session_id: str,
):
    """Delete remote function artifacts for a session id, where the session id
    was not necessarily created in the current runtime. This is useful if the
    user worked with a BigQuery DataFrames session previously and remembered the
    session id, and now wants to clean up its temporary resources at a later
    point in time.
    """

    # First clean up the BQ remote functions and then the underlying
    # cloud functions, so that at no point we are left with a remote function
    # that is pointing to a cloud function that does not exist
    endpoints_to_be_deleted: Set[str] = set()
    match_prefix = "".join(
        [
            _BIGFRAMES_REMOTE_FUNCTION_PREFIX,
            _BQ_FUNCTION_NAME_SEPERATOR,
            session_id,
            _BQ_FUNCTION_NAME_SEPERATOR,
        ]
    )
    for routine in bqclient.list_routines(dataset):
        routine = cast(bigquery.Routine, routine)

        # skip past the routines not belonging to the given session id, or
        # non-remote-function routines
        if (
            routine.type_ != bigquery.RoutineType.SCALAR_FUNCTION
            or not cast(str, routine.routine_id).startswith(match_prefix)
            or not routine.remote_function_options
            or not routine.remote_function_options.endpoint
        ):
            continue

        # Let's forgive the edge case possibility that the BQ remote function
        # may have been deleted at the same time directly by the user
        bqclient.delete_routine(routine, not_found_ok=True)
        endpoints_to_be_deleted.add(routine.remote_function_options.endpoint)

    # Now clean up the cloud functions
    bq_location = bqclient.get_dataset(dataset).location
    bq_location, gcf_location = get_remote_function_locations(bq_location)
    parent_path = gcfclient.common_location_path(
        project=dataset.project, location=gcf_location
    )
    for gcf in gcfclient.list_functions(parent=parent_path):
        # skip past the cloud functions not attached to any BQ remote function
        # belonging to the given session id
        if gcf.service_config.uri not in endpoints_to_be_deleted:
            continue

        # Let's forgive the edge case possibility that the cloud function
        # may have been deleted at the same time directly by the user
        try:
            gcfclient.delete_function(name=gcf.name)
        except google.api_core.exceptions.NotFound:
            pass

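A minimal sketch of how this cleanup helper might be driven from caller code; the project, dataset, and session id values here are hypothetical:

    from google.cloud import bigquery, functions_v2

    bqclient = bigquery.Client(project="my-project")  # hypothetical project
    gcfclient = functions_v2.FunctionServiceClient()
    dataset = bigquery.DatasetReference("my-project", "my_dataset")  # hypothetical dataset
    _clean_up_by_session_id(bqclient, gcfclient, dataset, "abc123")  # hypothetical session id
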
def _get_hash(def_, package_requirements=None):
    "Get hash (32 digits alphanumeric) of a function."
    # There is a known cell-id sensitivity of the cloudpickle serialization in
    # notebooks https://github.com/cloudpipe/cloudpickle/issues/538. Because of
    # this, if a cell contains a udf decorated with @remote_function, a unique
    # cloudpickle code is generated every time the cell is run, creating new
    # cloud artifacts every time. This is slow and wasteful.
    # A workaround of the same can be achieved by replacing the filename in the
    # code object to a static value
    # https://github.com/cloudpipe/cloudpickle/issues/120#issuecomment-338510661.
    #
    # To respect the user code/environment let's make this modification on a
    # copy of the udf, not on the original udf itself.
    def_copy = cloudpickle.loads(cloudpickle.dumps(def_))
    def_copy.__code__ = def_copy.__code__.replace(
        co_filename="bigframes_place_holder_filename"
    )

    def_repr = cloudpickle.dumps(def_copy, protocol=_pickle_protocol_version)
    if package_requirements:
        for p in sorted(package_requirements):
            def_repr += p.encode()
    return hashlib.md5(def_repr).hexdigest()

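Because the pickled filename is pinned to a static value, re-running the same notebook cell yields the same digest; an illustrative usage:

    def add_one(x):
        return x + 1

    h1 = _get_hash(add_one)               # 32-character hex digest, stable across cell re-runs
    h2 = _get_hash(add_one, ["pandas"])   # requirements are folded into the digest, so h2 != h1
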
def routine_ref_to_string_for_query(routine_ref: bigquery.RoutineReference) -> str:
    return f"`{routine_ref.project}.{routine_ref.dataset_id}`.{routine_ref.routine_id}"

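For a routine reference parsed from a hypothetical path, note where the backticks land: the project and dataset are quoted together, the routine id is not:

    ref = bigquery.RoutineReference.from_string("my-project.my_dataset.my_routine")
    routine_ref_to_string_for_query(ref)
    # -> '`my-project.my_dataset`.my_routine'
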
def get_cloud_function_name(function_hash, session_id=None, uniq_suffix=None):
    "Get a name for the cloud function for the given user defined function."
    parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX]
    if session_id:
        parts.append(session_id)
    parts.append(function_hash)
    if uniq_suffix:
        parts.append(uniq_suffix)
    return _GCF_FUNCTION_NAME_SEPERATOR.join(parts)

def get_remote_function_name(function_hash, session_id, uniq_suffix=None):
    "Get a name for the BQ remote function for the given user defined function."
    parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX, session_id, function_hash]
    if uniq_suffix:
        parts.append(uniq_suffix)
    return _BQ_FUNCTION_NAME_SEPERATOR.join(parts)

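Illustrative outputs of the two naming helpers (the hash and session id values are hypothetical); note that Cloud Functions names join with "-" while BQ routine names join with "_":

    get_cloud_function_name("0a1b2c3d", session_id="abc123")
    # -> 'bigframes-abc123-0a1b2c3d'

    get_remote_function_name("0a1b2c3d", "abc123")
    # -> 'bigframes_abc123_0a1b2c3d'
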
class IbisSignature(NamedTuple):
    parameter_names: List[str]
    input_types: List[Optional[ibis.expr.datatypes.core.DataType]]
    output_type: ibis.expr.datatypes.core.DataType


def ibis_signature_from_python_signature(
    signature: inspect.Signature,
    input_types: Sequence[type],
    output_type: type,
) -> IbisSignature:
    return IbisSignature(
        parameter_names=list(signature.parameters.keys()),
        input_types=[
            bigframes.core.compile.ibis_types.ibis_type_from_python_type(t)
            for t in input_types
        ],
        output_type=bigframes.core.compile.ibis_types.ibis_type_from_python_type(
            output_type
        ),
    )
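A sketch of driving this translation for a simple UDF, assuming ibis_type_from_python_type maps int and float to the corresponding Ibis scalar types:

    def add(x: int, y: float) -> float:
        return x + y

    sig = ibis_signature_from_python_signature(
        inspect.signature(add), input_types=[int, float], output_type=float
    )
    # sig.parameter_names == ["x", "y"]
    # sig.input_types / sig.output_type hold Ibis DataType objects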
