mirror of
https://github.com/meeb/tubesync.git
synced 2025-06-22 21:16:38 +00:00
Merge pull request #690 from tcely/patch-3
Some checks are pending
Run Django tests for TubeSync / test (3.10) (push) Waiting to run
Run Django tests for TubeSync / test (3.11) (push) Waiting to run
Run Django tests for TubeSync / test (3.12) (push) Waiting to run
Run Django tests for TubeSync / test (3.7) (push) Waiting to run
Run Django tests for TubeSync / test (3.8) (push) Waiting to run
Run Django tests for TubeSync / test (3.9) (push) Waiting to run
Run Django tests for TubeSync / containerise (push) Waiting to run
Some checks are pending
Run Django tests for TubeSync / test (3.10) (push) Waiting to run
Run Django tests for TubeSync / test (3.11) (push) Waiting to run
Run Django tests for TubeSync / test (3.12) (push) Waiting to run
Run Django tests for TubeSync / test (3.7) (push) Waiting to run
Run Django tests for TubeSync / test (3.8) (push) Waiting to run
Run Django tests for TubeSync / test (3.9) (push) Waiting to run
Run Django tests for TubeSync / containerise (push) Waiting to run
Let Tasks expire correctly
This commit is contained in:
commit
dd55ed5fe5
@ -369,6 +369,10 @@ RUN set -x && \
|
||||
# Copy root
|
||||
COPY config/root /
|
||||
|
||||
# patch background_task
|
||||
COPY patches/background_task/ \
|
||||
/usr/local/lib/python3/dist-packages/background_task/
|
||||
|
||||
# patch yt_dlp
|
||||
COPY patches/yt_dlp/ \
|
||||
/usr/local/lib/python3/dist-packages/yt_dlp/
|
||||
|
446
patches/background_task/models.py
Normal file
446
patches/background_task/models.py
Normal file
@ -0,0 +1,446 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from datetime import timedelta
|
||||
from hashlib import sha1
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import traceback
|
||||
|
||||
from compat import StringIO
|
||||
from compat.models import GenericForeignKey
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db import models
|
||||
from django.db.models import Q
|
||||
from django.utils import timezone
|
||||
from six import python_2_unicode_compatible
|
||||
|
||||
from background_task.exceptions import InvalidTaskError
|
||||
from background_task.settings import app_settings
|
||||
from background_task.signals import task_failed
|
||||
from background_task.signals import task_rescheduled
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TaskQuerySet(models.QuerySet):
    """QuerySet used for ``Task`` rows, adding a creator-based filter."""

    def created_by(self, creator):
        """
        Restrict the queryset to tasks created by the given object.

        The match is made on the content type of `creator` together with
        its ``id`` attribute.

        :param creator: the model instance the tasks were created for
        :return: A Task queryset filtered by creator
        """
        lookup = {
            'creator_content_type': ContentType.objects.get_for_model(creator),
            'creator_object_id': creator.id,
        }
        return self.filter(**lookup)
|
||||
|
||||
|
||||
class TaskManager(models.Manager):
    """Manager for ``Task``: selects runnable tasks and builds new ones."""

    def get_queryset(self):
        """Return a ``TaskQuerySet`` bound to this manager's model and database."""
        return TaskQuerySet(self.model, using=self._db)

    def created_by(self, creator):
        """Shortcut for ``get_queryset().created_by(creator)``."""
        return self.get_queryset().created_by(creator)

    def find_available(self, queue=None):
        """
        Return the tasks that are ready to be picked up right now.

        A task is ready when it is unlocked (see `unlocked`), its ``run_at``
        is not in the future and it has no ``failed_at``. Results are ordered
        by priority (direction taken from
        ``BACKGROUND_TASK_PRIORITY_ORDERING``) and then by ``run_at``.

        When ``BACKGROUND_TASK_RUN_ASYNC`` is set, the result is capped at
        ``BACKGROUND_TASK_ASYNC_THREADS - (locked - failed)`` tasks, and is
        empty when that number is not positive.

        :param queue: optional queue name to restrict the search to
        :return: a queryset of runnable tasks
        """
        now = timezone.now()
        candidates = self.unlocked(now)
        if queue:
            candidates = candidates.filter(queue=queue)
        candidates = candidates.filter(run_at__lte=now, failed_at=None)
        ordering = '{}priority'.format(app_settings.BACKGROUND_TASK_PRIORITY_ORDERING)
        candidates = candidates.order_by(ordering, 'run_at')

        if not app_settings.BACKGROUND_TASK_RUN_ASYNC:
            return candidates

        failed_count = self.failed().count()
        locked_count = self.locked(now).count()
        free_slots = app_settings.BACKGROUND_TASK_ASYNC_THREADS - \
            (locked_count - failed_count)
        if free_slots > 0:
            return candidates[:free_slots]
        return self.none()

    def unlocked(self, now):
        """
        Return tasks that nobody holds, or whose lock has expired.

        A lock counts as expired once ``locked_at`` is more than
        ``BACKGROUND_TASK_MAX_RUN_TIME`` seconds before `now`.
        """
        lifetime = timedelta(seconds=app_settings.BACKGROUND_TASK_MAX_RUN_TIME)
        expires_at = now - lifetime
        is_unlocked = Q(locked_by=None) | Q(locked_at__lt=expires_at)
        return self.get_queryset().filter(is_unlocked)

    def locked(self, now):
        """
        Return tasks that are held by someone and whose lock is still fresh.

        Both conditions must hold: ``locked_by`` is set AND ``locked_at`` is
        less than ``BACKGROUND_TASK_MAX_RUN_TIME`` seconds before `now`.
        """
        lifetime = timedelta(seconds=app_settings.BACKGROUND_TASK_MAX_RUN_TIME)
        expires_at = now - lifetime
        is_locked = Q(locked_by__isnull=False) & Q(locked_at__gt=expires_at)
        return self.get_queryset().filter(is_locked)

    def failed(self):
        """
        Return tasks that have a ``failed_at`` timestamp.

        `currently_locked - currently_failed` in `find_available` assumes that
        tasks marked as failed are also in processing by the running PID.
        """
        return self.get_queryset().filter(failed_at__isnull=False)

    @staticmethod
    def _encode_task(task_name, args, kwargs):
        """
        Serialise the call parameters and derive the lookup hash.

        :return: ``(task_params, task_hash)`` where `task_params` is the JSON
            encoding of ``(args, kwargs)`` with sorted keys and `task_hash`
            is the SHA-1 hex digest of the task name followed by that JSON.
        """
        task_params = json.dumps((args or (), kwargs or {}), sort_keys=True)
        digest_input = "%s%s" % (task_name, task_params)
        return task_params, sha1(digest_input.encode('utf-8')).hexdigest()

    def new_task(self, task_name, args=None, kwargs=None,
                 run_at=None, priority=0, queue=None, verbose_name=None,
                 creator=None, repeat=None, repeat_until=None,
                 remove_existing_tasks=False):
        """
        Build (but do not save) a ``Task`` for the given call.

        If `remove_existing_tasks` is True, all unlocked tasks with the identical task hash will be removed.
        The attributes `repeat` and `repeat_until` are not supported at the moment.

        `run_at` defaults to the current time and a falsy `repeat` is stored
        as ``Task.NEVER``.
        """
        if run_at is None:
            run_at = timezone.now()
        task_params, task_hash = self._encode_task(task_name, args, kwargs)
        if remove_existing_tasks:
            Task.objects.filter(task_hash=task_hash, locked_at__isnull=True).delete()
        return Task(
            task_name=task_name,
            task_params=task_params,
            task_hash=task_hash,
            priority=priority,
            run_at=run_at,
            queue=queue,
            verbose_name=verbose_name,
            creator=creator,
            repeat=repeat or Task.NEVER,
            repeat_until=repeat_until,
        )

    def get_task(self, task_name, args=None, kwargs=None):
        """Return the queryset of tasks whose hash matches this name and parameters."""
        _, task_hash = self._encode_task(task_name, args, kwargs)
        return self.get_queryset().filter(task_hash=task_hash)

    def drop_task(self, task_name, args=None, kwargs=None):
        """Delete every task matching this name and parameters (see `get_task`)."""
        return self.get_task(task_name, args, kwargs).delete()
|
||||
|
||||
|
||||
@python_2_unicode_compatible
class Task(models.Model):
    """
    A scheduled unit of background work, stored in the ``background_task`` table.

    The callable is identified by `task_name`, and its arguments are kept
    JSON-encoded in `task_params`.
    """

    # the "name" of the task/function to be run
    task_name = models.CharField(max_length=190, db_index=True)
    # the json encoded parameters to pass to the task
    task_params = models.TextField()
    # a sha1 hash of the name and params, to lookup already scheduled tasks
    task_hash = models.CharField(max_length=40, db_index=True)

    verbose_name = models.CharField(max_length=255, null=True, blank=True)

    # what priority the task has
    priority = models.IntegerField(default=0, db_index=True)
    # when the task should be run
    run_at = models.DateTimeField(db_index=True)

    # Repeat choices are encoded as number of seconds
    # The repeat implementation is based on this encoding
    HOURLY = 3600
    DAILY = 24 * HOURLY
    WEEKLY = 7 * DAILY
    EVERY_2_WEEKS = 2 * WEEKLY
    EVERY_4_WEEKS = 4 * WEEKLY
    NEVER = 0
    REPEAT_CHOICES = (
        (HOURLY, 'hourly'),
        (DAILY, 'daily'),
        (WEEKLY, 'weekly'),
        (EVERY_2_WEEKS, 'every 2 weeks'),
        (EVERY_4_WEEKS, 'every 4 weeks'),
        (NEVER, 'never'),
    )
    repeat = models.BigIntegerField(choices=REPEAT_CHOICES, default=NEVER)
    repeat_until = models.DateTimeField(null=True, blank=True)

    # the "name" of the queue this is to be run on
    queue = models.CharField(max_length=190, db_index=True,
                             null=True, blank=True)

    # how many times the task has been tried
    attempts = models.IntegerField(default=0, db_index=True)
    # when the task last failed
    failed_at = models.DateTimeField(db_index=True, null=True, blank=True)
    # details of the error that occurred
    last_error = models.TextField(blank=True)

    # details of who's trying to run the task at the moment
    locked_by = models.CharField(max_length=64, db_index=True,
                                 null=True, blank=True)
    locked_at = models.DateTimeField(db_index=True, null=True, blank=True)

    creator_content_type = models.ForeignKey(
        ContentType, null=True, blank=True,
        related_name='background_task', on_delete=models.CASCADE
    )
    creator_object_id = models.PositiveIntegerField(null=True, blank=True)
    creator = GenericForeignKey('creator_content_type', 'creator_object_id')

    objects = TaskManager()

    def locked_by_pid_running(self):
        """
        Check if the locked_by process is still running.

        :return: ``None`` when `locked_by` is empty, ``True`` when
            ``os.kill(pid, 0)`` succeeds for the PID stored in `locked_by`,
            ``False`` when `locked_by` is not a usable PID or the call fails.
        """
        if not self.locked_by:
            return None
        try:
            # Signal 0 does not kill anything; it only probes the PID.
            os.kill(int(self.locked_by), 0)
        except (OSError, OverflowError, TypeError, ValueError):
            # Only the errors int()/os.kill() can raise are treated as
            # "not running"; KeyboardInterrupt and SystemExit propagate.
            return False
        return True
    locked_by_pid_running.boolean = True

    def has_error(self):
        """
        Check if the last_error field is non-empty.

        :return: ``True`` when `last_error` holds any text
        """
        return bool(self.last_error)
    has_error.boolean = True

    def params(self):
        """
        Decode `task_params` back into call arguments.

        :return: ``(args, kwargs)`` with every kwargs key coerced to ``str``
        """
        args, kwargs = json.loads(self.task_params)
        # need to coerce kwargs keys to str
        kwargs = {str(key): value for key, value in kwargs.items()}
        return args, kwargs

    def lock(self, locked_by):
        """
        Try to take the lock on this task for `locked_by`.

        The update is restricted to rows that `TaskManager.unlocked`
        considers unlocked at the current time.

        :return: a freshly loaded ``Task`` when the update touched the row,
            otherwise ``None``
        """
        now = timezone.now()
        unlocked = Task.objects.unlocked(now).filter(pk=self.pk)
        updated = unlocked.update(locked_by=locked_by, locked_at=now)
        if updated:
            return Task.objects.get(pk=self.pk)
        return None

    def _extract_error(self, type, err, tb):
        """Return the formatted traceback of the given exception triple as text."""
        buffer = StringIO()
        traceback.print_exception(type, err, tb, None, buffer)
        return buffer.getvalue()

    def increment_attempts(self):
        """Add one to `attempts` and save the task."""
        self.attempts += 1
        self.save()

    def has_reached_max_attempts(self):
        """Return whether `attempts` has reached ``BACKGROUND_TASK_MAX_ATTEMPTS``."""
        max_attempts = app_settings.BACKGROUND_TASK_MAX_ATTEMPTS
        return self.attempts >= max_attempts

    def is_repeating_task(self):
        """Return whether `repeat` is greater than ``NEVER``."""
        return self.repeat > self.NEVER

    def reschedule(self, type, err, traceback):
        '''
        Set a new time to run the task in future, or create a CompletedTask and delete the Task
        if it has reached the maximum of allowed attempts

        The error text is stored in `last_error` and `attempts` is
        incremented first. An ``InvalidTaskError`` fails the task at once.
        Otherwise the task is unlocked and delayed by
        ``attempts ** 4 + 5`` seconds.
        '''
        self.last_error = self._extract_error(type, err, traceback)
        self.increment_attempts()
        if self.has_reached_max_attempts() or isinstance(err, InvalidTaskError):
            self.failed_at = timezone.now()
            logger.warning('Marking task %s as failed', self)
            completed = self.create_completed_task()
            task_failed.send(sender=self.__class__, task_id=self.id, completed_task=completed)
            self.delete()
        else:
            backoff = timedelta(seconds=(self.attempts ** 4) + 5)
            self.run_at = timezone.now() + backoff
            logger.warning('Rescheduling task %s for %s later at %s', self,
                           backoff, self.run_at)
            task_rescheduled.send(sender=self.__class__, task=self)
            self.locked_by = None
            self.locked_at = None
            self.save()

    def create_completed_task(self):
        '''
        Returns a new, saved CompletedTask instance with the same values

        `run_at` is not copied; it is set to the current time.
        '''
        completed_task = CompletedTask(
            task_name=self.task_name,
            task_params=self.task_params,
            task_hash=self.task_hash,
            priority=self.priority,
            run_at=timezone.now(),
            queue=self.queue,
            attempts=self.attempts,
            failed_at=self.failed_at,
            last_error=self.last_error,
            locked_by=self.locked_by,
            locked_at=self.locked_at,
            verbose_name=self.verbose_name,
            creator=self.creator,
            repeat=self.repeat,
            repeat_until=self.repeat_until,
        )
        completed_task.save()
        return completed_task

    def create_repetition(self):
        """
        :return: A new Task with an offset of self.repeat, or None if the self.repeat_until is reached

        ``None`` is also returned for tasks that do not repeat. The new
        `run_at` is advanced in steps of `repeat` seconds until it is no
        longer in the past.
        """
        if not self.is_repeating_task():
            return None

        if self.repeat_until and self.repeat_until <= timezone.now():
            # Repeat chain completed
            return None

        args, kwargs = self.params()
        interval = timedelta(seconds=self.repeat)
        new_run_at = self.run_at + interval
        while new_run_at < timezone.now():
            new_run_at += interval

        new_task = TaskManager().new_task(
            task_name=self.task_name,
            args=args,
            kwargs=kwargs,
            run_at=new_run_at,
            priority=self.priority,
            queue=self.queue,
            verbose_name=self.verbose_name,
            creator=self.creator,
            repeat=self.repeat,
            repeat_until=self.repeat_until,
        )
        new_task.save()
        return new_task

    def save(self, *arg, **kw):
        """Save the task, storing an empty `locked_by` as NULL."""
        # force NULL rather than empty string
        self.locked_by = self.locked_by or None
        return super(Task, self).save(*arg, **kw)

    def __str__(self):
        return u'{}'.format(self.verbose_name or self.task_name)

    class Meta:
        db_table = 'background_task'
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class CompletedTaskQuerySet(models.QuerySet):
    """QuerySet for ``CompletedTask`` rows with creator and outcome filters."""

    def created_by(self, creator):
        """
        Restrict the queryset to completed tasks created by the given object.

        :param creator: the model instance the tasks were created for
        :return: A CompletedTask queryset filtered by creator
        """
        lookup = {
            'creator_content_type': ContentType.objects.get_for_model(creator),
            'creator_object_id': creator.id,
        }
        return self.filter(**lookup)

    def failed(self, within=None):
        """
        Select the completed tasks that carry a ``failed_at`` timestamp.

        :param within: A timedelta object
        :return: A queryset of CompletedTasks that failed within the given timeframe (e.g. less than 1h ago)
        """
        failures = self.filter(failed_at__isnull=False)
        if not within:
            return failures
        cutoff = timezone.now() - within
        return failures.filter(failed_at__gt=cutoff)

    def succeeded(self, within=None):
        """
        Select the completed tasks that have no ``failed_at`` timestamp.

        The timeframe is measured against ``run_at``.

        :param within: A timedelta object
        :return: A queryset of CompletedTasks that completed successfully within the given timeframe
        (e.g. less than 1h ago)
        """
        successes = self.filter(failed_at__isnull=True)
        if not within:
            return successes
        cutoff = timezone.now() - within
        return successes.filter(run_at__gt=cutoff)
|
||||
|
||||
|
||||
@python_2_unicode_compatible
class CompletedTask(models.Model):
    """
    Record of a task that has finished.

    The fields mirror those of ``Task``; `repeat` reuses
    ``Task.REPEAT_CHOICES`` and ``Task.NEVER``.
    """

    # the "name" of the task/function to be run
    task_name = models.CharField(max_length=190, db_index=True)
    # the json encoded parameters to pass to the task
    task_params = models.TextField()
    # a sha1 hash of the name and params, to lookup already scheduled tasks
    task_hash = models.CharField(max_length=40, db_index=True)

    verbose_name = models.CharField(max_length=255, null=True, blank=True)

    # what priority the task has
    priority = models.IntegerField(default=0, db_index=True)
    # when the task should be run
    run_at = models.DateTimeField(db_index=True)

    repeat = models.BigIntegerField(choices=Task.REPEAT_CHOICES, default=Task.NEVER)
    repeat_until = models.DateTimeField(null=True, blank=True)

    # the "name" of the queue this is to be run on
    queue = models.CharField(max_length=190, db_index=True,
                             null=True, blank=True)

    # how many times the task has been tried
    attempts = models.IntegerField(default=0, db_index=True)
    # when the task last failed
    failed_at = models.DateTimeField(db_index=True, null=True, blank=True)
    # details of the error that occurred
    last_error = models.TextField(blank=True)

    # details of who's trying to run the task at the moment
    locked_by = models.CharField(max_length=64, db_index=True,
                                 null=True, blank=True)
    locked_at = models.DateTimeField(db_index=True, null=True, blank=True)

    creator_content_type = models.ForeignKey(
        ContentType, null=True, blank=True,
        related_name='completed_background_task', on_delete=models.CASCADE
    )
    creator_object_id = models.PositiveIntegerField(null=True, blank=True)
    creator = GenericForeignKey('creator_content_type', 'creator_object_id')

    objects = CompletedTaskQuerySet.as_manager()

    def locked_by_pid_running(self):
        """
        Check if the locked_by process is still running.

        :return: ``None`` when `locked_by` is empty, ``True`` when
            ``os.kill(pid, 0)`` succeeds for the PID stored in `locked_by`,
            ``False`` when `locked_by` is not a usable PID or the call fails.
        """
        if not self.locked_by:
            return None
        try:
            # Signal 0 does not kill anything; it only probes the PID.
            os.kill(int(self.locked_by), 0)
        except (OSError, OverflowError, TypeError, ValueError):
            # Only the errors int()/os.kill() can raise are treated as
            # "not running"; KeyboardInterrupt and SystemExit propagate.
            return False
        return True
    locked_by_pid_running.boolean = True

    def has_error(self):
        """
        Check if the last_error field is non-empty.

        :return: ``True`` when `last_error` holds any text
        """
        return bool(self.last_error)
    has_error.boolean = True

    def __str__(self):
        return u'{} - {}'.format(
            self.verbose_name or self.task_name,
            self.run_at,
        )
|
@ -760,6 +760,18 @@ class TasksView(ListView):
|
||||
queryset = self.get_queryset()
|
||||
now = timezone.now()
|
||||
for task in queryset:
|
||||
# There was broken logic in `Task.objects.locked()`, work around it.
|
||||
# With that broken logic, the tasks never resume properly.
|
||||
# This check unlocks the tasks without a running process.
|
||||
# `task.locked_by_pid_running()` returns:
|
||||
# - `True`: locked and PID exists
|
||||
# - `False`: locked and PID does not exist
|
||||
# - `None`: not `locked_by`, so there was no PID to check
|
||||
if task.locked_by_pid_running() is False:
|
||||
task.locked_by = None
|
||||
# do not wait for the task to expire
|
||||
task.locked_at = None
|
||||
task.save()
|
||||
obj, url = map_task_to_instance(task)
|
||||
if not obj:
|
||||
# Orphaned task, ignore it (it will be deleted when it fires)
|
||||
|
Loading…
Reference in New Issue
Block a user