Skip to content

Add ABSStore #288

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Aug 6, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 153 additions & 0 deletions zarr/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1883,3 +1883,156 @@ def __delitem__(self, key):
with self._mutex:
self._invalidate_keys()
self._invalidate_value(key)


class ABSStore(MutableMapping):
"""Storage class using Azure Blob Storage (ABS).

Parameters
----------
container_name : string
The name of the ABS container to use. Currently this must exist in the
storage account.
prefix : string
Location of the "directory" to use as the root of the storage hierarchy
within the container.
account_name : string
The Azure blob storage account name.
account_key : string
The Azure blob storage account acess key.

Notes
-----
In order to use this store, you must install the Azure Blob Storage
`Python Client Library <https://github.com/Azure/azure-storage-python/tree/master/azure-storage-blob>`_ version >= 1.3.0.
"""

def __init__(self, container_name, prefix, account_name, account_key):
self.account_name = account_name
self.account_key = account_key
self.container_name = container_name
self.prefix = normalize_storage_path(prefix)
self.initialize_container()

def initialize_container(self):
from azure.storage.blob import BlockBlobService
self.client = BlockBlobService(self.account_name, self.account_key)

# needed for pickling
def __getstate__(self):
state = self.__dict__.copy()
return state

def __setstate__(self, state):
self.__dict__.update(state)
self.initialize_container()

def __enter__(self):
return self

def __exit__(self, *args):
pass

def _append_path_to_prefix(path, prefix):
return '/'.join([normalize_storage_path(prefix),
normalize_storage_path(path)])

def full_path(self, path=None):
return _append_path_to_prefix(path, self.prefix)

def __getitem__(self, key):
blob_name = '/'.join([self.prefix, key])
blob = self.client.get_blob_to_bytes(self.container_name, blob_name)
if blob:
return blob.content
else:
raise KeyError('Blob %s not found' % blob_name)

def __setitem__(self, key, value):
blob_name = '/'.join([self.prefix, key])
self.client.create_blob_from_text(self.container_name, blob_name, value)

def __delitem__(self, key):
raise NotImplementedError

def __eq__(self, other):
return (
isinstance(other, ABSStore) and
self.container_name == other.container_name and
self.prefix == other.prefix
)

def keys(self):
raise NotImplementedError

def __iter__(self):
raise NotImplementedError

def __len__(self):
raise NotImplementedError

def __contains__(self, key):
blob_name = '/'.join([self.prefix, key])
if self.client.exists(self.container_name, blob_name):
return True
else:
return False

def list_abs_directory_blobs(self, prefix):
"""Return list of all blobs from an abs prefix."""
return [blob.name for blob in self.client.list_blobs(self.container_name)]

def list_abs_subdirectories(self, prefix):
"""Return list of all "subdirectories" from an abs prefix."""
return list(set([blob.name.rsplit('/', 1)[0] for blob in self.client.list_blobs(self.container_name) if '/' in blob.name]))

def _strip_prefix_from_path(path, prefix):
# normalized things will not have any leading or trailing slashes
path_norm = normalize_storage_path(path)
prefix_norm = normalize_storage_path(prefix)
if path_norm.startswith(prefix_norm):
return path_norm[(len(prefix_norm)+1):]
else:
return path

def list_abs_directory(self, prefix, strip_prefix=True):
"""Return a list of all blobs and subdirectories from an abs prefix."""
items = set()
items.update(self.list_abs_directory_blobs(prefix))
items.update(self.list_abs_subdirectories(prefix))
items = list(items)
if strip_prefix:
items = [_strip_prefix_from_path(path, prefix) for path in items]
return items

def dir_path(self, path=None):
store_path = normalize_storage_path(path)
# prefix is normalized to not have a trailing slash
dir_path = self.prefix
if store_path:
dir_path = os.path.join(dir_path, store_path)
else:
dir_path += '/'
return dir_path

def listdir(self, path=None):
dir_path = self.dir_path(path)
return sorted(self.list_abs_directory(dir_path, strip_prefix=True))

def rename(self, src_path, dst_path):
raise NotImplementedErrror

def rmdir(self, path=None):
dir_path = normalize_storage_path(self.full_path(path)) + '/'
for blob in self.client.list_blobs(self.container_name, prefix=dir_path):
self.client.delete_blob(self.container_name, blob.name)

def getsize(self, path=None):
dir_path = self.dir_path(path)
size = 0
for blob in self.client.list_blobs(self.container_name, prefix=dir_path):
size += blob.properties.content_length
return size

def clear(self):
raise NotImplementedError