From 60b0fc644035daf903ac1e475c9fcfb338059c32 Mon Sep 17 00:00:00 2001
From: tcely
Date: Mon, 16 Jun 2025 13:01:45 -0400
Subject: [PATCH 1/3] Teach `wait_for_errors` about `huey` results

---
 tubesync/sync/tasks.py | 36 +++++++++++++++++++++++++++++++-----
 1 file changed, 31 insertions(+), 5 deletions(-)

diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py
index 97a63515..0106ed53 100644
--- a/tubesync/sync/tasks.py
+++ b/tubesync/sync/tasks.py
@@ -229,7 +229,16 @@ def schedule_media_servers_update():
         rescan_media_server(str(mediaserver.pk))
 
 
-def wait_for_errors(model, /, *, task_name=None):
+def contains_http429(q, task_id, /):
+    from huey.exceptions import TaskException
+    try:
+        result = q.result(preserve=True, id=task_id)
+    except TaskException as e:
+        return True if 'HTTPError 429: Too Many Requests' in str(e) else False
+    return False
+
+
+def wait_for_errors(model, /, *, queue_name=None, task_name=None):
     if task_name is None:
         task_name=tuple((
             'sync.tasks.download_media',
@@ -252,8 +261,13 @@ def wait_for_errors(model, /, *, task_name=None):
     )
     for task in tasks:
         update_task_status(task, 'paused (429)')
-
-    delay = 10 * tqs.count()
+
+    total_count = tqs.count()
+    if queue_name:
+        from django_huey import get_queue
+        q = get_queue(queue_name)
+        total_count += sum([ 1 if contains_http429(q, k) else 0 for k in q.all_results() ])
+    delay = 10 * total_count
     time_str = seconds_to_timestr(delay)
     log.info(f'waiting for errors: 429 ({time_str}): {model}')
     time.sleep(delay)
@@ -623,7 +637,11 @@ def download_media_metadata(media_id):
         log.info(f'Task for ID: {media_id} / {media} skipped, due to task being manually skipped.')
         return
     source = media.source
-    wait_for_errors(media, task_name='sync.tasks.download_media_metadata')
+    wait_for_errors(
+        media,
+        queue_name=Val(TaskQueue.LIMIT),
+        task_name='sync.tasks.download_media_metadata',
+    )
     try:
         metadata = media.index_metadata()
     except YouTubeError as e:
@@ -776,7 +794,11 @@ def download_media(media_id, override=False):
         # should raise an exception to avoid this
         return
 
-    wait_for_errors(media, task_name='sync.tasks.download_media')
+    wait_for_errors(
+        media,
+        queue_name=Val(TaskQueue.LIMIT),
+        task_name='sync.tasks.download_media',
+    )
     filepath = media.filepath
     container = format_str = None
     log.info(f'Downloading media: {media} (UUID: {media.pk}) to: "{filepath}"')
@@ -902,6 +924,10 @@ def refresh_formats(media_id):
     except Media.DoesNotExist as e:
         raise CancelExecution(_('no such media'), retry=False) from e
     else:
+        wait_for_errors(
+            media,
+            queue_name=Val(TaskQueue.LIMIT),
+        )
         if media.refresh_formats:
             save_model(media)
 

From 0486329a8ba84d77fd0b4f4db8c7a39ce75223a6 Mon Sep 17 00:00:00 2001
From: tcely
Date: Mon, 16 Jun 2025 13:05:38 -0400
Subject: [PATCH 2/3] fixup: remove assignment

---
 tubesync/sync/tasks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py
index 0106ed53..adf5db71 100644
--- a/tubesync/sync/tasks.py
+++ b/tubesync/sync/tasks.py
@@ -232,7 +232,7 @@ def schedule_media_servers_update():
 def contains_http429(q, task_id, /):
     from huey.exceptions import TaskException
     try:
-        result = q.result(preserve=True, id=task_id)
+        q.result(preserve=True, id=task_id)
     except TaskException as e:
         return True if 'HTTPError 429: Too Many Requests' in str(e) else False
     return False

From 983679964b4a56f560aeff8441dd996fba3a7413 Mon Sep 17 00:00:00 2001
From: tcely
Date: Mon, 16 Jun 2025 22:19:42 -0400
Subject: [PATCH 3/3] Abort `wait_for_errors` during container shutdown

---
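Note (not part of the commit): a minimal sketch of the interruptible delay this
patch introduces, generalized into a helper. The `interruptible_sleep` name and
`stop_paths` parameter are hypothetical; the sketch assumes the same s6-overlay
`down` files (created under /run/service/* while the container shuts down) that
the patch checks.

    from pathlib import Path
    import time

    def interruptible_sleep(total_seconds, /, *, step=5, stop_paths=()):
        # Sleep in short steps so a stopping container is not blocked
        # for the whole delay.
        remaining = total_seconds
        while remaining > 0:
            # abort early once every watched service has a `down` file
            if stop_paths and all(Path(p).exists() for p in stop_paths):
                return remaining
            time.sleep(min(step, remaining))
            remaining -= step
        return 0

Usage with the same paths the patch watches; a non-zero return value means the
wait was aborted and the caller should raise rather than continue:

    remaining = interruptible_sleep(delay, stop_paths=(
        '/run/service/tubesync-db-worker/down',
        '/run/service/tubesync-fs-worker/down',
    ))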
 tubesync/sync/tasks.py | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py
index adf5db71..3e3866d8 100644
--- a/tubesync/sync/tasks.py
+++ b/tubesync/sync/tasks.py
@@ -270,9 +270,19 @@ def wait_for_errors(model, /, *, queue_name=None, task_name=None):
     delay = 10 * total_count
     time_str = seconds_to_timestr(delay)
     log.info(f'waiting for errors: 429 ({time_str}): {model}')
-    time.sleep(delay)
+    db_down_path = Path('/run/service/tubesync-db-worker/down')
+    fs_down_path = Path('/run/service/tubesync-fs-worker/down')
+    while delay > 0:
+        # this happens when the container is shutting down
+        # do not prevent that while we are delaying a task
+        if db_down_path.exists() and fs_down_path.exists():
+            break
+        time.sleep(5)
+        delay -= 5
     for task in tasks:
         update_task_status(task, None)
+    if delay > 0:
+        raise BgTaskWorkerError(_('queue worker stopped'))
 
 
 @db_task(queue=Val(TaskQueue.FS))
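
Note (not part of the patches): a small sketch of the huey result-store
behaviour that `contains_http429()` relies on. Reading a failed task's result
re-raises its stored error as `huey.exceptions.TaskException`, and
`preserve=True` keeps the error in the result store, so repeated polling by
`wait_for_errors()` keeps seeing it. The queue name below is an assumption,
standing in for whatever string `Val(TaskQueue.LIMIT)` resolves to.

    from django_huey import get_queue
    from huey.exceptions import TaskException

    q = get_queue('limited')  # assumed queue name
    for task_id in q.all_results():  # result-store keys are task ids
        try:
            # preserve=True: do not pop the stored error while checking it
            q.result(preserve=True, id=task_id)
        except TaskException as exc:
            if 'HTTPError 429: Too Many Requests' in str(exc):
                print(f'task {task_id} failed with HTTP 429')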