Source code for cfg4py.core

"""Main module."""
import logging.config
import os
import re
from collections.abc import Mapping
from io import StringIO

from watchdog.events import FileModifiedEvent, FileSystemEventHandler
from watchdog.observers import Observer

from apscheduler.schedulers.background import BackgroundScheduler
from cfg4py.config import Config
from ruamel.yaml import YAML
from ruamel.yaml.error import YAMLError

logger = logging.getLogger(__name__)

envar = "__cfg4py_server_role__"
_scheduler = BackgroundScheduler()
_remote_fetcher = None
_dump_on_change: bool = False

yaml = YAML(typ="safe")

# handle local configuration file change
_local_observer = None
_cfg_obj = Config()

_cfg_local = {}
_cfg_remote = {}
_strict = True

_local_config_dir: str = ""


[docs]class RemoteConfigFetcher:
[docs] def fetch(self) -> str: raise NotImplementedError("sub class must implement this!") # pragma: no cover
[docs]class RedisConfigFetcher(RemoteConfigFetcher): def __init__( self, key: str, host: str = "localhost", port: int = 6379, db: int = 0, **kwargs ): self.key = key from redis import StrictRedis # type: ignore self.client = StrictRedis( host, port=port, db=db, decode_responses="utf-8", **kwargs )
[docs] def fetch(self) -> dict: settings = None try: logger.info("fetching configuration from redis server") settings = self.client.get(self.key) return _load_and_replace_envar(settings) except Exception as e: # pragma: no cover logger.exception(e) return {}
[docs]class LocalConfigChangeHandler(FileSystemEventHandler):
[docs] def dispatch(self, event): if isinstance(event, FileModifiedEvent): _load_from_local_file()
def _mixin(d, u): # noinspection SpellCheckingInspection """ if value x in "keyx:valuex" pair is list, it's will be replaced :param d: :param u: :return: """ u = u or {} d = d or {} for k, v in u.items(): if isinstance(v, Mapping): d[k] = _mixin(d.get(k, {}), v) else: d[k] = v return d def _to_obj(obj, conf: dict): for key, value in conf.items(): if type(value) == dict: _obj = Config() setattr(obj, key, _obj) _to_obj(_obj, value) else: setattr(obj, key, value) def _refresh(): global _cfg_local, _cfg_remote _cfg_remote = _remote_fetcher.fetch() merged = _mixin(_cfg_remote, _cfg_local) update_config(merged)
[docs]def enable_logging(level=logging.INFO, log_file=None, file_size=10, file_count=7): """ Enable basic log function for the application if log_file is None, then it'll provide console logging, otherwise, the console logging is turned off, all events will be logged into the provided file. Args: level: the log level, one of logging.DEBUG, logging.INFO, logging.WARNING, logging.Error log_file: the absolute file path for the log. file_size: file size in MB unit file_count: how many backup files leaved in disk Returns: None """ assert file_count > 0 assert file_size > 0 from logging import handlers formatter = logging.Formatter( "%(asctime)s %(levelname)-1.1s %(filename)s:%(lineno)s | %(message)s" ) _logger = logging.getLogger() _logger.setLevel(level) if log_file is None: console = logging.StreamHandler() console.setFormatter(formatter) _logger.addHandler(console) else: file_dir = os.path.dirname(log_file) os.makedirs(file_dir, exist_ok=True) rotating_file = handlers.RotatingFileHandler( log_file, maxBytes=1024 * 1024 * file_size, backupCount=file_count ) rotating_file.setFormatter(formatter) _logger.addHandler(rotating_file)
[docs]def config_remote_fetcher(fetcher: RemoteConfigFetcher, interval: int = 300): """ config a remote configuration fetcher, which will pull the settings on every `refresh_interval` Args: fetcher: sub class of `RemoteConfigFetcher` interval: how long should cfg4py to pull the configuration from remote Returns: """ global _remote_fetcher _remote_fetcher = fetcher _scheduler.add_job(_refresh, "interval", seconds=interval) _scheduler.start()
[docs]def build(save_to: str): global _cfg_obj with open(os.path.join(os.path.dirname(__file__), "config.py"), "r") as origin: lines = origin.readlines() lines.append("\n") no_instance = [ f"{' ' * 4}def __init__(self):\n", f"{' ' * 8}raise TypeError(\"Do NOT instantiate this class\")\n", ] lines.extend(no_instance) with open(save_to, encoding="utf-8", mode="w") as f: lines = _schema_from_obj_(_cfg_obj, lines) f.writelines("".join(lines))
def _schema_from_obj_(obj, lines, depth: int = 0): """ build a python file for auto-complete. """ depth += 1 if isinstance(obj, Config): for name in obj.__dict__.copy().keys(): if name.startswith("__"): continue child = getattr(obj, name) if callable(child): continue if isinstance(child, Config): lines.append("\n") lines.append(f"{' ' * 4 * depth}class {name}:\n") _schema_from_obj_(child, lines, depth) else: _type = f"{type(child)}" _type = re.sub(r".*\'(.*)\'>", r"\1", _type) if _type != "NoneType": lines.append(f"{' ' * 4 * depth}{name}: Optional[{_type}] = None\n") else: lines.append(f"{' ' * 4 * depth}{name} = None\n") else: print(obj) return lines
[docs]def init(local_cfg_path: str = None, dump_on_change=True, strict=False): """ create cfg object. Args: local_cfg_path: the directory name where your configuration files exist dump_on_change: if configuration is updated, whether or not to dump them into log file Returns: """ global _local_config_dir, _dump_on_change, _remote_fetcher, _local_observer global _cfg_obj, _cfg_local, _cfg_remote global _strict _strict = strict _dump_on_change = dump_on_change if local_cfg_path: _local_config_dir = os.path.expanduser(local_cfg_path) # handle local configuration file change _local_observer = Observer() _local_observer.schedule( LocalConfigChangeHandler(), _local_config_dir, recursive=False ) _local_observer.start() _cfg_local = _load_from_local_file() update_config(_mixin(_cfg_remote, _cfg_local)) # todo: will this overwrite existing file occasionally? save_to = os.path.join(_local_config_dir, "schema.py") build(save_to) return _cfg_obj
[docs]def yaml_dump(conf, options=None): if options is None: options = {} string_stream = StringIO() try: yaml.dump(conf, string_stream, **options) output_str = string_stream.getvalue() finally: string_stream.close() return output_str
[docs]def update_config(conf: dict): global _cfg_obj if "logging" in conf: _process_logging_settings(conf["logging"]) del conf["logging"] if _dump_on_change: logger.info("configuration is\n%s", yaml_dump(conf)) _to_obj(_cfg_obj, conf) return _cfg_obj
def _process_logging_settings(conf: dict): logging.config.dictConfig(conf) def _guess_extension(): files = os.listdir(_local_config_dir) counter = {".yml": 0, ".yaml": 0} for f in files: _, ext = os.path.splitext(f) if ext == ".yml": counter[ext] += 1 elif ext == ".yaml": counter[ext] += 1 _max = 0 ext = "" for key in counter.keys(): if counter[key] > _max: _max = counter[key] ext = key if ext in [".yml", ".yaml"]: # noinspection PyPackageRequirements return ext else: msg = "No config files present, or file format is not yaml." raise FileNotFoundError(msg)
[docs]def get_config_dir(): return _local_config_dir
def _load_and_replace_envar(content: str): """parse content, replace placeholder with environment variables, and load with yaml Args: content (str): the content of configurations """ pattern = re.compile(r".*?\${(\w+)}.*?") match = pattern.findall(content) if match: replaced = content for g in match: replaced = replaced.replace( f"${{{g}}}", os.environ.get(g, f"ERROR_ENVAR_NOT_SET[{g}]") ) try: return yaml.load(replaced) except YAMLError as e: logger.error("failed to parse:%s\n", content) raise e try: return yaml.load(content) except YAMLError as e: logger.error("failed to parseļ¼š%s\n", content) raise e def _load_from_local_file() -> dict: """ read configuration hierarchically from disk Args: Returns: """ global _strict conf = {} role = os.getenv(envar, "") if role == "" and _strict: msg = f"You must config environment variables {envar} as one of" "'DEV, TEST, PRODUCTION'" raise EnvironmentError(msg) try: ext = _guess_extension() with open( os.path.join(_local_config_dir, f"defaults{ext}"), "r", encoding="utf-8" ) as base: conf = _load_and_replace_envar(base.read(-1)) if role == "PRODUCTION": with open( os.path.join(_local_config_dir, f"production{ext}"), "r", encoding="utf-8", ) as prod: _prod = _load_and_replace_envar(prod.read(-1)) _mixin(conf, _prod) elif role == "TEST": with open( os.path.join(_local_config_dir, f"test{ext}"), "r", encoding="utf-8" ) as test: _test = _load_and_replace_envar(test.read(-1)) _mixin(conf, _test) elif role == "DEV": with open( os.path.join(_local_config_dir, f"dev{ext}"), "r", encoding="utf-8" ) as dev: _dev = _load_and_replace_envar(dev.read(-1)) _mixin(conf, _dev) else: pass except FileNotFoundError as e: if e.filename.find("defaults") != -1: raise FileNotFoundError("Failed to find default configuration file") except Exception as e: logger.exception(e) return conf # noinspection PyUnusedLocal
[docs]def config_server_role(role: str): os.environ[envar] = role