Write to the database in batched transactions

tcely authored 2025-05-28 05:16:37 -04:00 · committed by GitHub
parent 811e36abe0
commit 2e1b96bb61


@@ -10,6 +10,7 @@ import random
 import requests
 import time
 import uuid
+from collections import deque as queue
 from io import BytesIO
 from hashlib import sha1
 from pathlib import Path
@@ -33,7 +34,7 @@ from common.errors import ( NoFormatException, NoMediaException,
 from common.utils import ( django_queryset_generator as qs_gen,
                            remove_enclosed, )
 from .choices import Val, TaskQueue
-from .models import Source, Media, MediaServer
+from .models import Source, Media, MediaServer, Metadata
 from .utils import ( get_remote_image, resize_image_to_height,
                      write_text_file, filter_response, seconds_to_timestr, )
 from .youtube import YouTubeError
@@ -302,6 +303,24 @@ def cleanup_removed_media(source, video_keys):
     schedule_media_servers_update()


+def save_db_batch(qs, objs, fields, /):
+    assert hasattr(qs, 'bulk_update')
+    assert callable(qs.bulk_update)
+    assert hasattr(objs, '__len__')
+    assert callable(objs.__len__)
+    assert isinstance(fields, (tuple, list, set, frozenset))
+
+    num_updated = 0
+    num_objs = len(objs)
+    with atomic(durable=False):
+        num_updated = qs.bulk_update(objs=objs, fields=fields)
+    if num_objs == num_updated:
+        # this covers at least: list, set, deque
+        if hasattr(objs, 'clear') and callable(objs.clear):
+            objs.clear()
+    return num_updated
+
+
 @background(schedule=dict(priority=20, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
 def index_source_task(source_id):
     '''
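Note on the helper above: `QuerySet.bulk_update()` writes all the given objects back in a single UPDATE per batch and, since Django 4.0, returns the number of rows it matched, which `save_db_batch()` compares against `len(objs)` before clearing the container. `atomic` is assumed here to be `django.db.transaction.atomic`, imported elsewhere in this module. A minimal standalone sketch of the intended call pattern (names below are illustrative, not part of the commit):

    from collections import deque

    batch = deque(maxlen=10)
    fields = frozenset(('title',))
    for media in Media.objects.iterator():
        if len(batch) == batch.maxlen:
            # deque is full: write this batch in one transaction; the
            # helper clears the deque so it can be refilled safely
            save_db_batch(Media.objects, batch, fields)
        media.title = media.title.strip()
        batch.append(media)
    # flush whatever is left after the loop
    save_db_batch(Media.objects, batch, fields)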
@@ -347,6 +366,17 @@ def index_source_task(source_id):
     tvn_format = '{:,}' + f'/{num_videos:,}'
     vn = 0
     video_keys = set()
+    db_batch_data = queue(list(), maxlen=50)
+    db_fields_data = frozenset((
+        'retrieved',
+        'value',
+    ))
+    db_batch_media = queue(list(), maxlen=10)
+    db_fields_media = frozenset((
+        'duration',
+        'published',
+        'title',
+    ))
     while len(videos) > 0:
         vn += 1
         video = videos.popleft()
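The batch containers are bounded deques (`queue` is aliased to `collections.deque` in the import hunk above), so at most 50 dirty metadata rows and 10 dirty media rows are ever held in memory. One subtlety explains why the loop below tests `len(...) == maxlen` *before* appending: a full `deque` does not raise on `append()`, it silently discards an item from the opposite end. A quick illustration (not from the commit):

    from collections import deque

    d = deque([1, 2, 3], maxlen=3)
    d.append(4)
    print(d)  # deque([2, 3, 4], maxlen=3) -- 1 was dropped silently

    # Flushing first guarantees nothing is ever evicted unsaved:
    # if len(batch) == batch.maxlen:
    #     save_db_batch(Model.objects, batch, fields)  # empties batch
    # batch.append(obj)                                # guaranteed to fit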
@@ -355,14 +385,24 @@ def index_source_task(source_id):
         if not key:
             # Video has no unique key (ID), it can't be indexed
             continue
+        if len(db_batch_data) == db_batch_data.maxlen:
+            save_db_batch(Metadata.objects, db_batch_data, db_fields_data)
+        if len(db_batch_media) == db_batch_media.maxlen:
+            save_db_batch(Media.objects, db_batch_media, db_fields_media)
         video_keys.add(key)
         update_task_status(task, tvn_format.format(vn))
-        # media, new_media = Media.objects.get_or_create(key=key, source=source)
-        try:
-            media = Media.objects.get(key=key, source=source)
-        except Media.DoesNotExist:
-            media = Media(key=key)
-            media.source = source
+        data, new_data = source.videos.defer('value').filter(
+            media__isnull=True,
+        ).get_or_create(key=key)
+        data.retrieved = source.last_crawl
+        data.value = video
+        db_batch_data.append(data)
+        media, new_media = source.media_source.only(
+            'uuid',
+            'source',
+            'key',
+            *db_fields_media,
+        ).get_or_create(key=key)
         media.duration = float(video.get(fields('duration', media), None) or 0) or None
         media.title = str(video.get(fields('title', media), ''))[:200]
         timestamp = video.get(fields('timestamp', media), None)
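Both new lookups narrow the columns fetched per row, which matters when thousands of videos are indexed in one pass: `defer('value')` keeps the potentially large metadata blob out of the SELECT, and `only('uuid', 'source', 'key', *db_fields_media)` restricts the Media query to the key fields plus exactly the columns the batch will rewrite. Assigning to a deferred field does not trigger an extra query; only reading one does, and `bulk_update()` simply writes the assigned value. A rough sketch of the shape of the Metadata lookup (the key and dict values are hypothetical):

    data, created = (
        source.videos                      # reverse relation to Metadata rows
              .defer('value')              # don't load the JSON blob on SELECT
              .filter(media__isnull=True)  # only rows not yet tied to a Media
              .get_or_create(key='abc123')
    )
    data.retrieved = source.last_crawl
    data.value = {'title': 'example'}      # deferred field: assign, don't read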
@@ -373,45 +413,47 @@ def index_source_task(source_id):
         else:
             if published_dt:
                 media.published = published_dt
-        try:
-            media.save()
-        except IntegrityError as e:
-            log.error(f'Index media failed: {source} / {media} with "{e}"')
-        else:
+        db_batch_media.append(media)
+        if not new_media:
             log.debug(f'Indexed media: {vn}: {source} / {media}')
+        else:
             # log the new media instances
-            new_media_instance = (
-                # new_media or
-                media.created and
-                source.last_crawl and
-                media.created >= source.last_crawl
-            )
-            if new_media_instance:
-                log.info(f'Indexed new media: {source} / {media}')
-                log.info(f'Scheduling tasks to download thumbnail for: {media.key}')
-                thumbnail_fmt = 'https://i.ytimg.com/vi/{}/{}default.jpg'
-                vn_fmt = _('Downloading {} thumbnail for: "{}": {}')
-                for prefix in ('hq', 'sd', 'maxres',):
-                    thumbnail_url = thumbnail_fmt.format(
-                        media.key,
-                        prefix,
-                    )
-                    download_media_thumbnail(
-                        str(media.pk),
-                        thumbnail_url,
-                        verbose_name=vn_fmt.format(prefix, media.key, media.name),
-                    )
-                log.info(f'Scheduling task to download metadata for: {media.url}')
-                verbose_name = _('Downloading metadata for: "{}": {}')
-                download_media_metadata(
-                    str(media.pk),
-                    verbose_name=verbose_name.format(media.key, media.name),
-                )
+            log.info(f'Indexed new media: {source} / {media}')
+            log.info(f'Scheduling tasks to download thumbnail for: {media.key}')
+            thumbnail_fmt = 'https://i.ytimg.com/vi/{}/{}default.jpg'
+            vn_fmt = _('Downloading {} thumbnail for: "{}": {}')
+            for prefix in ('hq', 'sd', 'maxres',):
+                thumbnail_url = thumbnail_fmt.format(
+                    media.key,
+                    prefix,
+                )
+                download_media_thumbnail(
+                    str(media.pk),
+                    thumbnail_url,
+                    verbose_name=vn_fmt.format(prefix, media.key, media.name),
+                )
+            log.info(f'Scheduling task to download metadata for: {media.url}')
+            verbose_name = _('Downloading metadata for: "{}": {}')
+            download_media_metadata(
+                str(media.pk),
+                verbose_name=verbose_name.format(media.key, media.name),
+            )
     # Reset task.verbose_name to the saved value
     update_task_status(task, None)
+    # Update any remaining items in the batches
+    save_db_batch(Metadata.objects, db_batch_data, db_fields_data)
+    save_db_batch(Media.objects, db_batch_media, db_fields_media)
     # Cleanup of media no longer available from the source
     cleanup_removed_media(source, video_keys)
     videos = video = None
+    db_batch_data.clear()
+    db_batch_media.clear()
+    # Trigger any signals that we skipped with batches
+    vn_fmt = _('Checking all media for "{}"')
+    save_all_media_for_source(
+        str(source.pk),
+        verbose_name=vn_fmt.format(source.name),
+    )


 @background(schedule=dict(priority=0, run_at=0), queue=Val(TaskQueue.FS))
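Why the trailing `save_all_media_for_source` task matters: `bulk_update()` issues UPDATE statements directly and never calls `Model.save()`, so `pre_save`/`post_save` receivers are skipped for every batched row — those are the "signals that we skipped with batches" the comment refers to, and the scheduled task replays that per-object processing afterwards. A minimal demonstration of the gap (illustrative only, assuming a registered receiver):

    from django.db.models.signals import post_save
    from django.dispatch import receiver

    @receiver(post_save, sender=Media)
    def media_saved(sender, instance, **kwargs):
        print('post_save fired for', instance.pk)

    m = Media.objects.first()
    m.title = 'renamed'
    m.save()                                   # receiver runs
    Media.objects.bulk_update([m], ['title'])  # receiver does NOT run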