Merge pull request #933 from tcely/patch-2

Adjust queue workers
This commit is contained in:
meeb 2025-04-13 22:21:31 +10:00 committed by GitHub
commit 8c729d0808
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 71 additions and 38 deletions

View File

@@ -2,4 +2,5 @@
exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \
/usr/bin/python3 /app/manage.py process_tasks \
--queue database
--queue database --duration 86400 \
--sleep "30.${RANDOM}"

View File

@@ -2,4 +2,5 @@
exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \
/usr/bin/python3 /app/manage.py process_tasks \
--queue filesystem
--queue filesystem --duration 43200 \
--sleep "20.${RANDOM}"

View File

@@ -2,4 +2,5 @@
exec nice -n "${TUBESYNC_NICE:-1}" s6-setuidgid app \
/usr/bin/python3 /app/manage.py process_tasks \
--queue network
--queue network --duration 43200 \
--sleep "10.${RANDOM}"

View File

@@ -17,7 +17,7 @@ from PIL import Image
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.uploadedfile import SimpleUploadedFile
from django.db import DatabaseError, IntegrityError
from django.db import reset_queries, DatabaseError, IntegrityError
from django.db.transaction import atomic
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
@@ -123,8 +123,7 @@ def update_task_status(task, status):
else:
task.verbose_name = f'[{status}] {task._verbose_name}'
try:
with atomic():
task.save(update_fields={'verbose_name'})
task.save(update_fields={'verbose_name'})
except DatabaseError as e:
if 'Save with update_fields did not affect any rows.' == str(e):
pass
@@ -202,25 +201,29 @@ def migrate_queues():
return qs.update(queue=Val(TaskQueue.NET))
@atomic(durable=False)
def schedule_media_servers_update():
with atomic():
# Schedule a task to update media servers
log.info(f'Scheduling media server updates')
verbose_name = _('Request media server rescan for "{}"')
for mediaserver in MediaServer.objects.all():
rescan_media_server(
str(mediaserver.pk),
priority=10,
verbose_name=verbose_name.format(mediaserver),
remove_existing_tasks=True,
)
# Schedule a task to update media servers
log.info(f'Scheduling media server updates')
verbose_name = _('Request media server rescan for "{}"')
for mediaserver in MediaServer.objects.all():
rescan_media_server(
str(mediaserver.pk),
verbose_name=verbose_name.format(mediaserver),
)
def cleanup_old_media():
with atomic():
for source in Source.objects.filter(delete_old_media=True, days_to_keep__gt=0):
delta = timezone.now() - timedelta(days=source.days_to_keep)
for media in source.media_source.filter(downloaded=True, download_date__lt=delta):
mqs = source.media_source.defer(
'metadata',
).filter(
downloaded=True,
download_date__lt=delta,
)
for media in mqs:
log.info(f'Deleting expired media: {source} / {media} '
f'(now older than {source.days_to_keep} days / '
f'download_date before {delta})')
@@ -234,8 +237,12 @@ def cleanup_removed_media(source, videos):
if not source.delete_removed_media:
return
log.info(f'Cleaning up media no longer in source: {source}')
media_objects = Media.objects.filter(source=source)
for media in media_objects:
mqs = Media.objects.defer(
'metadata',
).filter(
source=source,
)
for media in mqs:
matching_source_item = [video['id'] for video in videos if video['id'] == media.key]
if not matching_source_item:
log.info(f'{media.name} is no longer in source, removing')
@@ -244,11 +251,12 @@ def cleanup_removed_media(source, videos):
schedule_media_servers_update()
@background(schedule=dict(priority=10, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
@background(schedule=dict(priority=20, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
def index_source_task(source_id):
'''
Indexes media available from a Source object.
'''
reset_queries()
cleanup_completed_tasks()
# deleting expired media should happen any time an index task is requested
cleanup_old_media()
@@ -322,7 +330,6 @@ def index_source_task(source_id):
verbose_name = _('Downloading metadata for "{}"')
download_media_metadata(
str(media.pk),
priority=20,
verbose_name=verbose_name.format(media.pk),
)
# Reset task.verbose_name to the saved value
@@ -350,7 +357,7 @@ def check_source_directory_exists(source_id):
source.make_directory()
@background(schedule=dict(priority=5, run_at=10), queue=Val(TaskQueue.NET))
@background(schedule=dict(priority=10, run_at=10), queue=Val(TaskQueue.NET))
def download_source_images(source_id):
'''
Downloads an image and save it as a local thumbnail attached to a
@@ -400,7 +407,7 @@ def download_source_images(source_id):
log.info(f'Thumbnail downloaded for source with ID: {source_id} / {source}')
@background(schedule=dict(priority=20, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
@background(schedule=dict(priority=40, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
def download_media_metadata(media_id):
'''
Downloads the metadata for a media item.
@@ -484,7 +491,7 @@ def download_media_metadata(media_id):
f'{source} / {media}: {media_id}')
@background(schedule=dict(priority=15, run_at=10), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
@background(schedule=dict(priority=10, run_at=10), queue=Val(TaskQueue.FS), remove_existing_tasks=True)
def download_media_thumbnail(media_id, url):
'''
Downloads an image from a URL and save it as a local thumbnail attached to a
@@ -522,7 +529,7 @@ def download_media_thumbnail(media_id, url):
return True
@background(schedule=dict(priority=15, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
@background(schedule=dict(priority=30, run_at=60), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
def download_media(media_id):
'''
Downloads the media to disk and attaches it to the Media instance.
@@ -676,7 +683,7 @@ def rescan_media_server(mediaserver_id):
mediaserver.update()
@background(schedule=dict(priority=25, run_at=600), queue=Val(TaskQueue.FS), remove_existing_tasks=True)
@background(schedule=dict(priority=30, run_at=600), queue=Val(TaskQueue.FS), remove_existing_tasks=True)
def save_all_media_for_source(source_id):
'''
Iterates all media items linked to a source and saves them to
@@ -684,6 +691,7 @@ def save_all_media_for_source(source_id):
source has its parameters changed and all media needs to be
checked to see if its download status has changed.
'''
reset_queries()
try:
source = Source.objects.get(pk=source_id)
except Source.DoesNotExist as e:
@@ -693,15 +701,26 @@ def save_all_media_for_source(source_id):
raise InvalidTaskError(_('no such source')) from e
saved_later = set()
mqs = Media.objects.filter(source=source)
task = get_source_check_task(source_id)
refresh_qs = mqs.filter(
refresh_qs = Media.objects.all().only(
'pk',
'uuid',
'key',
'title', # for name property
).filter(
source=source,
can_download=False,
skip=False,
manual_skip=False,
downloaded=False,
metadata__isnull=False,
)
uuid_qs = Media.objects.all().only(
'pk',
'uuid',
).filter(
source=source,
).values_list('uuid', flat=True)
task = get_source_check_task(source_id)
if task:
task._verbose_name = remove_enclosed(
task.verbose_name, '[', ']', ' ',
@@ -719,17 +738,23 @@ def save_all_media_for_source(source_id):
# Trigger the post_save signal for each media item linked to this source as various
# flags may need to be recalculated
tvn_format = '2/{:,}' + f'/{mqs.count():,}'
for mn, media in enumerate(mqs, start=1):
if media.uuid not in saved_later:
tvn_format = '2/{:,}' + f'/{uuid_qs.count():,}'
for mn, media_uuid in enumerate(uuid_qs, start=1):
if media_uuid not in saved_later:
update_task_status(task, tvn_format.format(mn))
with atomic():
media.save()
try:
media = Media.objects.get(pk=str(media_uuid))
except Media.DoesNotExist as e:
log.exception(str(e))
pass
else:
with atomic():
media.save()
# Reset task.verbose_name to the saved value
update_task_status(task, None)
@background(schedule=dict(priority=10, run_at=0), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
@background(schedule=dict(priority=50, run_at=0), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
def refresh_formats(media_id):
try:
media = Media.objects.get(pk=media_id)
@@ -776,7 +801,6 @@ def rename_all_media_for_source(source_id):
if not create_rename_tasks:
return
mqs = Media.objects.all().defer(
'metadata',
'thumb',
).filter(
source=source,

View File

@@ -205,10 +205,14 @@ def get_media_info(url, /, *, days=None, info_json=None):
'paths': paths,
'postprocessors': postprocessors,
'skip_unavailable_fragments': False,
'sleep_interval_requests': 2 * settings.BACKGROUND_TASK_ASYNC_THREADS,
'sleep_interval_requests': 1,
'verbose': True if settings.DEBUG else False,
'writeinfojson': True,
})
if settings.BACKGROUND_TASK_RUN_ASYNC:
opts.update({
'sleep_interval_requests': 2 * settings.BACKGROUND_TASK_ASYNC_THREADS,
})
if start:
log.debug(f'get_media_info: used date range: {opts["daterange"]} for URL: {url}')
response = {}

View File

@@ -62,6 +62,8 @@ else:
DEFAULT_THREADS = 1
BACKGROUND_TASK_ASYNC_THREADS = getenv('TUBESYNC_WORKERS', DEFAULT_THREADS, integer=True)
if BACKGROUND_TASK_ASYNC_THREADS > 1:
BACKGROUND_TASK_RUN_ASYNC = True
MEDIA_ROOT = CONFIG_BASE_DIR / 'media'