From 1e5e27952efd71fdda99e450547fd685bbaecb41 Mon Sep 17 00:00:00 2001 From: tcely Date: Mon, 16 Jun 2025 14:34:52 -0400 Subject: [PATCH] Migrate `migrate_to_metadata` to `huey` --- tubesync/sync/tasks.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py index 97a63515..1e990518 100644 --- a/tubesync/sync/tasks.py +++ b/tubesync/sync/tasks.py @@ -59,7 +59,6 @@ def map_task_to_instance(task): because UUID's are incompatible with background_task's "creator" feature. ''' TASK_MAP = { - 'sync.tasks.migrate_to_metadata': Media, 'sync.tasks.index_source_task': Source, 'sync.tasks.check_source_directory_exists': Source, 'sync.tasks.download_media_thumbnail': Media, @@ -324,7 +323,7 @@ def save_db_batch(qs, objs, fields, /): return num_updated -@background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.DB), remove_existing_tasks=True) +@db_task(delay=60, priority=80, retries=10, retry_delay=60, queue=Val(TaskQueue.DB)) def migrate_to_metadata(media_id): try: media = Media.objects.get(pk=media_id) @@ -332,7 +331,7 @@ def migrate_to_metadata(media_id): # Task triggered but the media no longer exists, do nothing log.error(f'Task migrate_to_metadata(pk={media_id}) called but no ' f'media exists with ID: {media_id}') - raise InvalidTaskError(_('no such media')) from e + raise CancelExecution(_('no such media'), retry=False) from e try: data = Metadata.objects.get( @@ -341,7 +340,7 @@ def migrate_to_metadata(media_id): key=media.key, ) except Metadata.DoesNotExist as e: - raise InvalidTaskError(_('no indexed data to migrate to metadata')) from e + raise CancelExecution(_('no indexed data to migrate to metadata'), retry=False) from e video = data.value fields = lambda f, m: m.get_metadata_field(f) @@ -363,11 +362,21 @@ def migrate_to_metadata(media_id): @background(schedule=dict(priority=0, run_at=0), queue=Val(TaskQueue.NET), remove_existing_tasks=False) def wait_for_database_queue(): + from common.huey import h_q_tuple + queue_name = Val(TaskQueue.DB) + consumer_down_path = Path(f'/run/service/huey-{queue_name}/down') worker_down_path = Path('/run/service/tubesync-db-worker/down') - while Task.objects.unlocked(timezone.now()).filter(queue=Val(TaskQueue.DB)).count() > 0: - time.sleep(5) + total_count = 1 + while 0 < total_count: + if consumer_down_path.exists() and consumer_down_path.is_file(): + raise BgTaskWorkerError(_('queue consumer stopped')) if worker_down_path.exists() and worker_down_path.is_file(): raise BgTaskWorkerError(_('queue worker stopped')) + time.sleep(5) + status_dict = h_q_tuple(queue_name)[2] + total_count = Task.objects.unlocked(timezone.now()).filter(queue=queue_name).count() + total_count += status_dict.get('pending', (0,))[0] + total_count += status_dict.get('scheduled', (0,))[0] @background(schedule=dict(priority=20, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True) @@ -482,11 +491,7 @@ def index_source_task(source_id): data.retrieved = source.last_crawl data.value = video db_batch_data.append(data) - vn_fmt = _('Updating metadata from indexing results for: "{}": {}') - migrate_to_metadata( - str(media.pk), - verbose_name=vn_fmt.format(media.key, media.name), - ) + migrate_to_metadata(str(media.pk)) if not new_media: # update the existing media for key, value in media_defaults.items():