diff --git a/zarr/storage.py b/zarr/storage.py index 39a497d08b..7964e3dd01 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -1883,3 +1883,156 @@ def __delitem__(self, key): with self._mutex: self._invalidate_keys() self._invalidate_value(key) + + +class ABSStore(MutableMapping): + """Storage class using Azure Blob Storage (ABS). + + Parameters + ---------- + container_name : string + The name of the ABS container to use. Currently this must exist in the + storage account. + prefix : string + Location of the "directory" to use as the root of the storage hierarchy + within the container. + account_name : string + The Azure blob storage account name. + account_key : string + The Azure blob storage account acess key. + + Notes + ----- + In order to use this store, you must install the Azure Blob Storage + `Python Client Library `_ version >= 1.3.0. + """ + + def __init__(self, container_name, prefix, account_name, account_key): + self.account_name = account_name + self.account_key = account_key + self.container_name = container_name + self.prefix = normalize_storage_path(prefix) + self.initialize_container() + + def initialize_container(self): + from azure.storage.blob import BlockBlobService + self.client = BlockBlobService(self.account_name, self.account_key) + + # needed for pickling + def __getstate__(self): + state = self.__dict__.copy() + return state + + def __setstate__(self, state): + self.__dict__.update(state) + self.initialize_container() + + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + def _append_path_to_prefix(path, prefix): + return '/'.join([normalize_storage_path(prefix), + normalize_storage_path(path)]) + + def full_path(self, path=None): + return _append_path_to_prefix(path, self.prefix) + + def __getitem__(self, key): + blob_name = '/'.join([self.prefix, key]) + blob = self.client.get_blob_to_bytes(self.container_name, blob_name) + if blob: + return blob.content + else: + raise KeyError('Blob %s not found' % blob_name) + + def __setitem__(self, key, value): + blob_name = '/'.join([self.prefix, key]) + self.client.create_blob_from_text(self.container_name, blob_name, value) + + def __delitem__(self, key): + raise NotImplementedError + + def __eq__(self, other): + return ( + isinstance(other, ABSStore) and + self.container_name == other.container_name and + self.prefix == other.prefix + ) + + def keys(self): + raise NotImplementedError + + def __iter__(self): + raise NotImplementedError + + def __len__(self): + raise NotImplementedError + + def __contains__(self, key): + blob_name = '/'.join([self.prefix, key]) + if self.client.exists(self.container_name, blob_name): + return True + else: + return False + + def list_abs_directory_blobs(self, prefix): + """Return list of all blobs from an abs prefix.""" + return [blob.name for blob in self.client.list_blobs(self.container_name)] + + def list_abs_subdirectories(self, prefix): + """Return list of all "subdirectories" from an abs prefix.""" + return list(set([blob.name.rsplit('/', 1)[0] for blob in self.client.list_blobs(self.container_name) if '/' in blob.name])) + + def _strip_prefix_from_path(path, prefix): + # normalized things will not have any leading or trailing slashes + path_norm = normalize_storage_path(path) + prefix_norm = normalize_storage_path(prefix) + if path_norm.startswith(prefix_norm): + return path_norm[(len(prefix_norm)+1):] + else: + return path + + def list_abs_directory(self, prefix, strip_prefix=True): + """Return a list of all blobs and subdirectories from an abs prefix.""" + items = set() + items.update(self.list_abs_directory_blobs(prefix)) + items.update(self.list_abs_subdirectories(prefix)) + items = list(items) + if strip_prefix: + items = [_strip_prefix_from_path(path, prefix) for path in items] + return items + + def dir_path(self, path=None): + store_path = normalize_storage_path(path) + # prefix is normalized to not have a trailing slash + dir_path = self.prefix + if store_path: + dir_path = os.path.join(dir_path, store_path) + else: + dir_path += '/' + return dir_path + + def listdir(self, path=None): + dir_path = self.dir_path(path) + return sorted(self.list_abs_directory(dir_path, strip_prefix=True)) + + def rename(self, src_path, dst_path): + raise NotImplementedErrror + + def rmdir(self, path=None): + dir_path = normalize_storage_path(self.full_path(path)) + '/' + for blob in self.client.list_blobs(self.container_name, prefix=dir_path): + self.client.delete_blob(self.container_name, blob.name) + + def getsize(self, path=None): + dir_path = self.dir_path(path) + size = 0 + for blob in self.client.list_blobs(self.container_name, prefix=dir_path): + size += blob.properties.content_length + return size + + def clear(self): + raise NotImplementedError