Copy generic functions from sync.utils

This commit is contained in:
tcely 2025-05-16 03:33:33 -04:00 committed by GitHub
parent 7287bc1dee
commit aedaa6f7b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -8,11 +8,22 @@ import string
import time
from datetime import datetime
from django.core.paginator import Paginator
from functools import partial
from operator import attrgetter, itemgetter
from pathlib import Path
from urllib.parse import urlunsplit, urlencode, urlparse
from yt_dlp.utils import LazyList
from .errors import DatabaseConnectionError
def directory_and_stem(arg_path, /):
filepath = Path(arg_path)
stem = Path(filepath.stem)
while stem.suffixes and '' != stem.suffix:
stem = Path(stem.stem)
return (filepath.parent, str(stem),)
def getenv(key, default=None, /, *, integer=False, string=True):
'''
Guarantees a returned type from calling `os.getenv`
@ -48,6 +59,51 @@ def getenv(key, default=None, /, *, integer=False, string=True):
return r
def glob_quote(filestr, /):
_glob_specials = {
'?': '[?]',
'*': '[*]',
'[': '[[]',
']': '[]]', # probably not needed, but it won't hurt
}
if not isinstance(filestr, str):
raise TypeError(f'expected a str, got "{type(filestr)}"')
return filestr.translate(str.maketrans(_glob_specials))
def list_of_dictionaries(arg_list, /, *, arg_function=lambda x: x):
assert callable(arg_function)
if isinstance(arg_list, list):
_map_func = partial(lambda f, d: f(d) if isinstance(d, dict) else d, arg_function)
return (True, list(map(_map_func, arg_list)),)
return (False, arg_list,)
def mkdir_p(arg_path, /, *, mode=0o777):
'''
Reminder: mode only affects the last directory
'''
dirpath = Path(arg_path)
return dirpath.mkdir(mode=mode, parents=True, exist_ok=True)
def multi_key_sort(iterable, specs, /, use_reversed=False, *, item=False, attr=False, key_func=None):
result = list(iterable)
if key_func is None:
# itemgetter is the default
if item or not (item or attr):
key_func = itemgetter
elif attr:
key_func = attrgetter
for key, reverse in reversed(specs):
result.sort(key=key_func(key), reverse=reverse)
if use_reversed:
return list(reversed(result))
return result
def parse_database_connection_string(database_connection_string):
'''
Parses a connection string in a URL style format, such as:
@ -180,6 +236,15 @@ def json_serial(obj):
raise TypeError(f'Type {type(obj)} is not json_serial()-able')
def seconds_to_timestr(seconds):
seconds = seconds % (24 * 3600)
hour = seconds // 3600
seconds %= 3600
minutes = seconds // 60
seconds %= 60
return '{:02d}:{:02d}:{:02d}'.format(hour, minutes, seconds)
def time_func(func):
def wrapper(*args, **kwargs):
start = time.perf_counter()