mirror of
https://github.com/meeb/tubesync.git
synced 2025-06-22 04:56:35 +00:00
Merge 1e5e27952e
into f5a71592fc
This commit is contained in:
commit
90f7e04bbf
@ -59,7 +59,6 @@ def map_task_to_instance(task):
|
|||||||
because UUID's are incompatible with background_task's "creator" feature.
|
because UUID's are incompatible with background_task's "creator" feature.
|
||||||
'''
|
'''
|
||||||
TASK_MAP = {
|
TASK_MAP = {
|
||||||
'sync.tasks.migrate_to_metadata': Media,
|
|
||||||
'sync.tasks.index_source_task': Source,
|
'sync.tasks.index_source_task': Source,
|
||||||
'sync.tasks.check_source_directory_exists': Source,
|
'sync.tasks.check_source_directory_exists': Source,
|
||||||
'sync.tasks.download_media_thumbnail': Media,
|
'sync.tasks.download_media_thumbnail': Media,
|
||||||
@ -324,7 +323,7 @@ def save_db_batch(qs, objs, fields, /):
|
|||||||
return num_updated
|
return num_updated
|
||||||
|
|
||||||
|
|
||||||
@background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.DB), remove_existing_tasks=True)
|
@db_task(delay=60, priority=80, retries=10, retry_delay=60, queue=Val(TaskQueue.DB))
|
||||||
def migrate_to_metadata(media_id):
|
def migrate_to_metadata(media_id):
|
||||||
try:
|
try:
|
||||||
media = Media.objects.get(pk=media_id)
|
media = Media.objects.get(pk=media_id)
|
||||||
@ -332,7 +331,7 @@ def migrate_to_metadata(media_id):
|
|||||||
# Task triggered but the media no longer exists, do nothing
|
# Task triggered but the media no longer exists, do nothing
|
||||||
log.error(f'Task migrate_to_metadata(pk={media_id}) called but no '
|
log.error(f'Task migrate_to_metadata(pk={media_id}) called but no '
|
||||||
f'media exists with ID: {media_id}')
|
f'media exists with ID: {media_id}')
|
||||||
raise InvalidTaskError(_('no such media')) from e
|
raise CancelExecution(_('no such media'), retry=False) from e
|
||||||
|
|
||||||
try:
|
try:
|
||||||
data = Metadata.objects.get(
|
data = Metadata.objects.get(
|
||||||
@ -341,7 +340,7 @@ def migrate_to_metadata(media_id):
|
|||||||
key=media.key,
|
key=media.key,
|
||||||
)
|
)
|
||||||
except Metadata.DoesNotExist as e:
|
except Metadata.DoesNotExist as e:
|
||||||
raise InvalidTaskError(_('no indexed data to migrate to metadata')) from e
|
raise CancelExecution(_('no indexed data to migrate to metadata'), retry=False) from e
|
||||||
|
|
||||||
video = data.value
|
video = data.value
|
||||||
fields = lambda f, m: m.get_metadata_field(f)
|
fields = lambda f, m: m.get_metadata_field(f)
|
||||||
@ -363,11 +362,21 @@ def migrate_to_metadata(media_id):
|
|||||||
|
|
||||||
@background(schedule=dict(priority=0, run_at=0), queue=Val(TaskQueue.NET), remove_existing_tasks=False)
|
@background(schedule=dict(priority=0, run_at=0), queue=Val(TaskQueue.NET), remove_existing_tasks=False)
|
||||||
def wait_for_database_queue():
|
def wait_for_database_queue():
|
||||||
|
from common.huey import h_q_tuple
|
||||||
|
queue_name = Val(TaskQueue.DB)
|
||||||
|
consumer_down_path = Path(f'/run/service/huey-{queue_name}/down')
|
||||||
worker_down_path = Path('/run/service/tubesync-db-worker/down')
|
worker_down_path = Path('/run/service/tubesync-db-worker/down')
|
||||||
while Task.objects.unlocked(timezone.now()).filter(queue=Val(TaskQueue.DB)).count() > 0:
|
total_count = 1
|
||||||
time.sleep(5)
|
while 0 < total_count:
|
||||||
|
if consumer_down_path.exists() and consumer_down_path.is_file():
|
||||||
|
raise BgTaskWorkerError(_('queue consumer stopped'))
|
||||||
if worker_down_path.exists() and worker_down_path.is_file():
|
if worker_down_path.exists() and worker_down_path.is_file():
|
||||||
raise BgTaskWorkerError(_('queue worker stopped'))
|
raise BgTaskWorkerError(_('queue worker stopped'))
|
||||||
|
time.sleep(5)
|
||||||
|
status_dict = h_q_tuple(queue_name)[2]
|
||||||
|
total_count = Task.objects.unlocked(timezone.now()).filter(queue=queue_name).count()
|
||||||
|
total_count += status_dict.get('pending', (0,))[0]
|
||||||
|
total_count += status_dict.get('scheduled', (0,))[0]
|
||||||
|
|
||||||
|
|
||||||
@background(schedule=dict(priority=20, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
|
@background(schedule=dict(priority=20, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
|
||||||
@ -482,11 +491,7 @@ def index_source_task(source_id):
|
|||||||
data.retrieved = source.last_crawl
|
data.retrieved = source.last_crawl
|
||||||
data.value = video
|
data.value = video
|
||||||
db_batch_data.append(data)
|
db_batch_data.append(data)
|
||||||
vn_fmt = _('Updating metadata from indexing results for: "{}": {}')
|
migrate_to_metadata(str(media.pk))
|
||||||
migrate_to_metadata(
|
|
||||||
str(media.pk),
|
|
||||||
verbose_name=vn_fmt.format(media.key, media.name),
|
|
||||||
)
|
|
||||||
if not new_media:
|
if not new_media:
|
||||||
# update the existing media
|
# update the existing media
|
||||||
for key, value in media_defaults.items():
|
for key, value in media_defaults.items():
|
||||||
|
Loading…
Reference in New Issue
Block a user