6 changes: 6 additions & 0 deletions python/ray/_private/runtime_env/agent/runtime_env_consts.py
@@ -32,3 +32,9 @@
RAY_WORKING_DIR = os.environ.get("RAY_WORKING_DIR", "RAY_WORKING_DIR")

RAY_JOB_DIR = os.environ.get("RAY_JOB_DIR", "RAY_JOB_DIR")

# ZDFS client configuration used by the "dfs" protocol download path.
ZDFS_LOG_PATH = "/tmp/log_zdfs.LOG"
ZDFS_CONF_PATH = "/tmp/zdfs_client_conf.json"
ZDFS_THREAD_NUM = 4
ZDFS_BUFFER_LEN = 20971520  # 20 MiB
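A side note, not part of this PR: the neighboring constants in this module are read from environment variables, so the new ZDFS settings could follow the same pattern if they need to be tunable per deployment. The `RAY_ZDFS_*` variable names below are hypothetical.

```python
import os

# Hypothetical RAY_ZDFS_* environment overrides, mirroring the style of
# RAY_WORKING_DIR / RAY_JOB_DIR above; the PR as written hardcodes these values.
ZDFS_LOG_PATH = os.environ.get("RAY_ZDFS_LOG_PATH", "/tmp/log_zdfs.LOG")
ZDFS_CONF_PATH = os.environ.get("RAY_ZDFS_CONF_PATH", "/tmp/zdfs_client_conf.json")
ZDFS_THREAD_NUM = int(os.environ.get("RAY_ZDFS_THREAD_NUM", "4"))
ZDFS_BUFFER_LEN = int(os.environ.get("RAY_ZDFS_BUFFER_LEN", str(20 * 1024 * 1024)))  # 20 MiB
```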
84 changes: 83 additions & 1 deletion python/ray/_private/runtime_env/protocol.py
@@ -30,11 +30,15 @@ def get_protocols(cls):
            # File storage path, assumes everything packed in one zip file.
            "file",
            "http",
            # ZDFS path, used internally by Ant Group.
            "dfs",
            # HDFS path.
            "hdfs",
        }

    @classmethod
    def get_remote_protocols(cls):
        return {"https", "s3", "gs", "file", "http"}
        return {"https", "s3", "gs", "file", "http", "dfs", "hdfs"}

@classmethod
def download_remote_uri(cls, protocol: str, source_uri: str, dest_file: str):
@@ -78,6 +82,84 @@ def open_file(uri, mode, *, transport_params=None):
"to fetch URIs in Google Cloud Storage bucket."
+ cls._MISSING_DEPENDENCIES_WARNING
)

        elif protocol == "dfs":
            # ZDFS path (used internally at Ant Group): try the native zdfs
            # client first, and fall back to the Hadoop CLI if that fails.
            try:
                try:
                    import zdfs
                    from zdfs import zdfs_util
                    from urllib.parse import urlparse

                    import ray._private.runtime_env.agent.runtime_env_consts as runtime_env_consts
                except ImportError:
                    raise ImportError(
                        "You must `pip install zdfs-dfs` "
                        "to fetch URIs in ZDFS. " + cls._MISSING_DEPENDENCIES_WARNING
                    )
                parsed = urlparse(source_uri)
                cluster_info = f"{parsed.scheme}://{parsed.hostname}:{parsed.port}"
                options = zdfs.FileSystemOptions()
                options.log_path = runtime_env_consts.ZDFS_LOG_PATH
                options.conf_path = runtime_env_consts.ZDFS_CONF_PATH
                pangu_options = zdfs.PanguOptions()
                pangu_options.io_thread_num = runtime_env_consts.ZDFS_THREAD_NUM
                pangu_options.callback_thread_num = runtime_env_consts.ZDFS_THREAD_NUM
                pangu_options.callback_in_iothread = True
                zdfs.PanguFileSystem.SetOptions(pangu_options)
                fs = zdfs.PanguFileSystem.Create(cluster_info, options)
                ec = zdfs_util.get(
                    fs,
                    parsed.path,
                    dest_file,
                    buflen=runtime_env_consts.ZDFS_BUFFER_LEN,
                    overwrite=False,
                )
                if ec != 0:
                    raise Exception(
                        f"ZDFS client failed to download file, error code: {ec}"
                    )
                return
            except Exception:
                # The zdfs client is unavailable or failed; fall back to the
                # Hadoop command-line client.
                import shutil
                import subprocess

                def check_hadoop_client():
                    # Prefer the `hdfs` CLI, then the older `hadoop` CLI.
                    if shutil.which("hdfs"):
                        return ["hdfs", "dfs"]
                    elif shutil.which("hadoop"):
                        return ["hadoop", "fs"]
                    else:
                        return None

                hadoop_command = check_hadoop_client()
                if hadoop_command:
                    result = subprocess.run(
                        [*hadoop_command, "-get", source_uri, dest_file],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True,
                    )
                    if result.returncode != 0:
                        raise Exception(
                            "Hadoop client failed to download file, "
                            f"return code: {result.returncode}, stderr: {result.stderr}"
                        )
                else:
                    raise Exception("Hadoop client not found.")
                return

        elif protocol == "hdfs":
            try:
                from smart_open import open as open_file
            except ImportError:
                raise ImportError(
                    "You must `pip install smart_open[hdfs]` "
                    f"to fetch {protocol.upper()} URIs. "
                    + cls._MISSING_DEPENDENCIES_WARNING
                )
            # Transport parameters forwarded to smart_open when the URI is
            # opened below.
            tp = {
                "client": "pyarrow",
                "hdfs_driver": "libhdfs",
            }

        else:
            try:
                from smart_open import open as open_file
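For context, a minimal usage sketch of the new protocols, assuming the standard runtime_env flow where remote zip URIs are passed as `working_dir`; the cluster addresses and package paths below are hypothetical:

```python
import ray

# Hypothetical URIs: a zipped working_dir reachable over the new "dfs" or
# "hdfs" protocols can now be downloaded by the runtime_env agent.
ray.init(
    runtime_env={
        # Fetched with the zdfs client (Hadoop CLI fallback on failure).
        "working_dir": "dfs://pangu-cluster:10240/ray/jobs/my_job.zip",
        # Alternatively, fetched via smart_open with the pyarrow/libhdfs
        # transport parameters:
        # "working_dir": "hdfs://namenode:9000/ray/jobs/my_job.zip",
    }
)
```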