diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py index 97a63515..3e3866d8 100644 --- a/tubesync/sync/tasks.py +++ b/tubesync/sync/tasks.py @@ -229,7 +229,16 @@ def schedule_media_servers_update(): rescan_media_server(str(mediaserver.pk)) -def wait_for_errors(model, /, *, task_name=None): +def contains_http429(q, task_id, /): + from huey.exceptions import TaskException + try: + q.result(preserve=True, id=task_id) + except TaskException as e: + return True if 'HTTPError 429: Too Many Requests' in str(e) else False + return False + + +def wait_for_errors(model, /, *, queue_name=None, task_name=None): if task_name is None: task_name=tuple(( 'sync.tasks.download_media', @@ -252,13 +261,28 @@ def wait_for_errors(model, /, *, task_name=None): ) for task in tasks: update_task_status(task, 'paused (429)') - - delay = 10 * tqs.count() + + total_count = tqs.count() + if queue_name: + from django_huey import get_queue + q = get_queue(queue_name) + total_count += sum([ 1 if contains_http429(q, k) else 0 for k in q.all_results() ]) + delay = 10 * total_count time_str = seconds_to_timestr(delay) log.info(f'waiting for errors: 429 ({time_str}): {model}') - time.sleep(delay) + db_down_path = Path('/run/service/tubesync-db-worker/down') + fs_down_path = Path('/run/service/tubesync-fs-worker/down') + while delay > 0: + # this happenes when the container is shutting down + # do not prevent that while we are delaying a task + if db_down_path.exists() and fs_down_path.exists(): + break + time.sleep(5) + delay -= 5 for task in tasks: update_task_status(task, None) + if delay > 0: + raise BgTaskWorkerError(_('queue worker stopped')) @db_task(queue=Val(TaskQueue.FS)) @@ -623,7 +647,11 @@ def download_media_metadata(media_id): log.info(f'Task for ID: {media_id} / {media} skipped, due to task being manually skipped.') return source = media.source - wait_for_errors(media, task_name='sync.tasks.download_media_metadata') + wait_for_errors( + media, + queue_name=Val(TaskQueue.LIMIT), + task_name='sync.tasks.download_media_metadata', + ) try: metadata = media.index_metadata() except YouTubeError as e: @@ -776,7 +804,11 @@ def download_media(media_id, override=False): # should raise an exception to avoid this return - wait_for_errors(media, task_name='sync.tasks.download_media') + wait_for_errors( + media, + queue_name=Val(TaskQueue.LIMIT), + task_name='sync.tasks.download_media', + ) filepath = media.filepath container = format_str = None log.info(f'Downloading media: {media} (UUID: {media.pk}) to: "{filepath}"') @@ -902,6 +934,10 @@ def refresh_formats(media_id): except Media.DoesNotExist as e: raise CancelExecution(_('no such media'), retry=False) from e else: + wait_for_errors( + media, + queue_name=Val(TaskQueue.LIMIT), + ) if media.refresh_formats: save_model(media)