mirror of
https://github.com/meeb/tubesync.git
synced 2025-06-18 19:16:36 +00:00
Merge 983679964b
into f5a71592fc
This commit is contained in:
commit
ff909d0f93
@ -229,7 +229,16 @@ def schedule_media_servers_update():
|
|||||||
rescan_media_server(str(mediaserver.pk))
|
rescan_media_server(str(mediaserver.pk))
|
||||||
|
|
||||||
|
|
||||||
def wait_for_errors(model, /, *, task_name=None):
|
def contains_http429(q, task_id, /):
|
||||||
|
from huey.exceptions import TaskException
|
||||||
|
try:
|
||||||
|
q.result(preserve=True, id=task_id)
|
||||||
|
except TaskException as e:
|
||||||
|
return True if 'HTTPError 429: Too Many Requests' in str(e) else False
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def wait_for_errors(model, /, *, queue_name=None, task_name=None):
|
||||||
if task_name is None:
|
if task_name is None:
|
||||||
task_name=tuple((
|
task_name=tuple((
|
||||||
'sync.tasks.download_media',
|
'sync.tasks.download_media',
|
||||||
@ -253,12 +262,27 @@ def wait_for_errors(model, /, *, task_name=None):
|
|||||||
for task in tasks:
|
for task in tasks:
|
||||||
update_task_status(task, 'paused (429)')
|
update_task_status(task, 'paused (429)')
|
||||||
|
|
||||||
delay = 10 * tqs.count()
|
total_count = tqs.count()
|
||||||
|
if queue_name:
|
||||||
|
from django_huey import get_queue
|
||||||
|
q = get_queue(queue_name)
|
||||||
|
total_count += sum([ 1 if contains_http429(q, k) else 0 for k in q.all_results() ])
|
||||||
|
delay = 10 * total_count
|
||||||
time_str = seconds_to_timestr(delay)
|
time_str = seconds_to_timestr(delay)
|
||||||
log.info(f'waiting for errors: 429 ({time_str}): {model}')
|
log.info(f'waiting for errors: 429 ({time_str}): {model}')
|
||||||
time.sleep(delay)
|
db_down_path = Path('/run/service/tubesync-db-worker/down')
|
||||||
|
fs_down_path = Path('/run/service/tubesync-fs-worker/down')
|
||||||
|
while delay > 0:
|
||||||
|
# this happenes when the container is shutting down
|
||||||
|
# do not prevent that while we are delaying a task
|
||||||
|
if db_down_path.exists() and fs_down_path.exists():
|
||||||
|
break
|
||||||
|
time.sleep(5)
|
||||||
|
delay -= 5
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
update_task_status(task, None)
|
update_task_status(task, None)
|
||||||
|
if delay > 0:
|
||||||
|
raise BgTaskWorkerError(_('queue worker stopped'))
|
||||||
|
|
||||||
|
|
||||||
@db_task(queue=Val(TaskQueue.FS))
|
@db_task(queue=Val(TaskQueue.FS))
|
||||||
@ -623,7 +647,11 @@ def download_media_metadata(media_id):
|
|||||||
log.info(f'Task for ID: {media_id} / {media} skipped, due to task being manually skipped.')
|
log.info(f'Task for ID: {media_id} / {media} skipped, due to task being manually skipped.')
|
||||||
return
|
return
|
||||||
source = media.source
|
source = media.source
|
||||||
wait_for_errors(media, task_name='sync.tasks.download_media_metadata')
|
wait_for_errors(
|
||||||
|
media,
|
||||||
|
queue_name=Val(TaskQueue.LIMIT),
|
||||||
|
task_name='sync.tasks.download_media_metadata',
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
metadata = media.index_metadata()
|
metadata = media.index_metadata()
|
||||||
except YouTubeError as e:
|
except YouTubeError as e:
|
||||||
@ -776,7 +804,11 @@ def download_media(media_id, override=False):
|
|||||||
# should raise an exception to avoid this
|
# should raise an exception to avoid this
|
||||||
return
|
return
|
||||||
|
|
||||||
wait_for_errors(media, task_name='sync.tasks.download_media')
|
wait_for_errors(
|
||||||
|
media,
|
||||||
|
queue_name=Val(TaskQueue.LIMIT),
|
||||||
|
task_name='sync.tasks.download_media',
|
||||||
|
)
|
||||||
filepath = media.filepath
|
filepath = media.filepath
|
||||||
container = format_str = None
|
container = format_str = None
|
||||||
log.info(f'Downloading media: {media} (UUID: {media.pk}) to: "{filepath}"')
|
log.info(f'Downloading media: {media} (UUID: {media.pk}) to: "{filepath}"')
|
||||||
@ -902,6 +934,10 @@ def refresh_formats(media_id):
|
|||||||
except Media.DoesNotExist as e:
|
except Media.DoesNotExist as e:
|
||||||
raise CancelExecution(_('no such media'), retry=False) from e
|
raise CancelExecution(_('no such media'), retry=False) from e
|
||||||
else:
|
else:
|
||||||
|
wait_for_errors(
|
||||||
|
media,
|
||||||
|
queue_name=Val(TaskQueue.LIMIT),
|
||||||
|
)
|
||||||
if media.refresh_formats:
|
if media.refresh_formats:
|
||||||
save_model(media)
|
save_model(media)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user