From ddab36dd668de22a3d91073ebc8a726baca103e0 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 8 Mar 2016 22:17:21 -0500 Subject: [PATCH 1/3] Split filesystem from s3 into s3fs --- .travis.yml | 1 + distributed/s3.py | 409 +-------------------------------- distributed/s3fs.py | 404 ++++++++++++++++++++++++++++++++ distributed/tests/test_s3.py | 109 +-------- distributed/tests/test_s3fs.py | 137 +++++++++++ 5 files changed, 546 insertions(+), 514 deletions(-) create mode 100644 distributed/s3fs.py create mode 100644 distributed/tests/test_s3fs.py diff --git a/.travis.yml b/.travis.yml index f230712915f..f1c49c4468b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -50,6 +50,7 @@ install: - source activate test-environment - conda install pytest coverage tornado toolz dill futures dask ipywidgets psutil bokeh requests - pip install git+https://github.com/dask/dask.git --upgrade + - pip install moto # Install distributed - python setup.py install diff --git a/distributed/s3.py b/distributed/s3.py index 2d320436241..0df08241855 100644 --- a/distributed/s3.py +++ b/distributed/s3.py @@ -1,14 +1,8 @@ from __future__ import print_function, division, absolute_import import logging -import threading -import re -import json import io -import sys -import boto3 -from botocore.exceptions import ClientError from tornado import gen from dask.imperative import Value, do @@ -18,410 +12,9 @@ from .utils import read_block, seek_delimiter, ensure_bytes, ignoring, sync from .executor import default_executor, ensure_default_get from .compatibility import unicode, gzip_decompress - +from .s3fs import S3FileSystem logger = logging.getLogger(__name__) -logging.getLogger('boto3').setLevel(logging.WARNING) -logging.getLogger('botocore').setLevel(logging.WARNING) - - -DEFAULT_PAGE_LENGTH = 1000 - -_conn = dict() - - -get_s3_lock = threading.Lock() - -def split_path(path): - """ - Normalise S3 path string into bucket and key. 
- - Parameters - ---------- - path : string - Input path, like `s3://mybucket/path/to/file` - - Examples - -------- - >>> split_path("s3://mybucket/path/to/file") - ("mybucket", "path/to/file") - """ - if path.startswith('s3://'): - path = path[5:] - if '/' not in path: - return path, "" - else: - return path.split('/', 1) - - -class S3FileSystem(object): - """ - Access S3 data as if it were a file system. - """ - _conn = {} - - def __init__(self, anon=True, key=None, secret=None, **kwargs): - """ - Create connection object to S3 - - Will use configured key/secret (typically in ~/.aws, see the - boto3 documentation) unless specified - - Parameters - ---------- - anon : bool (True) - whether to use anonymous connection (public buckets only) - key : string (None) - if not anonymouns, use this key, if specified - secret : string (None) - if not anonymous, use this password, if specified - kwargs : other parameters for boto3 session - """ - self.anon = anon - self.key = key - self.secret = secret - self.kwargs = kwargs - self.connect(anon, key, secret, kwargs) - self.dirs = {} - self.s3 = self.connect(anon, key, secret, kwargs) - - def connect(self, anon, key, secret, kwargs): - tok = tokenize(anon, key, secret, kwargs) - if tok not in self._conn: - logger.debug("Open S3 connection. 
Anonymous: %s", - self.anon) - if self.anon: - from botocore import UNSIGNED - from botocore.client import Config - s3 = boto3.Session().client('s3', - config=Config(signature_version=UNSIGNED)) - else: - s3 = boto3.Session(self.key, self.secret, - **self.kwargs).client('s3') - self._conn[tok] = s3 - return self._conn[tok] - - def __getstate__(self): - d = self.__dict__.copy() - del d['s3'] - logger.debug("Serialize with state: %s", d) - return d - - def __setstate__(self, state): - self.__dict__.update(state) - self.s3 = self.connect(self.anon, self.key, self.secret, self.kwargs) - - def open(self, path, mode='rb', block_size=4*1024**2): - """ Open a file for reading or writing - - Parameters - ---------- - path: string - Path of file on S3 - mode: string - One of 'rb' or 'wb' - block_size: int - Size of data-node blocks if reading - """ - if 'b' not in mode: - raise NotImplementedError("Text mode not supported, use mode='%s'" - " and manage bytes" % (mode[0] + 'b')) - return S3File(self, path, mode, block_size=block_size) - - def _ls(self, path, refresh=False): - """ List files below path - - Parameters - ---------- - path : string/bytes - location at which to list files - detail : bool (=True) - if True, each list item is a dict of file properties; - otherwise, returns list of filenames - refresh : bool (=False) - if False, look in local cache for file details first - """ - path = path.lstrip('s3://').lstrip('/') - bucket, key = split_path(path) - if bucket not in self.dirs or refresh: - if bucket == '': - # list of buckets - if self.anon: - # cannot list buckets if not logged in - return [] - files = self.s3.list_buckets()['Buckets'] - for f in files: - f['Key'] = f['Name'] - f['Size'] = 0 - del f['Name'] - else: - files = self.s3.list_objects(Bucket=bucket).get('Contents', []) - for f in files: - f['Key'] = "/".join([bucket, f['Key']]) - self.dirs[bucket] = list(sorted(files, key=lambda x: x['Key'])) - files = self.dirs[bucket] - return files - - def ls(self, 
path, detail=False): - path = path.lstrip('s3://').rstrip('/') - try: - files = self._ls(path) - except ClientError: - files = [] - if path: - pattern = re.compile(path + '/[^/]*.$') - files = [f for f in files if pattern.match(f['Key']) is not None] - if not files: - try: - files = [self.info(path)] - except (OSError, IOError): - files = [] - if detail: - return files - else: - return [f['Key'] for f in files] - - def info(self, path): - if path.startswith('s3://'): - path = path[len('s3://'):] - path = path.rstrip('/') - files = self._ls(path) - files = [f for f in files if f['Key'].rstrip('/') == path] - if len(files) == 1: - return files[0] - else: - raise IOError("File not found: %s" %path) - - def walk(self, path): - return [f['Key'] for f in self._ls(path) if f['Key'].rstrip('/' - ).startswith(path.rstrip('/') + '/')] - - def glob(self, path): - """ - Find files by glob-matching. - - Note that the bucket part of the path must not contain a "*" - """ - path0 = path - path = path.lstrip('s3://').lstrip('/') - bucket, key = split_path(path) - if "*" in bucket: - raise ValueError('Bucket cannot contain a "*"') - if '*' not in path: - path = path.rstrip('/') + '/*' - if '/' in path[:path.index('*')]: - ind = path[:path.index('*')].rindex('/') - root = path[:ind+1] - else: - root = '/' - allfiles = self.walk(root) - pattern = re.compile("^" + path.replace('//', '/') - .rstrip('/') - .replace('*', '[^/]*') - .replace('?', '.') + "$") - out = [f for f in allfiles if re.match(pattern, - f.replace('//', '/').rstrip('/'))] - if not out: - out = self.ls(path0) - return out - - def du(self, path, total=False, deep=False): - if deep: - files = self.walk(path) - files = [self.info(f) for f in files] - else: - files = self.ls(path, detail=True) - if total: - return sum(f.get('Size', 0) for f in files) - else: - return {p['Key']: p['Size'] for p in files} - - def exists(self, path): - return bool(self.ls(path)) - - def cat(self, path): - with self.open(path, 'rb') as f: - 
return f.read() - - def tail(self, path, size=1024): - """ Return last bytes of file """ - length = self.info(path)['Size'] - if size > length: - return self.cat(path) - with self.open(path, 'rb') as f: - f.seek(length - size) - return f.read(size) - - def head(self, path, size=1024): - """ Return first bytes of file """ - with self.open(path, 'rb', block_size=size) as f: - return f.read(size) - - def read_block(self, fn, offset, length, delimiter=None): - """ Read a block of bytes from an S3 file - - Starting at ``offset`` of the file, read ``length`` bytes. If - ``delimiter`` is set then we ensure that the read starts and stops at - delimiter boundaries that follow the locations ``offset`` and ``offset - + length``. If ``offset`` is zero then we start at zero. The - bytestring returned WILL include the end delimiter string. - - If offset+length is beyond the eof, reads to eof. - - Parameters - ---------- - fn: string - Path to filename on S3 - offset: int - Byte offset to start read - length: int - Number of bytes to read - delimiter: bytes (optional) - Ensure reading starts and stops at delimiter bytestring - - Examples - -------- - >>> s3.read_block('data/file.csv', 0, 13) # doctest: +SKIP - b'Alice, 100\\nBo' - >>> s3.read_block('data/file.csv', 0, 13, delimiter=b'\\n') # doctest: +SKIP - b'Alice, 100\\nBob, 200\\n' - - Use ``length=None`` to read to the end of the file. - >>> s3.read_block('data/file.csv', 0, None, delimiter=b'\\n') # doctest: +SKIP - b'Alice, 100\\nBob, 200\\nCharlie, 300' - - See Also - -------- - distributed.utils.read_block - """ - with self.open(fn, 'rb') as f: - size = f.info()['Size'] - if length is None: - length = size - if offset + length > size: - length = size - offset - bytes = read_block(f, offset, length, delimiter) - return bytes - - -class S3File(object): - """ - Cached read-only interface to a key in S3, behaving like a seekable file. - - Optimized for a single continguous block. 
- """ - - def __init__(self, s3, path, mode='rb', block_size=4*2**20): - """ - Open S3 as a file. Data is only loaded and cached on demand. - - Parameters - ---------- - s3 : boto3 connection - bucket : string - S3 bucket to access - key : string - S3 key to access - blocksize : int - read-ahead size for finding delimiters - """ - self.mode = mode - if mode != 'rb': - raise NotImplementedError("File mode must be 'rb', not %s" % mode) - self.path = path - bucket, key = split_path(path) - self.s3 = s3 - self.size = self.info()['Size'] - self.bucket = bucket - self.key = key - self.blocksize = block_size - self.cache = b"" - self.loc = 0 - self.start = None - self.end = None - self.closed = False - - def info(self): - return self.s3.info(self.path) - - def tell(self): - return self.loc - - def seek(self, loc, whence=0): - if whence == 0: - self.loc = loc - elif whence == 1: - self.loc += loc - elif whence == 2: - self.loc = self.size + loc - else: - raise ValueError("invalid whence (%s, should be 0, 1 or 2)" % whence) - if self.loc < 0: - self.loc = 0 - return self.loc - - def _fetch(self, start, end): - try: - if self.start is None and self.end is None: - # First read - self.start = start - self.end = end + self.blocksize - self.cache = self.s3.s3.get_object(Bucket=self.bucket, Key=self.key, - Range='bytes=%i-%i' % (start, self.end - 1) - )['Body'].read() - if start < self.start: - new = self.s3.s3.get_object(Bucket=self.bucket, Key=self.key, - Range='bytes=%i-%i' % (start, self.start - 1) - )['Body'].read() - self.start = start - self.cache = new + self.cache - if end > self.end: - if end > self.size: - return - new = self.s3.s3.get_object(Bucket=self.bucket, Key=self.key, - Range='bytes=%i-%i' % (self.end, end + self.blocksize - 1) - )['Body'].read() - self.end = end + self.blocksize - self.cache = self.cache + new - except ClientError: - self.start = min([start, self.start or self.size]) - self.end = max(end, self.end or self.size) - - def read(self, length=-1): - 
""" - Return data from cache, or fetch pieces as necessary - """ - if self.mode != 'rb': - raise ValueError('File not in read mode') - if length < 0: - length = self.size - if self.closed: - raise ValueError('I/O operation on closed file.') - self._fetch(self.loc, self.loc + length) - out = self.cache[self.loc - self.start: - self.loc - self.start + length] - self.loc += len(out) - return out - - def flush(self): - pass - - def close(self): - self.flush() - self.cache = None - self.closed = True - - def __str__(self): - return "" % (self.bucket, self.key) - - __repr__ = __str__ - - def __enter__(self): - return self - - def __exit__(self, *args): - self.close() - def read_bytes(fn, executor=None, s3=None, lazy=True, delimiter=None, not_zero=False, blocksize=2**27, **s3pars): diff --git a/distributed/s3fs.py b/distributed/s3fs.py new file mode 100644 index 00000000000..1f42c88d9c1 --- /dev/null +++ b/distributed/s3fs.py @@ -0,0 +1,404 @@ +# -*- coding: utf-8 -*- +import logging +import re + +import boto3 +from botocore.exceptions import ClientError + +from dask.base import tokenize +from .utils import read_block + +logger = logging.getLogger(__name__) + +logging.getLogger('boto3').setLevel(logging.WARNING) +logging.getLogger('botocore').setLevel(logging.WARNING) + +def split_path(path): + """ + Normalise S3 path string into bucket and key. + + Parameters + ---------- + path : string + Input path, like `s3://mybucket/path/to/file` + + Examples + -------- + >>> split_path("s3://mybucket/path/to/file") + ("mybucket", "path/to/file") + """ + if path.startswith('s3://'): + path = path[5:] + if '/' not in path: + return path, "" + else: + return path.split('/', 1) + + +class S3FileSystem(object): + """ + Access S3 data as if it were a file system. 
+ """ + _conn = {} + + def __init__(self, anon=True, key=None, secret=None, **kwargs): + """ + Create connection object to S3 + + Will use configured key/secret (typically in ~/.aws, see the + boto3 documentation) unless specified + + Parameters + ---------- + anon : bool (True) + whether to use anonymous connection (public buckets only) + key : string (None) + if not anonymouns, use this key, if specified + secret : string (None) + if not anonymous, use this password, if specified + kwargs : other parameters for boto3 session + """ + self.anon = anon + self.key = key + self.secret = secret + self.kwargs = kwargs + self.connect(anon, key, secret, kwargs) + self.dirs = {} + self.s3 = self.connect(anon, key, secret, kwargs) + + def connect(self, anon, key, secret, kwargs): + tok = tokenize(anon, key, secret, kwargs) + if tok not in self._conn: + logger.debug("Open S3 connection. Anonymous: %s", + self.anon) + if self.anon: + from botocore import UNSIGNED + from botocore.client import Config + s3 = boto3.Session().client('s3', + config=Config(signature_version=UNSIGNED)) + else: + s3 = boto3.Session(self.key, self.secret, + **self.kwargs).client('s3') + self._conn[tok] = s3 + return self._conn[tok] + + def __getstate__(self): + d = self.__dict__.copy() + del d['s3'] + logger.debug("Serialize with state: %s", d) + return d + + def __setstate__(self, state): + self.__dict__.update(state) + self.s3 = self.connect(self.anon, self.key, self.secret, self.kwargs) + + def open(self, path, mode='rb', block_size=4*1024**2): + """ Open a file for reading or writing + + Parameters + ---------- + path: string + Path of file on S3 + mode: string + One of 'rb' or 'wb' + block_size: int + Size of data-node blocks if reading + """ + if 'b' not in mode: + raise NotImplementedError("Text mode not supported, use mode='%s'" + " and manage bytes" % (mode[0] + 'b')) + return S3File(self, path, mode, block_size=block_size) + + def _ls(self, path, refresh=False): + """ List files below path + 
+ Parameters + ---------- + path : string/bytes + location at which to list files + detail : bool (=True) + if True, each list item is a dict of file properties; + otherwise, returns list of filenames + refresh : bool (=False) + if False, look in local cache for file details first + """ + path = path.lstrip('s3://').lstrip('/') + bucket, key = split_path(path) + if bucket not in self.dirs or refresh: + if bucket == '': + # list of buckets + if self.anon: + # cannot list buckets if not logged in + return [] + files = self.s3.list_buckets()['Buckets'] + for f in files: + f['Key'] = f['Name'] + f['Size'] = 0 + del f['Name'] + else: + files = self.s3.list_objects(Bucket=bucket).get('Contents', []) + for f in files: + f['Key'] = "/".join([bucket, f['Key']]) + self.dirs[bucket] = list(sorted(files, key=lambda x: x['Key'])) + files = self.dirs[bucket] + return files + + def ls(self, path, detail=False): + path = path.lstrip('s3://').rstrip('/') + try: + files = self._ls(path) + except ClientError: + files = [] + if path: + pattern = re.compile(path + '/[^/]*.$') + files = [f for f in files if pattern.match(f['Key']) is not None] + if not files: + try: + files = [self.info(path)] + except (OSError, IOError): + files = [] + if detail: + return files + else: + return [f['Key'] for f in files] + + def info(self, path): + if path.startswith('s3://'): + path = path[len('s3://'):] + path = path.rstrip('/') + files = self._ls(path) + files = [f for f in files if f['Key'].rstrip('/') == path] + if len(files) == 1: + return files[0] + else: + raise IOError("File not found: %s" %path) + + def walk(self, path): + return [f['Key'] for f in self._ls(path) if f['Key'].rstrip('/' + ).startswith(path.rstrip('/') + '/')] + + def glob(self, path): + """ + Find files by glob-matching. 
+ + Note that the bucket part of the path must not contain a "*" + """ + path0 = path + path = path.lstrip('s3://').lstrip('/') + bucket, key = split_path(path) + if "*" in bucket: + raise ValueError('Bucket cannot contain a "*"') + if '*' not in path: + path = path.rstrip('/') + '/*' + if '/' in path[:path.index('*')]: + ind = path[:path.index('*')].rindex('/') + root = path[:ind+1] + else: + root = '/' + allfiles = self.walk(root) + pattern = re.compile("^" + path.replace('//', '/') + .rstrip('/') + .replace('*', '[^/]*') + .replace('?', '.') + "$") + out = [f for f in allfiles if re.match(pattern, + f.replace('//', '/').rstrip('/'))] + if not out: + out = self.ls(path0) + return out + + def du(self, path, total=False, deep=False): + if deep: + files = self.walk(path) + files = [self.info(f) for f in files] + else: + files = self.ls(path, detail=True) + if total: + return sum(f.get('Size', 0) for f in files) + else: + return {p['Key']: p['Size'] for p in files} + + def exists(self, path): + return bool(self.ls(path)) + + def cat(self, path): + with self.open(path, 'rb') as f: + return f.read() + + def tail(self, path, size=1024): + """ Return last bytes of file """ + length = self.info(path)['Size'] + if size > length: + return self.cat(path) + with self.open(path, 'rb') as f: + f.seek(length - size) + return f.read(size) + + def head(self, path, size=1024): + """ Return first bytes of file """ + with self.open(path, 'rb', block_size=size) as f: + return f.read(size) + + def read_block(self, fn, offset, length, delimiter=None): + """ Read a block of bytes from an S3 file + + Starting at ``offset`` of the file, read ``length`` bytes. If + ``delimiter`` is set then we ensure that the read starts and stops at + delimiter boundaries that follow the locations ``offset`` and ``offset + + length``. If ``offset`` is zero then we start at zero. The + bytestring returned WILL include the end delimiter string. + + If offset+length is beyond the eof, reads to eof. 
+ + Parameters + ---------- + fn: string + Path to filename on S3 + offset: int + Byte offset to start read + length: int + Number of bytes to read + delimiter: bytes (optional) + Ensure reading starts and stops at delimiter bytestring + + Examples + -------- + >>> s3.read_block('data/file.csv', 0, 13) # doctest: +SKIP + b'Alice, 100\\nBo' + >>> s3.read_block('data/file.csv', 0, 13, delimiter=b'\\n') # doctest: +SKIP + b'Alice, 100\\nBob, 200\\n' + + Use ``length=None`` to read to the end of the file. + >>> s3.read_block('data/file.csv', 0, None, delimiter=b'\\n') # doctest: +SKIP + b'Alice, 100\\nBob, 200\\nCharlie, 300' + + See Also + -------- + distributed.utils.read_block + """ + with self.open(fn, 'rb') as f: + size = f.info()['Size'] + if length is None: + length = size + if offset + length > size: + length = size - offset + bytes = read_block(f, offset, length, delimiter) + return bytes + + +class S3File(object): + """ + Cached read-only interface to a key in S3, behaving like a seekable file. + + Optimized for a single continguous block. + """ + + def __init__(self, s3, path, mode='rb', block_size=4*2**20): + """ + Open S3 as a file. Data is only loaded and cached on demand. 
+ + Parameters + ---------- + s3 : boto3 connection + bucket : string + S3 bucket to access + key : string + S3 key to access + blocksize : int + read-ahead size for finding delimiters + """ + self.mode = mode + if mode != 'rb': + raise NotImplementedError("File mode must be 'rb', not %s" % mode) + self.path = path + bucket, key = split_path(path) + self.s3 = s3 + self.size = self.info()['Size'] + self.bucket = bucket + self.key = key + self.blocksize = block_size + self.cache = b"" + self.loc = 0 + self.start = None + self.end = None + self.closed = False + + def info(self): + return self.s3.info(self.path) + + def tell(self): + return self.loc + + def seek(self, loc, whence=0): + if whence == 0: + self.loc = loc + elif whence == 1: + self.loc += loc + elif whence == 2: + self.loc = self.size + loc + else: + raise ValueError("invalid whence (%s, should be 0, 1 or 2)" % whence) + if self.loc < 0: + self.loc = 0 + return self.loc + + def _fetch(self, start, end): + try: + if self.start is None and self.end is None: + # First read + self.start = start + self.end = end + self.blocksize + self.cache = self.s3.s3.get_object(Bucket=self.bucket, Key=self.key, + Range='bytes=%i-%i' % (start, self.end - 1) + )['Body'].read() + if start < self.start: + new = self.s3.s3.get_object(Bucket=self.bucket, Key=self.key, + Range='bytes=%i-%i' % (start, self.start - 1) + )['Body'].read() + self.start = start + self.cache = new + self.cache + if end > self.end: + if end > self.size: + return + new = self.s3.s3.get_object(Bucket=self.bucket, Key=self.key, + Range='bytes=%i-%i' % (self.end, end + self.blocksize - 1) + )['Body'].read() + self.end = end + self.blocksize + self.cache = self.cache + new + except ClientError: + self.start = min([start, self.start or self.size]) + self.end = max(end, self.end or self.size) + + def read(self, length=-1): + """ + Return data from cache, or fetch pieces as necessary + """ + if self.mode != 'rb': + raise ValueError('File not in read mode') + if 
length < 0: + length = self.size + if self.closed: + raise ValueError('I/O operation on closed file.') + self._fetch(self.loc, self.loc + length) + out = self.cache[self.loc - self.start: + self.loc - self.start + length] + self.loc += len(out) + return out + + def flush(self): + pass + + def close(self): + self.flush() + self.cache = None + self.closed = True + + def __str__(self): + return "" % (self.bucket, self.key) + + __repr__ = __str__ + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() diff --git a/distributed/tests/test_s3.py b/distributed/tests/test_s3.py index ce7d7d3ad57..608af5502e9 100644 --- a/distributed/tests/test_s3.py +++ b/distributed/tests/test_s3.py @@ -42,117 +42,14 @@ b'Frank,600,6\n')} + + @pytest.yield_fixture def s3(): - # could do with a bucket with write privileges. + # anonymous S3 access to real remote data yield S3FileSystem(anon=True) -def test_s3_file_access(s3): - fn = 'distributed-test/nested/file1' - data = b'hello\n' - assert s3.cat(fn) == data - assert s3.head(fn, 3) == data[:3] - assert s3.tail(fn, 3) == data[-3:] - assert s3.tail(fn, 10000) == data - - -def test_s3_file_info(s3): - fn = 'distributed-test/nested/file1' - data = b'hello\n' - assert fn in s3.walk('distributed-test') - assert s3.exists(fn) - assert not s3.exists(fn+'another') - assert s3.info(fn)['Size'] == len(data) - with pytest.raises((OSError, IOError)): - s3.info(fn+'another') - -def test_du(s3): - d = s3.du(test_bucket_name, deep=True) - assert all(isinstance(v, int) and v >= 0 for v in d.values()) - assert 'distributed-test/nested/file1' in d - - assert s3.du(test_bucket_name + '/test/', total=True) ==\ - sum(map(len, files.values())) - - -def test_s3_ls(s3): - fn = 'distributed-test/nested/file1' - assert fn not in s3.ls('distributed-test/') - assert fn in s3.ls('distributed-test/nested/') - assert fn in s3.ls('distributed-test/nested') - assert s3.ls('s3://distributed-test/nested/') == s3.ls('distributed-test/nested') - 
- -def test_s3_ls_detail(s3): - L = s3.ls('distributed-test/nested', detail=True) - assert all(isinstance(item, dict) for item in L) - - -def test_s3_glob(s3): - fn = 'distributed-test/nested/file1' - assert fn not in s3.glob('distributed-test/') - assert fn not in s3.glob('distributed-test/*') - assert fn in s3.glob('distributed-test/nested') - assert fn in s3.glob('distributed-test/nested/*') - assert fn in s3.glob('distributed-test/nested/file*') - assert fn in s3.glob('distributed-test/*/*') - - -def test_get_list_of_summary_objects(s3): - L = s3.ls(test_bucket_name + '/test') - - assert len(L) == 2 - assert [l.lstrip(test_bucket_name).lstrip('/') for l in sorted(L)] == sorted(list(files)) - - L2 = s3.ls('s3://' + test_bucket_name + '/test') - - assert L == L2 - - -def test_read_keys_from_bucket(s3): - for k, data in files.items(): - file_contents = s3.cat('/'.join([test_bucket_name, k])) - assert file_contents == data - - assert (s3.cat('/'.join([test_bucket_name, k])) == - s3.cat('s3://' + '/'.join([test_bucket_name, k]))) - - -@slow -def test_seek_delimiter(s3): - fn = 'test/accounts.1.json' - data = files[fn] - with s3.open('/'.join([test_bucket_name, fn])) as f: - seek_delimiter(f, b'}', 0) - assert f.tell() == 0 - f.seek(1) - seek_delimiter(f, b'}', 5) - assert f.tell() == data.index(b'}') + 1 - seek_delimiter(f, b'\n', 5) - assert f.tell() == data.index(b'\n') + 1 - f.seek(1, 1) - ind = data.index(b'\n') + data[data.index(b'\n')+1:].index(b'\n') + 1 - seek_delimiter(f, b'\n', 5) - assert f.tell() == ind + 1 - - -def test_read_s3_block(s3): - import io - data = files['test/accounts.1.json'] - lines = io.BytesIO(data).readlines() - path = 'distributed-test/test/accounts.1.json' - assert s3.read_block(path, 1, 35, b'\n') == lines[1] - assert s3.read_block(path, 0, 30, b'\n') == lines[0] - assert s3.read_block(path, 0, 35, b'\n') == lines[0] + lines[1] - assert s3.read_block(path, 0, 5000, b'\n') == data - assert len(s3.read_block(path, 0, 5)) == 5 - assert 
len(s3.read_block(path, 4, 5000)) == len(data) - 4 - assert s3.read_block(path, 5000, 5010) == b'' - - assert s3.read_block(path, 5, None) == s3.read_block(path, 5, 1000) - - @gen_cluster(timeout=60, executor=True) def test_read_bytes(e, s, a, b): futures = read_bytes(test_bucket_name+'/test/accounts.*', lazy=False) diff --git a/distributed/tests/test_s3fs.py b/distributed/tests/test_s3fs.py new file mode 100644 index 00000000000..14b3b2ff5aa --- /dev/null +++ b/distributed/tests/test_s3fs.py @@ -0,0 +1,137 @@ +# -*- coding: utf-8 -*- +import pytest +from distributed.s3fs import S3FileSystem +from distributed.s3 import seek_delimiter +from distributed.utils_test import slow +import moto + +# These get mirrored on s3://distributed-test/ +test_bucket_name = 'distributed-test' +files = {'test/accounts.1.json': (b'{"amount": 100, "name": "Alice"}\n' + b'{"amount": 200, "name": "Bob"}\n' + b'{"amount": 300, "name": "Charlie"}\n' + b'{"amount": 400, "name": "Dennis"}\n'), + 'test/accounts.2.json': (b'{"amount": 500, "name": "Alice"}\n' + b'{"amount": 600, "name": "Bob"}\n' + b'{"amount": 700, "name": "Charlie"}\n' + b'{"amount": 800, "name": "Dennis"}\n')} + +csv_files = {'2014-01-01.csv': (b'name,amount,id\n' + b'Alice,100,1\n' + b'Bob,200,2\n' + b'Charlie,300,3\n'), + '2014-01-02.csv': (b'name,amount,id\n'), + '2014-01-03.csv': (b'name,amount,id\n' + b'Dennis,400,4\n' + b'Edith,500,5\n' + b'Frank,600,6\n')} + +@pytest.yield_fixture +def s3(): + # could do with a bucket with write privileges. 
+ yield S3FileSystem(anon=True) + + +def test_s3_file_access(s3): + fn = 'distributed-test/nested/file1' + data = b'hello\n' + assert s3.cat(fn) == data + assert s3.head(fn, 3) == data[:3] + assert s3.tail(fn, 3) == data[-3:] + assert s3.tail(fn, 10000) == data + + +def test_s3_file_info(s3): + fn = 'distributed-test/nested/file1' + data = b'hello\n' + assert fn in s3.walk('distributed-test') + assert s3.exists(fn) + assert not s3.exists(fn+'another') + assert s3.info(fn)['Size'] == len(data) + with pytest.raises((OSError, IOError)): + s3.info(fn+'another') + +def test_du(s3): + d = s3.du(test_bucket_name, deep=True) + assert all(isinstance(v, int) and v >= 0 for v in d.values()) + assert 'distributed-test/nested/file1' in d + + assert s3.du(test_bucket_name + '/test/', total=True) ==\ + sum(map(len, files.values())) + + +def test_s3_ls(s3): + fn = 'distributed-test/nested/file1' + assert fn not in s3.ls('distributed-test/') + assert fn in s3.ls('distributed-test/nested/') + assert fn in s3.ls('distributed-test/nested') + assert s3.ls('s3://distributed-test/nested/') == s3.ls('distributed-test/nested') + + +def test_s3_ls_detail(s3): + L = s3.ls('distributed-test/nested', detail=True) + assert all(isinstance(item, dict) for item in L) + + +def test_s3_glob(s3): + fn = 'distributed-test/nested/file1' + assert fn not in s3.glob('distributed-test/') + assert fn not in s3.glob('distributed-test/*') + assert fn in s3.glob('distributed-test/nested') + assert fn in s3.glob('distributed-test/nested/*') + assert fn in s3.glob('distributed-test/nested/file*') + assert fn in s3.glob('distributed-test/*/*') + + +def test_get_list_of_summary_objects(s3): + L = s3.ls(test_bucket_name + '/test') + + assert len(L) == 2 + assert [l.lstrip(test_bucket_name).lstrip('/') for l in sorted(L)] == sorted(list(files)) + + L2 = s3.ls('s3://' + test_bucket_name + '/test') + + assert L == L2 + + +def test_read_keys_from_bucket(s3): + for k, data in files.items(): + file_contents = 
s3.cat('/'.join([test_bucket_name, k])) + assert file_contents == data + + assert (s3.cat('/'.join([test_bucket_name, k])) == + s3.cat('s3://' + '/'.join([test_bucket_name, k]))) + + +@slow +def test_seek_delimiter(s3): + fn = 'test/accounts.1.json' + data = files[fn] + with s3.open('/'.join([test_bucket_name, fn])) as f: + seek_delimiter(f, b'}', 0) + assert f.tell() == 0 + f.seek(1) + seek_delimiter(f, b'}', 5) + assert f.tell() == data.index(b'}') + 1 + seek_delimiter(f, b'\n', 5) + assert f.tell() == data.index(b'\n') + 1 + f.seek(1, 1) + ind = data.index(b'\n') + data[data.index(b'\n')+1:].index(b'\n') + 1 + seek_delimiter(f, b'\n', 5) + assert f.tell() == ind + 1 + + +def test_read_s3_block(s3): + import io + data = files['test/accounts.1.json'] + lines = io.BytesIO(data).readlines() + path = 'distributed-test/test/accounts.1.json' + assert s3.read_block(path, 1, 35, b'\n') == lines[1] + assert s3.read_block(path, 0, 30, b'\n') == lines[0] + assert s3.read_block(path, 0, 35, b'\n') == lines[0] + lines[1] + assert s3.read_block(path, 0, 5000, b'\n') == data + assert len(s3.read_block(path, 0, 5)) == 5 + assert len(s3.read_block(path, 4, 5000)) == len(data) - 4 + assert s3.read_block(path, 5000, 5010) == b'' + + assert s3.read_block(path, 5, None) == s3.read_block(path, 5, 1000) From 54b0a2d9569385e2b4730350e6655d1580fa0554 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 9 Mar 2016 11:13:22 -0500 Subject: [PATCH 2/3] Add in some write functions and copy a few tests from hdfs3 More of the tests easily converted --- distributed/s3fs.py | 139 +++++++++++++++++++++++++++++---- distributed/tests/test_s3fs.py | 117 ++++++++++++++++++++++++++- 2 files changed, 240 insertions(+), 16 deletions(-) diff --git a/distributed/s3fs.py b/distributed/s3fs.py index 1f42c88d9c1..c17a4a4d23b 100644 --- a/distributed/s3fs.py +++ b/distributed/s3fs.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import logging import re +import io import boto3 from botocore.exceptions import 
ClientError @@ -62,11 +63,11 @@ def __init__(self, anon=True, key=None, secret=None, **kwargs): self.key = key self.secret = secret self.kwargs = kwargs - self.connect(anon, key, secret, kwargs) self.dirs = {} - self.s3 = self.connect(anon, key, secret, kwargs) + self.s3 = self.connect() - def connect(self, anon, key, secret, kwargs): + def connect(self): + anon, key, secret, kwargs = self.anon, self.key, self.secret, self.kwargs tok = tokenize(anon, key, secret, kwargs) if tok not in self._conn: logger.debug("Open S3 connection. Anonymous: %s", @@ -90,7 +91,7 @@ def __getstate__(self): def __setstate__(self, state): self.__dict__.update(state) - self.s3 = self.connect(self.anon, self.key, self.secret, self.kwargs) + self.s3 = self.connect() def open(self, path, mode='rb', block_size=4*1024**2): """ Open a file for reading or writing @@ -136,7 +137,10 @@ def _ls(self, path, refresh=False): f['Size'] = 0 del f['Name'] else: - files = self.s3.list_objects(Bucket=bucket).get('Contents', []) + try: + files = self.s3.list_objects(Bucket=bucket).get('Contents', []) + except ClientError: + files = [] for f in files: f['Key'] = "/".join([bucket, f['Key']]) self.dirs[bucket] = list(sorted(files, key=lambda x: x['Key'])) @@ -155,7 +159,7 @@ def ls(self, path, detail=False): if not files: try: files = [self.info(path)] - except (OSError, IOError): + except (OSError, IOError, ClientError): files = [] if detail: return files @@ -238,6 +242,44 @@ def head(self, path, size=1024): with self.open(path, 'rb', block_size=size) as f: return f.read(size) + def mkdir(self, path): + self.touch(path) + + def mv(self, path1, path2): + self.copy(path1, path2) + self.rm(path1) + + def copy(self, path1, path2): + buc1, key1 = split_path(path1) + buc2, key2 = split_path(path2) + try: + self.s3.copy_object(Bucket=buc2, Key=key2, CopySource='/'.join([buc1, key1])) + except ClientError: + raise IOError('Copy failed on %s->%s', path1, path2) + self._ls(path2, refresh=True) + + def rm(self, path, 
recursive=True): + if recursive: + for f in self.walk(path): + self.rm(f, recursive=False) + bucket, key = split_path(path) + if key: + try: + out = self.s3.delete_object(Bucket=bucket, Key=key) + except ClientError: + raise IOError('Delete key failed: (%s, %s)', bucket, key) + else: + try: + out = self.s3.delete_bucket(Bucket=bucket) + except ClientError: + raise IOError('Delete bucket failed: %s', bucket) + if out['ResponseMetadata']['HTTPStatusCode'] != 204: + raise IOError('rm failed on %s', path) + self._ls(path, refresh=True) + + def touch(self, path): + self.open(path, mode='wb') + def read_block(self, fn, offset, length, delimiter=None): """ Read a block of bytes from an S3 file @@ -307,12 +349,11 @@ def __init__(self, s3, path, mode='rb', block_size=4*2**20): read-ahead size for finding delimiters """ self.mode = mode - if mode != 'rb': - raise NotImplementedError("File mode must be 'rb', not %s" % mode) + if mode not in {'rb', 'wb'}: + raise NotImplementedError("File mode must be 'rb' or 'wb', not %s" % mode) self.path = path bucket, key = split_path(path) self.s3 = s3 - self.size = self.info()['Size'] self.bucket = bucket self.key = key self.blocksize = block_size @@ -321,6 +362,14 @@ def __init__(self, s3, path, mode='rb', block_size=4*2**20): self.start = None self.end = None self.closed = False + if mode == 'wb': + self.buffer = io.BytesIO() + self.size = 0 + else: + try: + self.size = self.info()['Size'] + except ClientError: + raise IOError("File not accessible: %s", path) def info(self): return self.s3.info(self.path) @@ -329,18 +378,43 @@ def tell(self): return self.loc def seek(self, loc, whence=0): + if not self.mode == 'rb': + raise ValueError('Seek only available in read mode') if whence == 0: - self.loc = loc + nloc = loc elif whence == 1: - self.loc += loc + nloc = self.loc + loc elif whence == 2: - self.loc = self.size + loc + nloc = self.size + loc else: raise ValueError("invalid whence (%s, should be 0, 1 or 2)" % whence) - if self.loc < 
0: - self.loc = 0 + if nloc < 0: + raise ValueError('Seek before start of file') + self.loc = nloc return self.loc + def copy(self, path1, path2): + buc2, key2 = path2.lstrip('s3://').split('/', maxsplit=1) + out = self.s3.copy_object(Bucket=buc2, Key=key2, CopySource=path1.lstrip('s3://')) + if out['ResponseMetadata']['HTTPStatusCode'] != 200: + raise IOError('Copy failed on %s->%s', path1, path2) + self._ls(path2, refresh=True) + + def put(self, filename, path, chunk=2**27): + """ Copy local file to path in S3 """ + with self.open(path, 'wb') as f: + with open(filename, 'rb') as f2: + while True: + out = f2.read(chunk) + if len(out) == 0: + break + f.write(out) + self._ls(path, refresh=True) + + def mv(self, path1, path2): + self.copy(path1, path2) + self.rm(path1) + def _fetch(self, start, end): try: if self.start is None and self.end is None: @@ -384,14 +458,49 @@ def read(self, length=-1): self.loc += len(out) return out + def write(self, data): + """ + Write data to buffer. + + Buffer only sent to S3 on flush(). + """ + if self.mode != 'wb': + raise ValueError('File not in write mode') + if self.closed: + raise ValueError('I/O operation on closed file.') + return self.buffer.write(data) + def flush(self): - pass + """ + Write buffered data to S3. 
+ """ + if self.mode == 'wb': + try: + self.s3.s3.head_bucket(Bucket=self.bucket) + except ClientError: + try: + self.s3.s3.create_bucket(Bucket=self.bucket) + except ClientError: + raise IOError('Create bucket failed: %s', self.bucket) + pos = self.buffer.tell() + self.buffer.seek(0) + try: + out = self.s3.s3.put_object(Bucket=self.bucket, Key=self.key, + Body=self.buffer.read()) + finally: + self.buffer.seek(pos) + self.s3._ls(self.bucket, refresh=True) + if out['ResponseMetadata']['HTTPStatusCode'] != 200: + raise IOError("Write failed: %s", out) def close(self): self.flush() self.cache = None self.closed = True + def __del__(self): + self.close() + def __str__(self): return "<S3File %s/%s>" % (self.bucket, self.key) diff --git a/distributed/tests/test_s3fs.py b/distributed/tests/test_s3fs.py index 14b3b2ff5aa..8f48d811379 100644 --- a/distributed/tests/test_s3fs.py +++ b/distributed/tests/test_s3fs.py @@ -25,11 +25,64 @@ b'Dennis,400,4\n' b'Edith,500,5\n' b'Frank,600,6\n')} +text_files = {'nested/file1': b'hello\n', + 'nested/file2': b'world', + 'nested/nested2/file1': b'hello\n', + 'nested/nested2/file2': b'world'} +a = 'tmp/test/a' +b = 'tmp/test/b' +c = 'tmp/test/c' +d = 'tmp/test/d' + @pytest.yield_fixture def s3(): - # could do with a bucket with write privileges. 
+ # make writable local S3 system + m = moto.mock_s3() + m.start() + import boto3 + client = boto3.client('s3') + client.create_bucket(Bucket=test_bucket_name) + for flist in [files, csv_files, text_files]: + for f, data in flist.items(): + client.put_object(Bucket=test_bucket_name, Key=f, Body=data) yield S3FileSystem(anon=True) + m.stop() + + +def test_simple(s3): + data = b'a' * (10 * 2**20) + + with s3.open(a, 'wb') as f: + f.write(data) + + with s3.open(a, 'rb') as f: + out = f.read(len(data)) + assert len(data) == len(out) + assert out == data + + +def test_idempotent_connect(s3): + s3.connect() + s3.connect() + + +def test_ls_touch(s3): + assert not s3.ls('tmp/test') + s3.touch(a) + s3.touch(b) + L = s3.ls('tmp/test', True) + assert set(d['Key'] for d in L) == set([a, b]) + L = s3.ls('tmp/test', False) + assert set(L) == set([a, b]) + + +def test_rm(s3): + assert not s3.exists(a) + s3.touch(a) + assert s3.exists(a) + s3.rm(a) + assert not s3.exists(a) def test_s3_file_access(s3): @@ -103,6 +156,68 @@ def test_read_keys_from_bucket(s3): s3.cat('s3://' + '/'.join([test_bucket_name, k]))) +def test_seek(s3): + with s3.open(a, 'wb') as f: + f.write(b'123') + + with s3.open(a) as f: + f.seek(1000) + with pytest.raises(ValueError): + f.seek(-1) + with pytest.raises(ValueError): + f.seek(-5, 2) + with pytest.raises(ValueError): + f.seek(0, 10) + f.seek(0) + assert f.read(1) == b'1' + f.seek(0) + assert f.read(1) == b'1' + f.seek(3) + assert f.read(1) == b'' + f.seek(-1, 2) + assert f.read(1) == b'3' + f.seek(-1, 1) + f.seek(-1, 1) + assert f.read(1) == b'2' + for i in range(4): + assert f.seek(i) == i + + +def test_bad_open(s3): + with pytest.raises(IOError): + s3.open('') + + +def test_errors(s3): + with pytest.raises((IOError, OSError)): + s3.open('tmp/test/shfoshf', 'rb') + + ## This is fine, no need for interleaving directories on S3 + #with pytest.raises((IOError, OSError)): + # s3.touch('tmp/test/shfoshf/x') + + with pytest.raises((IOError, OSError)): + 
s3.rm('tmp/test/shfoshf/x') + + with pytest.raises((IOError, OSError)): + s3.mv('tmp/test/shfoshf/x', 'tmp/test/shfoshf/y') + + #with pytest.raises((IOError, OSError)): + # s3.open('x', 'wb') + + with pytest.raises((IOError, OSError)): + s3.open('x', 'rb') + + #with pytest.raises(IOError): + # s3.chown('/unknown', 'someone', 'group') + + #with pytest.raises(IOError): + # s3.chmod('/unknonwn', 'rb') + + with pytest.raises(IOError): + s3.rm('unknown') + + @slow def test_seek_delimiter(s3): fn = 'test/accounts.1.json' From bfa30e9ed4ddf8c9e6f04475c3edf7f93577d9b8 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 9 Mar 2016 13:18:57 -0500 Subject: [PATCH 3/3] Implement retriable S3 fetch --- distributed/s3fs.py | 84 +++++++++++++++++++++++++++++---------------- 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/distributed/s3fs.py b/distributed/s3fs.py index c17a4a4d23b..2b9d127c868 100644 --- a/distributed/s3fs.py +++ b/distributed/s3fs.py @@ -5,6 +5,7 @@ import boto3 from botocore.exceptions import ClientError +from botocore.client import Config from dask.base import tokenize from .utils import read_block @@ -41,6 +42,8 @@ class S3FileSystem(object): Access S3 data as if it were a file system. 
""" _conn = {} + connect_timeout=5 + read_timeout=15 def __init__(self, anon=True, key=None, secret=None, **kwargs): """ @@ -74,11 +77,14 @@ def connect(self): self.anon) if self.anon: from botocore import UNSIGNED - from botocore.client import Config - s3 = boto3.Session().client('s3', - config=Config(signature_version=UNSIGNED)) + conf = Config(connect_timeout=self.connect_timeout, + read_timeout=self.read_timeout, + signature_version=UNSIGNED) + s3 = boto3.Session().client('s3', config=conf) else: - s3 = boto3.Session(self.key, self.secret, + conf = Config(connect_timeout=self.connect_timeout, + read_timeout=self.read_timeout) + s3 = boto3.Session(self.key, self.secret, config=conf **self.kwargs).client('s3') self._conn[tok] = s3 return self._conn[tok] @@ -416,31 +422,24 @@ def mv(self, path1, path2): self.rm(path1) def _fetch(self, start, end): - try: - if self.start is None and self.end is None: - # First read - self.start = start - self.end = end + self.blocksize - self.cache = self.s3.s3.get_object(Bucket=self.bucket, Key=self.key, - Range='bytes=%i-%i' % (start, self.end - 1) - )['Body'].read() - if start < self.start: - new = self.s3.s3.get_object(Bucket=self.bucket, Key=self.key, - Range='bytes=%i-%i' % (start, self.start - 1) - )['Body'].read() - self.start = start - self.cache = new + self.cache - if end > self.end: - if end > self.size: - return - new = self.s3.s3.get_object(Bucket=self.bucket, Key=self.key, - Range='bytes=%i-%i' % (self.end, end + self.blocksize - 1) - )['Body'].read() - self.end = end + self.blocksize - self.cache = self.cache + new - except ClientError: - self.start = min([start, self.start or self.size]) - self.end = max(end, self.end or self.size) + if self.start is None and self.end is None: + # First read + self.start = start + self.end = end + self.blocksize + self.cache = _fetch_range(self.s3.s3, self.bucket, self.key, + start, self.end) + if start < self.start: + new = _fetch_range(self.s3.s3, self.bucket, self.key, + start, 
self.start) + self.start = start + self.cache = new + self.cache + if end > self.end: + if end > self.size: + return + new = _fetch_range(self.s3.s3, self.bucket, self.key, + self.end, end + self.blocksize) + self.end = end + self.blocksize + self.cache = self.cache + new def read(self, length=-1): """ @@ -511,3 +510,30 @@ def __enter__(self): def __exit__(self, *args): self.close() + +MAX_ATTEMPTS = 10 + + +def _fetch_range(client, bucket, key, start, end): + try: + for i in range(MAX_ATTEMPTS): + try: + resp = client.get_object(Bucket=bucket, Key=key, + Range='bytes=%i-%i' % (start, end - 1)) + except ClientError as e: + if e.response['Error'].get('Code', 'Unknown') in ['416', 'InvalidRange']: + return b'' + except Exception as e: + logger.debug('Exception %s on S3 download', e) + continue + buff = io.BytesIO() + buffer_size = 1024 * 16 + for chunk in iter(lambda: resp['Body'].read(buffer_size), + b''): + buff.write(chunk) + buff.seek(0) + return buff.read() + raise RuntimeError("Max number of S3 retries exceeded") + finally: + logger.debug("EXITING _fetch_range for part: %s/%s, %s-%s", + bucket, key, start, end)