# FSHTBKVS/fshtbkvs/FSHTBKVS.py

import hashlib
import json
import os


class FSHTBKVS:
"""
Filesystem Hash Table Based Key Value Store (FSHTBKVS)
The FSHTBKVS was developed to be an easy to deploy
key value store with fast lookup times for (very)
large data sets and without third party dependencies.
max_depth and the amount of files created:
1: 16^1 = 16
2: 16^2 = 256
3: 16^3 = 4.096
4: 16^4 = 65.536
5: 16^5 = 1.048.576
6: 16^6 = 16.777.216
7: 16^7 = 268.435.456
"""
def __init__(self, root_dir, kvs_name, max_depth=4):
self.__root_dir = os.path.normpath(root_dir)
if not os.path.exists(root_dir):
raise ValueError("root_dir '" + root_dir + "' does not exist")
self.__kvs_name = kvs_name
self.__root_dir = os.path.join(self.__root_dir, self.__kvs_name)
self.__meta_file = os.path.join(self.__root_dir, 'meta.json')
        self.__max_depth = max_depth if max_depth in range(1, 8) else 4
self.__entries = 0
self.__all_file_paths = []
self.__all_folder_paths = []
if not os.path.exists(self.__root_dir):
os.makedirs(self.__root_dir)
self.__create_meta_file()
self.__build_all_paths()
        if not self.__load_meta_file():
raise OSError(
"Not able to load or restore the meta file: "
+ str(self.__meta_file)
)
def delete(self, key):
"""
Deletes an entry with the key 'key' from the kvs
"""
self.__validate_key(key)
key = self.__process_key(key)
file = self.__get_file_by_key(key)
data = self.__load_dict_from_json_file(file)
        if key not in data:
return 1
del data[key]
data_written = self.__save_dict_to_json_file(file, data)
if not data_written:
return -1
self.__entries -= 1
self.__create_meta_file()
return 1
def export_kvs(self, file=''):
"""
Exports whole kvs data as importable .fshtbkvs file
"""
if file == '':
file = os.path.join(
os.path.dirname(self.__root_dir),
self.__kvs_name + '.fshtbkvs'
)
# build all file paths
        if not self.__all_file_paths:
self.__build_all_file_paths()
# write all key value pairs to export file
try:
with open(file, 'w') as f_export:
for f in self.__all_file_paths:
data = self.__load_dict_from_json_file(f)
for key, value in data.items():
f_export.write(
json.dumps(
{key: value},
ensure_ascii=False
)
+ '\n'
)
        except Exception:
raise OSError(
"Not able to write kvs export to file: "
+ str(file)
)
return 1
def get_entries(self):
return self.__entries
def get_kvs_name(self):
return self.__kvs_name
def get_max_depth(self):
return self.__max_depth
def get_size_of_kvs(self):
"""
        Returns the combined size of all json files used for storing data, in
        (decimal) megabytes
"""
kvs_size_in_megabytes = 0.0
        if not self.__all_file_paths:
self.__build_all_file_paths()
for f in self.__all_file_paths:
if not os.path.exists(f):
self.maintain_kvs()
kvs_size_in_megabytes += (
os.path.getsize(f) / 1000 / 1000
)
return round(kvs_size_in_megabytes, 6)
def import_kvs(self, file=''):
"""
Imports .fshtbkvs file into the kvs
"""
if file == '':
raise ValueError("file must not be empty")
if not os.path.exists(file):
raise ValueError("file does not exist")
        # read all key value pairs line by line and add them to the kvs
        with open(file, 'r', encoding='UTF-8') as f:
            for line in f:
                line = line.strip()
                if line == '':
                    continue
                try:
                    for key, value in json.loads(line).items():
                        self.write(key, value)
                except Exception:
                    continue
return 1
def maintain_kvs(self):
"""
        Rebuilds broken .json files, creates missing files and folders, and
        writes an updated meta file
"""
# build all paths
self.__build_all_paths()
# cleanup all json files
self.__entries = 0
for f in self.__all_file_paths:
data = self.__load_dict_from_json_file(f)
data_clean = {}
for key, value in data.items():
try:
self.__validate_key(key)
self.__validate_value(value)
data_clean[key] = value
                except ValueError:
continue
data_written = self.__save_dict_to_json_file(f, data_clean)
if not data_written:
return -1
self.__entries += len(data_clean)
self.__create_meta_file()
return 1
def read(self, key):
"""
        Returns the entry with the key 'key' from the kvs, or None if no entry
        exists
"""
self.__validate_key(key)
key = self.__process_key(key)
file = self.__get_file_by_key(key)
data = self.__load_dict_from_json_file(file)
        if key not in data:
return None
value = data[key]
self.__validate_value(value)
return value
def wipe_kvs(self):
"""
Deletes every entry from the kvs and creates an updated meta file
"""
        # delete the meta file so auto maintenance kicks in, if wiping fails
os.remove(self.__meta_file)
        # make sure all files and folders exist
self.__build_all_paths()
# wipe all json files
for f in self.__all_file_paths:
self.__save_dict_to_json_file(f, {})
self.__entries = 0
return self.__create_meta_file()
def write(self, key, value):
"""
Adds (or updates) an entry with the key 'key' and the value 'value'
"""
self.__validate_key(key)
self.__validate_value(value)
key = self.__process_key(key)
file = self.__get_file_by_key(key)
key_existed = False
data = self.__load_dict_from_json_file(file)
if key in data:
key_existed = True
data[key] = value
data_written = self.__save_dict_to_json_file(file, data)
if not data_written:
return -1
if not key_existed:
self.__entries += 1
self.__create_meta_file()
return 1
def __build_all_paths(self):
"""
        Gathers all file and folder paths and creates them when missing
"""
# build all file paths
        if not self.__all_file_paths:
self.__build_all_file_paths()
# build all folder paths
        if not self.__all_folder_paths:
self.__build_all_folder_paths()
# create all folders
for f in self.__all_folder_paths:
os.makedirs(f, exist_ok=True)
# create all files
for f in self.__all_file_paths:
if not os.path.exists(f):
self.__save_dict_to_json_file(f, {})
def __build_all_file_paths(self):
"""
Calculates all json file paths
"""
# build all file paths
def build_all_file_paths(r_root_dir, r_current_depth=1):
            # build all file paths recursively
            r_chars = list('0123456789abcdef')
r_paths = []
# base case: return with list of files
if r_current_depth == self.__max_depth:
return [
(os.path.join(r_root_dir, c + '.json')) for c in r_chars
]
# recursion: go one step further in the filesystem
for c in r_chars:
for p in build_all_file_paths(
os.path.join(r_root_dir, c),
r_current_depth + 1
):
r_paths.append(p)
return r_paths
self.__all_file_paths = build_all_file_paths(self.__root_dir)
def __build_all_folder_paths(self):
"""
Calculates all folder paths
"""
        # iterate over all files and extract their folders
for f in self.__all_file_paths:
path_tmp = os.path.split(f)[0]
while path_tmp != self.__root_dir:
                if path_tmp not in self.__all_folder_paths:
self.__all_folder_paths.append(path_tmp)
path_tmp = os.path.dirname(path_tmp)
self.__all_folder_paths = sorted(self.__all_folder_paths)
def __create_meta_file(self):
"""
Creates the kvs meta file
"""
meta = {
'kvs_name': self.__kvs_name,
'max_depth': self.__max_depth,
'entries': self.__entries
}
return self.__save_dict_to_json_file(self.__meta_file, meta)
def __get_file_by_key(self, key):
"""
Calculates the corresponding json file for 'key'
"""
file = self.__root_dir
chars = key[:(self.__max_depth - 1)]
for c in chars:
file = os.path.join(file, c)
file = os.path.join(
file,
key[(self.__max_depth - 1):self.__max_depth] + '.json'
)
return file
def __load_dict_from_json_file(self, path_to_file):
"""
        Returns the content of the json file 'path_to_file' as a dict, or an
        empty dict when something went wrong. On failure, the broken file is
        reset and maintain_kvs() gets called (the meta file is an exception
        here).
"""
try:
            with open(path_to_file, 'r', encoding='UTF-8') as f:
                data_as_dict = json.load(f)
            return data_as_dict
        except (OSError, ValueError):
            self.__save_dict_to_json_file(path_to_file, {})
            if path_to_file != self.__meta_file:
self.maintain_kvs()
return {}
def __load_meta_file(self):
"""
        Reads the meta file, or rebuilds it if it is unreadable (broken or lost)
"""
def load_max_depth(meta):
            if 'max_depth' not in meta:
return False
if not isinstance(meta['max_depth'], int):
return False
            if meta['max_depth'] not in range(1, 8):
return False
self.__max_depth = meta['max_depth']
return True
def load_entries(meta):
            if 'entries' not in meta:
return False
            if not isinstance(meta['entries'], int):
return False
            if meta['entries'] < 0:
return False
self.__entries = meta['entries']
return True
meta = self.__load_dict_from_json_file(self.__meta_file)
if meta == {}:
meta_file_restored = self.__restore_meta_file()
if not meta_file_restored:
return False
self.maintain_kvs()
meta = self.__load_dict_from_json_file(self.__meta_file)
if not load_max_depth(meta):
return False
if not load_entries(meta):
return False
return True
def __process_key(self, key):
"""
Processes the key 'key' to match the filesystem based hash table
"""
if len(key) < self.__max_depth:
return self.__str_to_sha256sum(key)
        char_whitelist = '0123456789abcdef'
for c in key:
if c not in char_whitelist:
return self.__str_to_sha256sum(key)
return key
def __restore_meta_file(self):
"""
        Tries to restore a broken or lost meta file by guessing the 'max_depth'
"""
def get_max_depth(r_root_dir, r_current_depth=1):
            # search the folder tree recursively for the first json file
            r_chars = list('0123456789abcdef')
# base case 1: r_current_depth > 7
if r_current_depth > 7:
return -1
# base case 2: json file found
for c in r_chars:
file = os.path.join(r_root_dir, c + '.json')
if os.path.exists(file):
return r_current_depth
# recursion: go one step further in the filesystem
for c in r_chars:
max_depth_guess = get_max_depth(
os.path.join(r_root_dir, c),
r_current_depth + 1
)
if max_depth_guess > -1:
return max_depth_guess
return -1
max_depth = get_max_depth(self.__root_dir)
if max_depth == -1:
return False
self.__max_depth = max_depth
return self.__create_meta_file()
def __save_dict_to_json_file(self, path_to_file, data_as_dict):
"""
Tries to write the dict 'data_as_dict' to the json file 'path_to_file'
"""
try:
str_to_write = json.dumps(
data_as_dict,
ensure_ascii=False
)
with open(path_to_file, 'w', encoding='UTF-8') as f:
f.write(str_to_write)
        except (OSError, TypeError):
return False
return True
def __str_to_sha256sum(self, s):
"""
        Returns the sha256 hex digest of the string 's' as a string
"""
return hashlib.sha256(bytes(s, 'utf-8')).hexdigest()
def __validate_key(self, key):
"""
        Validates that the key 'key' can be used for the kvs
"""
if not isinstance(key, str):
raise ValueError("key must be of type <class 'str'>")
if key == '':
raise ValueError("key must not be empty")
if len(key) > 64:
raise ValueError("key max length is 64 characters")
def __validate_value(self, value):
"""
        Validates that the value 'value' can be used for the kvs
"""
if isinstance(value, list):
for v in value:
self.__validate_value(v)
return
if isinstance(value, dict):
for k, v in value.items():
self.__validate_key(k)
self.__validate_value(v)
return
if not isinstance(value, (str, int, float, bool)):
            raise ValueError(
                "value must be of type"
                + " <class 'str'>, <class 'int'>, <class 'float'>,"
                + " <class 'bool'> or a (nested) list/dict of those"
            )
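

if __name__ == '__main__':
    # Minimal usage sketch; the root directory '/tmp' and the kvs name
    # 'demo_kvs' below are assumptions for illustration only.
    kvs = FSHTBKVS('/tmp', 'demo_kvs', max_depth=1)
    kvs.write('hello', {'answer': 42})
    print(kvs.read('hello'))    # {'answer': 42}
    print(kvs.get_entries())    # 1
    kvs.delete('hello')
    print(kvs.read('hello'))    # None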