From 2e1b96bb611472e8207c69558a07be7c0f29562d Mon Sep 17 00:00:00 2001
From: tcely
Date: Wed, 28 May 2025 05:16:37 -0400
Subject: [PATCH] Write to the database in batched transactions

Collect Metadata and Media rows into fixed-size deques while indexing a
source, then persist each full batch with a single bulk_update() call
inside an atomic transaction, instead of saving every row individually.

---
 tubesync/sync/tasks.py | 118 ++++++++++++++++++++++++++++-------
 1 file changed, 80 insertions(+), 38 deletions(-)

diff --git a/tubesync/sync/tasks.py b/tubesync/sync/tasks.py
index bf5e43ed..3ceb31b0 100644
--- a/tubesync/sync/tasks.py
+++ b/tubesync/sync/tasks.py
@@ -10,6 +10,7 @@ import random
 import requests
 import time
 import uuid
+from collections import deque as queue
 from io import BytesIO
 from hashlib import sha1
 from pathlib import Path
@@ -33,7 +34,7 @@ from common.errors import ( NoFormatException, NoMediaException,
 from common.utils import (  django_queryset_generator as qs_gen,
                             remove_enclosed, )
 from .choices import Val, TaskQueue
-from .models import Source, Media, MediaServer
+from .models import Source, Media, MediaServer, Metadata
 from .utils import (  get_remote_image, resize_image_to_height, write_text_file,
                       filter_response, seconds_to_timestr, )
 from .youtube import YouTubeError
@@ -302,6 +303,24 @@ def cleanup_removed_media(source, video_keys):
     schedule_media_servers_update()
 
 
+def save_db_batch(qs, objs, fields, /):
+    assert hasattr(qs, 'bulk_update')
+    assert callable(qs.bulk_update)
+    assert hasattr(objs, '__len__')
+    assert callable(objs.__len__)
+    assert isinstance(fields, (tuple, list, set, frozenset))
+
+    num_updated = 0
+    num_objs = len(objs)
+    with atomic(durable=False):
+        num_updated = qs.bulk_update(objs=objs, fields=fields)
+    if num_objs == num_updated:
+        # this covers at least: list, set, deque
+        if hasattr(objs, 'clear') and callable(objs.clear):
+            objs.clear()
+    return num_updated
+
+
 @background(schedule=dict(priority=20, run_at=30), queue=Val(TaskQueue.NET), remove_existing_tasks=True)
 def index_source_task(source_id):
     '''
@@ -347,6 +366,17 @@ def index_source_task(source_id):
     tvn_format = '{:,}' + f'/{num_videos:,}'
     vn = 0
     video_keys = set()
+    db_batch_data = queue(list(), maxlen=50)
+    db_fields_data = frozenset((
+        'retrieved',
+        'value',
+    ))
+    db_batch_media = queue(list(), maxlen=10)
+    db_fields_media = frozenset((
+        'duration',
+        'published',
+        'title',
+    ))
     while len(videos) > 0:
         vn += 1
         video = videos.popleft()
@@ -355,14 +385,24 @@ def index_source_task(source_id):
         if not key:
             # Video has no unique key (ID), it can't be indexed
             continue
+        if len(db_batch_data) == db_batch_data.maxlen:
+            save_db_batch(Metadata.objects, db_batch_data, db_fields_data)
+        if len(db_batch_media) == db_batch_media.maxlen:
+            save_db_batch(Media.objects, db_batch_media, db_fields_media)
         video_keys.add(key)
         update_task_status(task, tvn_format.format(vn))
-        # media, new_media = Media.objects.get_or_create(key=key, source=source)
-        try:
-            media = Media.objects.get(key=key, source=source)
-        except Media.DoesNotExist:
-            media = Media(key=key)
-            media.source = source
+        data, new_data = source.videos.defer('value').filter(
+            media__isnull=True,
+        ).get_or_create(key=key)
+        data.retrieved = source.last_crawl
+        data.value = video
+        db_batch_data.append(data)
+        media, new_media = source.media_source.only(
+            'uuid',
+            'source',
+            'key',
+            *db_fields_media,
+        ).get_or_create(key=key)
         media.duration = float(video.get(fields('duration', media), None) or 0) or None
         media.title = str(video.get(fields('title', media), ''))[:200]
         timestamp = video.get(fields('timestamp', media), None)
@@ -373,45 +413,47 @@ def index_source_task(source_id):
         else:
             if published_dt:
                 media.published = published_dt
-        try:
-            media.save()
-        except IntegrityError as e:
-            log.error(f'Index media failed: {source} / {media} with "{e}"')
-        else:
+        db_batch_media.append(media)
+        if not new_media:
             log.debug(f'Indexed media: {vn}: {source} / {media}')
+        else:
             # log the new media instances
-            new_media_instance = (
-                # new_media or
-                media.created and
-                source.last_crawl and
-                media.created >= source.last_crawl
-            )
-            if new_media_instance:
-                log.info(f'Indexed new media: {source} / {media}')
-                log.info(f'Scheduling tasks to download thumbnail for: {media.key}')
-                thumbnail_fmt = 'https://i.ytimg.com/vi/{}/{}default.jpg'
-                vn_fmt = _('Downloading {} thumbnail for: "{}": {}')
-                for prefix in ('hq', 'sd', 'maxres',):
-                    thumbnail_url = thumbnail_fmt.format(
-                        media.key,
-                        prefix,
-                    )
-                    download_media_thumbnail(
-                        str(media.pk),
-                        thumbnail_url,
-                        verbose_name=vn_fmt.format(prefix, media.key, media.name),
-                    )
-                log.info(f'Scheduling task to download metadata for: {media.url}')
-                verbose_name = _('Downloading metadata for: "{}": {}')
-                download_media_metadata(
-                    str(media.pk),
-                    verbose_name=verbose_name.format(media.key, media.name),
+            log.info(f'Indexed new media: {source} / {media}')
+            log.info(f'Scheduling tasks to download thumbnail for: {media.key}')
+            thumbnail_fmt = 'https://i.ytimg.com/vi/{}/{}default.jpg'
+            vn_fmt = _('Downloading {} thumbnail for: "{}": {}')
+            for prefix in ('hq', 'sd', 'maxres',):
+                thumbnail_url = thumbnail_fmt.format(
+                    media.key,
+                    prefix,
                 )
+                download_media_thumbnail(
+                    str(media.pk),
+                    thumbnail_url,
+                    verbose_name=vn_fmt.format(prefix, media.key, media.name),
+                )
+            log.info(f'Scheduling task to download metadata for: {media.url}')
+            verbose_name = _('Downloading metadata for: "{}": {}')
+            download_media_metadata(
+                str(media.pk),
+                verbose_name=verbose_name.format(media.key, media.name),
+            )
     # Reset task.verbose_name to the saved value
     update_task_status(task, None)
+    # Update any remaining items in the batches
+    save_db_batch(Metadata.objects, db_batch_data, db_fields_data)
+    save_db_batch(Media.objects, db_batch_media, db_fields_media)
     # Cleanup of media no longer available from the source
     cleanup_removed_media(source, video_keys)
     videos = video = None
+    db_batch_data.clear()
+    db_batch_media.clear()
+    # Trigger any signals that we skipped with batches
+    vn_fmt = _('Checking all media for "{}"')
+    save_all_media_for_source(
+        str(source.pk),
+        verbose_name=vn_fmt.format(source.name),
+    )
 
 
 @background(schedule=dict(priority=0, run_at=0), queue=Val(TaskQueue.FS))
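
For reference, below is a minimal sketch of the batching pattern this patch
introduces, distilled from index_source_task(). It assumes Django 4.0 or
later, since QuerySet.bulk_update() only returns the number of matched rows
from 4.0 onward (earlier versions return None, so the batch would never be
cleared). The loop body is an illustrative placeholder, not part of the
patch:

    from collections import deque

    from django.db.transaction import atomic

    def save_db_batch(qs, objs, fields, /):
        # One bulk UPDATE per call; durable=False lets the transaction
        # nest inside an outer atomic block.
        num_objs = len(objs)
        with atomic(durable=False):
            num_updated = qs.bulk_update(objs=objs, fields=fields)
        # Discard the batch only when every row was written, so a
        # partial update is never silently dropped.
        if num_objs == num_updated and hasattr(objs, 'clear'):
            objs.clear()
        return num_updated

    batch = deque(maxlen=10)
    fields = ('duration', 'published', 'title')
    for media in Media.objects.iterator():
        # Flush a full batch *before* appending: a deque at maxlen
        # silently evicts its oldest element on append, which would
        # lose an unsaved row.
        if len(batch) == batch.maxlen:
            save_db_batch(Media.objects, batch, fields)
        media.title = (media.title or '').strip()  # placeholder edit
        batch.append(media)
    # Persist whatever remains once the loop ends.
    save_db_batch(Media.objects, batch, fields)

Because bulk_update() skips the per-instance save() signals, the patch also
schedules save_all_media_for_source() at the end of indexing to run the
work those signals would normally have done.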