Skip to content

POC: Sketch of localfilesystem and commiting #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions fsspec/local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import os
import shutil
import tempfile
from .spec import AbstractFileSystem


class LocalFileSystem(AbstractFileSystem):
def mkdir(self, path, **kwargs):
os.mkdir(path, **kwargs)

def makedirs(self, path, exist_ok=False):
os.makedirs(path, exist_ok=exist_ok)

def rmdir(self, path):
os.rmdir(path)

def ls(self, path, detail=False):
paths = [os.path.abspath(os.path.join(path, f))
for f in os.listdir(path)]
if detail:
return [self.info(f) for f in paths]
else:
return paths

def walk(self, path, simple=False):
out = os.walk(os.path.abspath(path))
if simple:
results = []
for dirpath, dirnames, filenames in out:
results.extend([os.path.join(dirpath, f) for f in filenames])
return results
else:
return out

def info(self, path):
out = os.stat(path)
if os.path.isfile(path):
t = 'file'
elif os.path.isdir(path):
t = 'directory'
elif os.path.islink(path):
t = 'link'
else:
t = 'other'
result = {
'name': path,
'size': out.st_size,
'type': t,
'created': out.st_ctime
}
for field in ['mode', 'uid', 'gid', 'mtime']:
result[field] = getattr(out, 'st_' + field)
return result

def copy(self, path1, path2, **kwargs):
""" Copy within two locations in the filesystem"""
shutil.copyfile(path1, path2)

get = copy
put = copy

def mv(self, path1, path2, **kwargs):
""" Move file from one location to another """
os.rename(path1, path2)

def rm(self, path, recursive=False):
if recursive:
shutil.rmtree(path)
else:
os.remove(path)

def _open(self, path, mode='rb', block_size=None, **kwargs):
return LocalFileOpener(path, mode, **kwargs)

def touch(self, path, **kwargs):
""" Create empty file, or update timestamp """
if self.exists(path):
os.utime(path, None)
else:
open(path, 'a').close()


class LocalFileOpener(object):
def __init__ (self, path, mode, autocommit=True):
# TODO: does autocommit mean write directory to destination, or
# do move operation immediately on close
self.path = path
self._incontext = False
if autocommit or 'w' not in mode:
self.autocommit = True
self.f = open(path, mode=mode)
else:
# TODO: check if path is writable?
self.autocommit = False
i, name = tempfile.mkstemp()
self.temp = name
self.f = open(name, mode=mode)

def commit(self):
if self._incontext:
raise RuntimeError('Cannot commit while within file context')
os.rename(self.temp, self.path)

def discard(self):
if self._incontext:
raise RuntimeError('Cannot discard while within file context')
if self.autocommit is False:
os.remove(self.temp)

def __getattr__(self, item):
return getattr(self.f, item)

def __enter__(self):
self._incontext = True
return self.f

def __exit__(self, exc_type, exc_value, traceback):
self.f.close()
self._incontext = False
20 changes: 20 additions & 0 deletions fsspec/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import importlib
__all__ = ['registry', 'get_filesystem_class', 'default']

registry = {}
default = 'fsspec.local.LocalFileSystem'

known_implementations = {
'file': default,
}


def get_filesystem_class(protocol):
if protocol not in registry:
if protocol not in known_implementations:
raise ValueError("Protocol not known: %s" % protocol)
mod, name = protocol.rsplit('.', 1)
mod = importlib.import_module(mod)
registry[protocol] = getattr(mod, name)

return registry[protocol]
144 changes: 126 additions & 18 deletions fsspec/spec.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@

from contextlib import contextmanager
from .utils import read_block


aliases = [
('makedir', 'mkdir'),
('listdir', 'ls'),
('cp', 'copy'),
('move', 'mv'),
('rename', 'mv'),
('delete', 'rm'),
]


class AbstractFileSystem(object):
"""
A specification for python file-systems
Expand All @@ -16,7 +26,12 @@ def __init__(self, *args, **kwargs):

A reasonable default should be provided if there are no arguments
"""
self.autocommit = True
self._intrans = False
self._transaction = Transaction(self)
self._singleton[0] = self
for new, old in aliases:
setattr(self, new, getattr(self, old))

@classmethod
def current(cls):
Expand All @@ -25,10 +40,19 @@ def current(cls):
If no instance has been created, then create one with defaults
"""
if not cls._singleton[0]:
return AbstractFileSystem()
return cls()
else:
return cls._singleton[0]

@property
def transaction(self):
"""A context within which files are committed together upon exit

Requires the file class to implement `.commit()` and `.discard()`
for the normal and exception cases.
"""
return self._transaction

def invalidate_cache(self, path=None):
"""
Discard any cached directory information
Expand Down Expand Up @@ -57,6 +81,21 @@ def mkdir(self, path, **kwargs):
"""
pass

def makedirs(self, path, exist_ok=False):
"""Recursively make directories

Creates directory at path and any intervening required directories.
Raises exception if, for instance, the path already exists but is a
file.

Parameters
----------
path: str
leaf directory name
exist_ok: bool (False)
If True, will error if the target already exists
"""

def rmdir(self, path):
"""Remove a directory, if empty"""
pass
Expand Down Expand Up @@ -87,14 +126,22 @@ def ls(self, path, detail=False):
"""
pass

def walk(self, path, detail=False):
def walk(self, path, simple=False):
""" Return all files belows path

Like ``ls``, but recursing into subdirectories. If detail is False,
returns a list of full paths.
Similar to ``ls``, but recursing into subdirectories.

Parameters
----------
path: str
Root to recurse into
simple: bool (False)
If True, returns a list of filenames. If False, returns an
iterator over tuples like
(dirpath, dirnames, filenames), see ``os.walk``.
"""

def du(self, path, total=False, deep=False):
def du(self, path, total=False):
"""Space used by files within a path

If total is True, returns a number (bytes), if False, returns a
Expand All @@ -103,14 +150,12 @@ def du(self, path, total=False, deep=False):
Parameters
----------
total: bool
whether to sum all the file sized
deep: bool
whether to descend into subdirectories.
whether to sum all the file sizes
"""
if deep:
sizes = {f['name']: f['size'] for f in self.walk(path, True)}
else:
sizes = {f['name']: f['size'] for f in self.ls(path, True)}
sizes = {}
for f in self.walk(path, True):
info = self.info(f)
sizes[info['name']] = info['size']
if total:
return sum(sizes.values())
else:
Expand Down Expand Up @@ -147,20 +192,35 @@ def glob(self, path):

def exists(self, path):
"""Is there a file at the given path"""
pass
try:
self.info(path)
return True
except:
return False

def info(self, path):
"""Give details of entry at path

Returns a single dictionary, with exactly the same information as ``ls``
would with ``detail=True``

Returns
-------
dict with keys: name (full path in the FS), size (in bytes), type (file,
directory, or something else) and other FS-specific keys.
"""

def isdir(self, path):
"""Is this entry directory-like?"""
return self.info(path)['type'] == 'directory'

def isfile(self, path):
"""Is this entry file-like?"""
return self.info(path)['type'] == 'file'

def cat(self, path):
""" Get the content of a file """
return self.open(path, 'rb').read()

def get(self, rpath, lpath, **kwargs):
""" Copy file to local
Expand All @@ -185,6 +245,7 @@ def tail(self, path, size=1024):
def copy(self, path1, path2, **kwargs):
""" Copy within two locations in the filesystem"""


def mv(self, path1, path2, **kwargs):
""" Move file from one location to another """
self.copy(path1, path2, **kwargs)
Expand All @@ -202,6 +263,10 @@ def rm(self, path, recursive=False):
also remove the directory
"""

def _open(self, path, mode='rb', block_size=None, autocommit=True,
**kwargs):
pass

def open(self, path, mode='rb', block_size=None, **kwargs):
"""
Return a file-like object from the filesystem
Expand All @@ -211,6 +276,8 @@ def open(self, path, mode='rb', block_size=None, **kwargs):

Parameters
----------
path: str
Target file
mode: str like 'rb', 'w'
See builtin ``open()``
block_size: int
Expand All @@ -220,12 +287,24 @@ def open(self, path, mode='rb', block_size=None, **kwargs):
if 'b' not in mode:
mode = mode.replace('t', '') + 'b'
return io.TextIOWrapper(
self.open(self, path, mode, block_size, **kwargs))
self.open(path, mode, block_size, **kwargs))
else:
ac = kwargs.pop('autocommit', not self._intrans)
if not self._intrans and not ac:
raise ValueError('Must use autocommit outside a transaction.')
f = self._open(path, mode=mode, block_size=block_size,
autocommit=ac, **kwargs)
if not ac:
self.transaction.files.append(f)
return f

def touch(self, path, **kwargs):
""" Create empty file """
with self.open(path, 'wb', **kwargs):
pass
""" Create empty file, or update timestamp """
if not self.exists(path):
with self.open(path, 'wb', **kwargs):
pass
else:
raise NotImplementedError # update timestamp, if possible

def read_block(self, fn, offset, length, delimiter=None):
""" Read a block of bytes from
Expand Down Expand Up @@ -280,3 +359,32 @@ def __getstate__(self):

def __setstate__(self, state):
self.__dict__.update(state)


class Transaction(object):
"""Filesystem transaction context

Gathers files for deferred commit or discard, so that several write
operations can be finalized semi-atomically.
"""

def __init__(self, fs):
self.fs = fs

def __enter__(self):
self.files = []
self.fs._intrans = True

def __exit__(self, exc_type, exc_val, exc_tb):
# only commit if there was no exception
self.complete(commit=exc_type is None)

def complete(self, commit=True):
# TODO: define behaviour in case of exception during completion
for f in self.files:
if commit:
f.commit()
else:
f.discard()
self.files = []
self.fs._intrans = False