
Add retries to get_json() #286


Merged · 2 commits · Aug 6, 2019
2 changes: 1 addition & 1 deletion pkg/workloads/cortex/lib/context.py
@@ -371,7 +371,7 @@ def model_config(self, model_name):
 
     def get_resource_status(self, resource):
         key = self.resource_status_key(resource)
-        return self.storage.get_json(key)
+        return self.storage.get_json(key, num_retries=5)
 
     def upload_resource_status_start(self, *resources):
         timestamp = util.now_timestamp_rfc_3339()
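
A quick note on the numbers above: retry_delay_sec defaults to 2, so num_retries=5 allows six attempts in total and at most 5 × 2 = 10 seconds of sleeping before the original exception propagates. A minimal sketch of the call's worst case (ctx and key are stand-ins for the surrounding code):

    # Hypothetical call site: six attempts total, sleeping 2s after each of
    # the first five failures, so up to 10s asleep before the error surfaces.
    status = ctx.storage.get_json(key, num_retries=5)  # retry_delay_sec defaults to 2
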
40 changes: 33 additions & 7 deletions pkg/workloads/cortex/lib/storage/local.py
@@ -21,6 +21,7 @@
 import msgpack
 from pathlib import Path
 import shutil
+import time
 
 from cortex.lib import util
 from cortex.lib.exceptions import CortexException
@@ -38,7 +39,17 @@ def _get_or_create_path(self, key):
         p.parent.mkdir(parents=True, exist_ok=True)
         return p
 
-    def _get_path_if_exists(self, key, allow_missing=False):
+    def _get_path_if_exists(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2):
+        while True:
+            try:
+                return self._get_path_if_exists_single(key, allow_missing=allow_missing)
+            except:
+                if num_retries <= 0:
+                    raise
+                num_retries -= 1
+                time.sleep(retry_delay_sec)
+
+    def _get_path_if_exists_single(self, key, allow_missing=False):
         p = Path(os.path.join(self.base_dir, key))
         if not p.exists() and allow_missing:
             return None
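
The wrapper above is a simple count-down retry loop: it mutates num_retries, re-raises once the budget is exhausted, and sleeps between attempts. A self-contained sketch of the same pattern (with_retries is hypothetical, not part of this PR):

    import time

    def with_retries(fn, num_retries=0, retry_delay_sec=2):
        # Same shape as _get_path_if_exists: return on success; on failure,
        # either re-raise (budget exhausted) or sleep and try again.
        while True:
            try:
                return fn()
            except Exception:
                if num_retries <= 0:
                    raise
                num_retries -= 1
                time.sleep(retry_delay_sec)

One difference from the diff: catching Exception instead of using a bare except avoids retrying (and delaying) KeyboardInterrupt and SystemExit.
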
@@ -69,8 +80,13 @@ def put_json(self, obj, key):
         f = self._get_or_create_path(key)
         f.write_text(json.dumps(obj))
 
-    def get_json(self, key, allow_missing=False):
-        f = self._get_path_if_exists(key, allow_missing)
+    def get_json(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2):
+        f = self._get_path_if_exists(
+            key,
+            allow_missing=allow_missing,
+            num_retries=num_retries,
+            retry_delay_sec=retry_delay_sec,
+        )
         if f is None:
             return None
         return json.loads(f.read_text())
@@ -79,8 +95,13 @@ def put_msgpack(self, obj, key):
         f = self._get_or_create_path(key)
         f.write_bytes(msgpack.dumps(obj))
 
-    def get_msgpack(self, key, allow_missing=False):
-        f = self._get_path_if_exists(key, allow_missing)
+    def get_msgpack(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2):
+        f = self._get_path_if_exists(
+            key,
+            allow_missing=allow_missing,
+            num_retries=num_retries,
+            retry_delay_sec=retry_delay_sec,
+        )
         if f is None:
             return None
         return msgpack.loads(f.read_bytes())
@@ -89,8 +110,13 @@ def put_pyobj(self, obj, key):
         f = self._get_or_create_path(key)
         f.write_bytes(pickle.dumps(obj))
 
-    def get_pyobj(self, key, bucket, allow_missing=False):
-        f = self._get_path_if_exists(key, allow_missing)
+    def get_pyobj(self, key, bucket, allow_missing=False, num_retries=0, retry_delay_sec=2):
+        f = self._get_path_if_exists(
+            key,
+            allow_missing=allow_missing,
+            num_retries=num_retries,
+            retry_delay_sec=retry_delay_sec,
+        )
         if f is None:
             return None
         return pickle.loads(f.read_bytes())
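
A hedged usage sketch of the new signatures (it assumes LocalStorage is constructed with a base_dir; the constructor is not shown in this diff):

    from cortex.lib.storage.local import LocalStorage

    storage = LocalStorage(base_dir="/tmp/cortex-demo")  # assumed constructor
    storage.put_json({"status": "running"}, "status/api")

    # num_retries=0 (the default) keeps the old fail-fast behavior;
    # num_retries=3 tolerates up to three transient read failures.
    obj = storage.get_json("status/api", num_retries=3, retry_delay_sec=1)
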
58 changes: 47 additions & 11 deletions pkg/workloads/cortex/lib/storage/s3.py
@@ -19,6 +19,7 @@
 import pickle
 import json
 import msgpack
+import time
 
 from cortex.lib import util
 from cortex.lib.exceptions import CortexException
@@ -114,18 +115,32 @@ def _get_matching_s3_keys_generator(self, prefix="", suffix=""):
     def _upload_string_to_s3(self, string, key):
         self.s3.put_object(Bucket=self.bucket, Key=key, Body=string)
 
-    def _read_bytes_from_s3(self, key, allow_missing=False, ext_bucket=None):
+    def _read_bytes_from_s3(
+        self, key, allow_missing=False, ext_bucket=None, num_retries=0, retry_delay_sec=2
+    ):
+        while True:
+            try:
+                return self._read_bytes_from_s3_single(
+                    key, allow_missing=allow_missing, ext_bucket=ext_bucket
+                )
+            except:
+                if num_retries <= 0:
+                    raise
+                num_retries -= 1
+                time.sleep(retry_delay_sec)
+
+    def _read_bytes_from_s3_single(self, key, allow_missing=False, ext_bucket=None):
         bucket = self.bucket
         if ext_bucket is not None:
             bucket = ext_bucket
 
         try:
             try:
                 byte_array = self.s3.get_object(Bucket=bucket, Key=key)["Body"].read()
-            except self.s3.exceptions.NoSuchKey as e:
+            except self.s3.exceptions.NoSuchKey:
                 if allow_missing:
                     return None
-                raise e
+                raise
         except Exception as e:
             raise CortexException(
                 'key "{}" in bucket "{}" could not be accessed; '.format(key, bucket)
@@ -140,26 +155,41 @@ def search(self, prefix="", suffix=""):
     def put_json(self, obj, key):
         self._upload_string_to_s3(json.dumps(obj), key)
 
-    def get_json(self, key, allow_missing=False):
-        obj = self._read_bytes_from_s3(key, allow_missing)
+    def get_json(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2):
+        obj = self._read_bytes_from_s3(
+            key,
+            allow_missing=allow_missing,
+            num_retries=num_retries,
+            retry_delay_sec=retry_delay_sec,
+        )
         if obj is None:
             return None
         return json.loads(obj.decode("utf-8"))
 
     def put_msgpack(self, obj, key):
         self._upload_string_to_s3(msgpack.dumps(obj), key)
 
-    def get_msgpack(self, key, allow_missing=False):
-        obj = self._read_bytes_from_s3(key, allow_missing)
+    def get_msgpack(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2):
+        obj = self._read_bytes_from_s3(
+            key,
+            allow_missing=allow_missing,
+            num_retries=num_retries,
+            retry_delay_sec=retry_delay_sec,
+        )
         if obj == None:
             return None
         return msgpack.loads(obj, raw=False)
 
     def put_pyobj(self, obj, key):
         self._upload_string_to_s3(pickle.dumps(obj), key)
 
-    def get_pyobj(self, key, allow_missing=False):
-        obj = self._read_bytes_from_s3(key, allow_missing)
+    def get_pyobj(self, key, allow_missing=False, num_retries=0, retry_delay_sec=2):
+        obj = self._read_bytes_from_s3(
+            key,
+            allow_missing=allow_missing,
+            num_retries=num_retries,
+            retry_delay_sec=retry_delay_sec,
+        )
         if obj is None:
             return None
         return pickle.loads(obj)
@@ -207,9 +237,15 @@ def download_file_external(self, s3_path, local_path):
                 + "it may not exist, or you may not have sufficient permissions"
             ) from e
 
-    def get_json_external(self, s3_path):
+    def get_json_external(self, s3_path, num_retries=0, retry_delay_sec=2):
         bucket, key = self.deconstruct_s3_path(s3_path)
-        obj = self._read_bytes_from_s3(key, ext_bucket=bucket)
+        obj = self._read_bytes_from_s3(
+            key,
+            allow_missing=False,
+            ext_bucket=bucket,
+            num_retries=num_retries,
+            retry_delay_sec=retry_delay_sec,
+        )
         if obj is None:
             return None
         return json.loads(obj.decode("utf-8"))
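
A closing usage sketch (the S3 constructor arguments are assumptions; the wrapper may take different parameters):

    from cortex.lib.storage.s3 import S3

    storage = S3(bucket="my-bucket")  # assumed constructor
    # Up to six attempts, sleeping 2s between failures (worst case ~10s asleep):
    config = storage.get_json("config/app.json", num_retries=5)

    # External reads resolve the bucket from the s3:// path itself:
    shared = storage.get_json_external("s3://other-bucket/shared.json", num_retries=5)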