import contextlib
import re
import time
import h5py
import numpy as np
import pandas as pd
[docs]
class HDF_Object:
"""
A generic class to access HDF components.
"""
@property
def read_only(self):
return self._read_only
@property
def truncate(self):
return self._truncate
@property
def file_name(self):
return self._file_name
@property
def name(self):
return self._name
def __eq__(self, other):
return (self.file_name == other.self.file_name) and (self.name == other.name)
[docs]
@contextlib.contextmanager
def editing_mode(self, read_only: bool = False, truncate: bool = True):
original_read_only = self.read_only
original_truncate = self.truncate
try:
self.set_read_only(read_only)
self.set_truncate(truncate)
yield self
finally:
self.set_read_only(original_read_only)
self.set_truncate(original_truncate)
@property
def metadata(self):
with h5py.File(self.file_name) as hdf_file:
return dict(hdf_file[self.name].attrs)
[docs]
def __init__(
self,
*,
file_name: str,
name: str,
read_only: bool = True,
truncate: bool = False,
):
object.__setattr__(self, "_read_only", read_only)
object.__setattr__(self, "_truncate", truncate)
object.__setattr__(self, "_file_name", file_name)
object.__setattr__(self, "_name", name)
for key, value in self.metadata.items():
object.__setattr__(self, key, value)
[docs]
def set_read_only(self, read_only: bool = True):
object.__setattr__(self, "_read_only", read_only)
[docs]
def set_truncate(self, truncate: bool = True):
object.__setattr__(self, "_truncate", truncate)
def __setattr__(self, name, value):
if self.read_only:
raise AttributeError("Cannot set read-only attributes")
elif not isinstance(name, str):
raise KeyError(f"Attribute name '{name}' is not a string")
elif not bool(re.match(r"^[a-zA-Z_][\w.-]*$", name)):
raise KeyError(f"Invalid attribute name: {name}")
if (not self.truncate) and (name in self.metadata):
raise KeyError(f"Attribute '{name}' cannot be truncated")
if isinstance(value, (str, bool, int, float)):
with h5py.File(self.file_name, "a") as hdf_file:
hdf_object = hdf_file[self.name]
hdf_object.attrs[name] = value
object.__setattr__(self, name, value)
else:
raise NotImplementedError(
f"Type '{type(name)}' is invalid for attribute {name}. "
"Only (str, bool, int, float) types are accepted."
)
[docs]
class HDF_Group(HDF_Object):
[docs]
def __init__(
self,
*,
file_name: str,
name: str,
read_only: bool = True,
truncate: bool = False,
):
super().__init__(
file_name=file_name,
name=name,
read_only=read_only,
truncate=truncate,
)
for dataset_name in self.dataset_names:
dataset = HDF_Dataset(
file_name=self.file_name,
name=f"{self.name}/{dataset_name}",
read_only=self.read_only,
truncate=self.truncate,
)
object.__setattr__(self, dataset_name, dataset)
for group_name in self.group_names:
group = HDF_Group(
file_name=self.file_name,
name=f"{self.name}/{group_name}",
read_only=self.read_only,
truncate=self.truncate,
)
object.__setattr__(self, group_name, group)
for dataframe_name in self.dataframe_names:
dataframe = HDF_Dataframe(
file_name=self.file_name,
name=f"{self.name}/{dataframe_name}",
read_only=self.read_only,
truncate=self.truncate,
)
object.__setattr__(self, dataframe_name, dataframe)
def __len__(self):
return sum([len(component) for component in self.components])
@property
def group_names(self):
return self.components[0]
@property
def dataset_names(self):
return self.components[1]
@property
def dataframe_names(self):
return self.components[2]
@property
def groups(self):
return [self.__getattribute__(name) for name in self.group_names]
@property
def datasets(self):
return [self.__getattribute__(name) for name in self.dataset_names]
@property
def dataframes(self):
return [self.__getattribute__(name) for name in self.dataframe_names]
@property
def components(self):
group_names = []
dataset_names = []
datafame_names = []
with h5py.File(self.file_name) as hdf_file:
hdf_object = hdf_file[self.name]
for name in sorted(hdf_object):
if isinstance(hdf_object[name], h5py.Dataset):
if not name.endswith("_mmap"):
dataset_names.append(name)
else:
if (
name.endswith("_df")
or "is_pd_dataframe" in hdf_object[name].attrs
):
datafame_names.append(name)
else:
group_names.append(name)
return group_names, dataset_names, datafame_names
[docs]
def set_read_only(self, read_only: bool = True):
super().__setattr__(self, "_read_only", read_only)
for dataset_name in self.dataset_names:
self.__getattribute__(dataset_name).set_read_only(read_only)
for group_name in self.group_names:
self.__getattribute__(group_name).set_read_only(read_only)
for dataframe_name in self.dataframe_names:
self.__getattribute__(dataframe_name).set_read_only(read_only)
[docs]
def set_truncate(self, truncate: bool = True):
super().__setattr__(self, "_truncate", truncate)
for dataset_name in self.dataset_names:
self.__getattribute__(dataset_name).set_truncate(truncate)
for group_name in self.group_names:
self.__getattribute__(group_name).set_truncate(truncate)
for dataframe_name in self.dataframe_names:
self.__getattribute__(dataframe_name).set_truncate(truncate)
def __setattr__(self, name, value):
try:
super().__setattr__(name, value)
except NotImplementedError as e:
if not self.truncate:
if name in self.group_names:
raise KeyError(f"Group name '{name}' cannot be truncated") from e
elif name in self.dataset_names:
raise KeyError(f"Dataset name '{name}' cannot be truncated") from e
elif name in self.dataframe_names:
raise KeyError(
f"Dataframe name '{name}' cannot be truncated"
) from e
if isinstance(value, (np.ndarray, pd.core.series.Series)):
self.add_dataset(name, value)
elif isinstance(value, (dict, pd.DataFrame)):
self.add_group(name, value)
else:
raise NotImplementedError(
f"Type '{type(value)}' is invalid for attribute {name}",
"Only (str, bool, int, float, np.ndarray, "
"pd.core.series.Series, dict pd.DataFrame) types are "
"accepted.",
) from e
[docs]
def add_dataset(
self,
name: str,
array: np.ndarray,
):
with h5py.File(self.file_name, "a") as hdf_file:
hdf_object = hdf_file[self.name]
if name in hdf_object:
del hdf_object[name]
mmap_name = f"{name}_mmap"
if mmap_name in hdf_object:
del hdf_object[mmap_name]
if isinstance(array, (pd.core.series.Series)):
array = array.values
# if array.dtype == np.dtype('O'):
# print("YAR")
# # dtype = h5py.string_dtype(encoding='utf-8')
# dtype = h5py.vlen_dtype(str)
# else:
# dtype = array.dtype
# # data=value_.astype(str).values,
# # # dtype=h5py.string_dtype(encoding='utf-8')
# # dtype=h5py.vlen_dtype(str),
try:
hdf_object.create_dataset(
name,
data=array,
compression="lzf",
shuffle=True,
chunks=True,
# chunks=array.shape,
maxshape=tuple([None for i in array.shape]),
)
except TypeError as e:
raise NotImplementedError(
f"Type {array.dtype} is not understood. "
"If this is a string format, try to cast it to "
"np.dtype('O') as possible solution."
) from e
dataset = HDF_Dataset(
file_name=self.file_name,
name=f"{self.name}/{name}",
read_only=self.read_only,
truncate=self.truncate,
)
dataset.last_updated = time.asctime()
object.__setattr__(self, name, dataset)
[docs]
def add_group(
self,
name: str,
group: dict,
):
with h5py.File(self.file_name, "a") as hdf_file:
hdf_object = hdf_file[self.name]
if name in hdf_object:
del hdf_object[name]
hdf_object.create_group(name)
if isinstance(group, pd.DataFrame):
if not name.endswith("_df"):
raise TypeError(f"DataFrame group name `{name}` must end with `_df`")
group = dict(group)
# group["is_pd_dataframe"] = True
new_group = HDF_Dataframe(
file_name=self.file_name,
name=f"{self.name}/{name}",
read_only=self.read_only,
truncate=self.truncate,
)
else:
new_group = HDF_Group(
file_name=self.file_name,
name=f"{self.name}/{name}",
read_only=self.read_only,
truncate=self.truncate,
)
for key, value in group.items():
new_group.__setattr__(key, value)
new_group.last_updated = time.asctime()
object.__setattr__(self, name, new_group)
[docs]
class HDF_Dataset(HDF_Object):
[docs]
def __init__(
self,
*,
file_name: str,
name: str,
read_only: bool = True,
truncate: bool = False,
):
super().__init__(
file_name=file_name,
name=name,
read_only=read_only,
truncate=truncate,
)
object.__setattr__(self, "mmap_name", f"{self.name}_mmap")
with h5py.File(self.file_name, "r") as hdf_file:
mmap_exists = self.mmap_name in hdf_file
object.__setattr__(self, "mmap_exists", mmap_exists)
def __len__(self):
return self.shape[0]
@property
def dtype(self):
with h5py.File(self.file_name) as hdf_file:
return hdf_file[self.name].dtype
@property
def shape(self):
with h5py.File(self.file_name) as hdf_file:
return hdf_file[self.name].shape
@property
def values(self):
return self[...]
def __getitem__(self, keys):
with h5py.File(self.file_name) as hdf_file:
hdf_object = hdf_file[self.name]
if h5py.check_string_dtype(hdf_object.dtype) is not None:
hdf_object = hdf_object.asstr()
return hdf_object[keys]
[docs]
def append(self, data):
if self.read_only:
raise AttributeError("Cannot append read-only dataset")
with h5py.File(self.file_name, "a") as hdf_file:
hdf_object = hdf_file[self.name]
new_shape = tuple([i + j for i, j in zip(self.shape, data.shape)])
old_size = len(self)
hdf_object.resize(new_shape)
hdf_object[old_size:] = data
[docs]
def set_slice(self, slice_selection, values):
if self.read_only:
raise AttributeError("Cannot set slice of read-only dataset")
with h5py.File(self.file_name, "a") as hdf_file:
hdf_object = hdf_file[self.name]
hdf_object[slice_selection] = values
if self.mmap_exists:
hdf_object = hdf_file[self.mmap_name]
hdf_object[slice_selection] = values
[docs]
def delete_mmap(self):
if self.read_only:
raise AttributeError("Cannot delete read-only mmap of dataset")
if self.mmap_exists:
with h5py.File(self.file_name, "a") as hdf_file:
del hdf_file[self.mmap_name]
object.__setattr__(self, "mmap_exists", False)
[docs]
def create_mmap(self):
if self.read_only:
raise AttributeError("Cannot create read-only mmap of dataset")
if self.mmap_exists:
self.delete_mmap()
with h5py.File(self.file_name, "a") as hdf_file:
hdf_object = hdf_file[self.name]
subgroup = hdf_file.create_dataset(
self.mmap_name,
hdf_object.shape,
dtype=hdf_object.dtype,
)
for i in hdf_object.iter_chunks():
subgroup[i] = hdf_object[i]
object.__setattr__(self, "mmap_exists", True)
@property
def mmap(self):
if not self.mmap_exists:
self.create_mmap()
with h5py.File(self.file_name, "r") as hdf_file:
subgroup = hdf_file[self.mmap_name]
offset = subgroup.id.get_offset()
shape = subgroup.shape
import mmap
with open(self.file_name, "rb") as raw_hdf_file:
mmap_obj = mmap.mmap(raw_hdf_file.fileno(), 0, access=mmap.ACCESS_READ)
return np.frombuffer(
mmap_obj, dtype=subgroup.dtype, count=np.prod(shape), offset=offset
).reshape(shape)
[docs]
class HDF_Dataframe(HDF_Group):
@property
def dtype(self):
dtypes = []
for column_name in self.dataset_names:
dtype = self.__getattribute__(column_name).dtype
dtypes.append(dtype)
return list(dtypes)
@property
def columns(self):
return self.dataset_names
def __len__(self):
return len(self.__getattribute__(self.dataset_names[0]))
@property
def values(self):
return self[...]
def __getitem__(self, keys):
df_dict = {}
for column_name in self.dataset_names:
dataset = self.__getattribute__(column_name)
if isinstance(dataset, HDF_Dataset):
df_dict[column_name] = dataset[keys]
return pd.DataFrame(df_dict)
[docs]
def append(self, data):
for column_name in self.dataset_names:
dataset = self.__getattribute__(column_name)
if isinstance(dataset, HDF_Dataset):
dataset.append(data[column_name])
[docs]
def set_slice(self, slice_selection, df):
if self.read_only:
raise AttributeError("Cannot set slice of read-only dataframe")
for column_name in self.dataset_names:
dataset = self.__getattribute__(column_name)
dataset.set_slice(slice_selection, df[column_name])
[docs]
class HDF_File(HDF_Group):
[docs]
def __init__(
self,
file_name: str,
*,
read_only: bool = True,
truncate: bool = False,
delete_existing: bool = False,
):
"""HDF file object to load/save the hdf file. It also provides convenient
attribute-like accesses to operate the data in the HDF object.
Instead of relying directly on the `h5py` interface, we will use an HDF wrapper
file to provide consistent access to only those specific HDF features we want.
Since components of an HDF file come in three shapes `datasets`, `groups` and `attributes`,
we will first define a generic HDF wrapper object to handle these components.
Once this is done, the HDF wrapper file can be treated as such an object with additional
features to open and close the initial connection.
Args:
file_name (str): file path.
read_only (bool, optional): If hdf is read-only. Mutually exclusive with `delete_existing` and `truncate`. Defaults to True.
truncate (bool, optional): If existing groups and datasets can be
truncated (i.e. are overwitten). Mutually exclusive with `read_only`. Defaults to False.
delete_existing (bool, optional): If the file already exists,
delete it completely and create a new one. Mutually exclusive with `read_only`. Defaults to False.
Examples::
>>> # create a hdf file to write
>>> hdf_file = HDF_File(hdf_file_path, read_only=False, truncate=True, delete_existing=True)
>>> # create an empty group as "dfs"
>>> hdf_file.dfs = {}
>>> # write a DataFrame dataset into the dfs
>>> hdf_file.dfs.df1 = pd.DataFrame({'a':[1,2,3]})
>>> # write another DataFrame dataset into the dfs
>>> hdf_file.dfs.df2 = pd.DataFrame({'a':[3,2,1]})
>>> # set a property value to the dataframe
>>> hdf_file.dfs.df1.data_from = "colleagues"
>>> # get a dataframe dataset from a dfs
>>> df1 = hdf_file.dfs.df1.values
>>> # features below are not important, but may be useful sometimes
>>> # get the dataframe via the dataset name instead of attribute
>>> df1 = hdf_file.dfs.__getattribute__("df1").values
>>> # get the dataframe via the dataset path (i.e. "dfs/df1")
>>> df1 = hdf_file.__getattribute__('dfs').__getattribute__("df1").values
>>> hdf_file.dfs.df1.data_from
"colleagues"
"""
if read_only and (delete_existing or truncate):
raise ValueError(
"Parameters 'delete_existing'/'truncate' are mutually exclusive with 'read_only'."
)
if read_only:
mode = "r"
elif delete_existing:
mode = "w"
else:
mode = "a"
with h5py.File(file_name, mode):
pass
super().__init__(
file_name=file_name,
name="/",
read_only=read_only,
truncate=truncate,
)