diff --git a/tubesync/common/utils.py b/tubesync/common/utils.py index c4798943..fdf04352 100644 --- a/tubesync/common/utils.py +++ b/tubesync/common/utils.py @@ -8,11 +8,22 @@ import string import time from datetime import datetime from django.core.paginator import Paginator +from functools import partial +from operator import attrgetter, itemgetter +from pathlib import Path from urllib.parse import urlunsplit, urlencode, urlparse from yt_dlp.utils import LazyList from .errors import DatabaseConnectionError +def directory_and_stem(arg_path, /): + filepath = Path(arg_path) + stem = Path(filepath.stem) + while stem.suffixes and '' != stem.suffix: + stem = Path(stem.stem) + return (filepath.parent, str(stem),) + + def getenv(key, default=None, /, *, integer=False, string=True): ''' Guarantees a returned type from calling `os.getenv` @@ -48,6 +59,51 @@ def getenv(key, default=None, /, *, integer=False, string=True): return r +def glob_quote(filestr, /): + _glob_specials = { + '?': '[?]', + '*': '[*]', + '[': '[[]', + ']': '[]]', # probably not needed, but it won't hurt + } + + if not isinstance(filestr, str): + raise TypeError(f'expected a str, got "{type(filestr)}"') + + return filestr.translate(str.maketrans(_glob_specials)) + + +def list_of_dictionaries(arg_list, /, *, arg_function=lambda x: x): + assert callable(arg_function) + if isinstance(arg_list, list): + _map_func = partial(lambda f, d: f(d) if isinstance(d, dict) else d, arg_function) + return (True, list(map(_map_func, arg_list)),) + return (False, arg_list,) + + +def mkdir_p(arg_path, /, *, mode=0o777): + ''' + Reminder: mode only affects the last directory + ''' + dirpath = Path(arg_path) + return dirpath.mkdir(mode=mode, parents=True, exist_ok=True) + + +def multi_key_sort(iterable, specs, /, use_reversed=False, *, item=False, attr=False, key_func=None): + result = list(iterable) + if key_func is None: + # itemgetter is the default + if item or not (item or attr): + key_func = itemgetter + elif attr: + key_func = attrgetter + for key, reverse in reversed(specs): + result.sort(key=key_func(key), reverse=reverse) + if use_reversed: + return list(reversed(result)) + return result + + def parse_database_connection_string(database_connection_string): ''' Parses a connection string in a URL style format, such as: @@ -180,6 +236,15 @@ def json_serial(obj): raise TypeError(f'Type {type(obj)} is not json_serial()-able') +def seconds_to_timestr(seconds): + seconds = seconds % (24 * 3600) + hour = seconds // 3600 + seconds %= 3600 + minutes = seconds // 60 + seconds %= 60 + return '{:02d}:{:02d}:{:02d}'.format(hour, minutes, seconds) + + def time_func(func): def wrapper(*args, **kwargs): start = time.perf_counter()