from typing import Any, Union from schema import Schema, Schema, SchemaError from contextlib import suppress from os import access, R_OK from typing import Callable def IsReadable(filename: str): """Returns True if the file is readable""" return access(filename, R_OK) def IsFilewithExt(*exts: str): """Returns a function that retunts True if the file extension is consistent""" def checkfilename(filename: str): return any(filename.endswith(f'.{ext}' for ext in exts)) return checkfilename def LoadFileWithExt(*, key: str=None,**kwargs: Callable): """Returns a function that retunts True if the file extension is consistent""" def checkfilename(filename: Union[str, dict]): if key is not None: filename = filename[key] for ext, loader in kwargs.items(): if filename.endswith(f'.{ext}'): return loader(filename) return False return checkfilename from yaml import load, Loader def LoadYaml(fname: str): return load(fname, Loader) from dictwrapper.dictwrapper import DictWrapper class NestedSchema(object): __slots__ = ('_schema', '_processdicts') _schema: Union[Schema,object] _processdicts: bool def __init__(self, /, schema: Union[Schema,object], *, processdicts: bool=False): self._schema = schema self._processdicts = processdicts def validate(self, data: Any) -> Any: if not isinstance(data, dict): return self._schema.validate(data) if self._processdicts: return { key: self._process_dict((key,), subdata) for key, subdata in data.items() } dtin = DictWrapper(data) dtout = DictWrapper({}) for key, subdata in dtin.walkitems(): dtout[key] = self._process_element(key, subdata) return dtout.object def _process_element(self, key, subdata: Any) -> Any: try: return self._schema.validate(subdata, _is_event_schema=False) except SchemaError as err: key = ".".join(str(k) for k in key) raise SchemaError(f'Key "{key}" has invalid value "{subdata}":\n{err.args[0]}') from err def _process_dict(self, key, data: Any) -> Any: if not isinstance(data, dict): return self._schema.validate(data) with suppress(SchemaError): return self._schema.validate(data, _is_event_schema=False) return { subkey: self._process_dict(key+(subkey,), subdata) for subkey, subdata in data.items() }