Source code for django_program.pretalx.sync

"""Synchronization service for importing Pretalx data into Django models.

Provides :class:`PretalxSyncService` which orchestrates the import of speakers,
talks, and schedule slots from a Pretalx event into the corresponding Django
models.  Each sync method is idempotent and uses bulk operations for
performance.
"""

from __future__ import annotations

import logging
import zoneinfo
from datetime import UTC, datetime
from typing import TYPE_CHECKING

from django.contrib.auth import get_user_model
from django.db import transaction
from django.db.models import Max, Min
from django.db.models.functions import Lower
from django.utils import timezone
from django.utils.text import slugify

from django_program.pretalx.models import Room, ScheduleSlot, Speaker, SubmissionTypeDefault, Talk
from django_program.pretalx.profiles import resolve_pretalx_profile
from django_program.programs.models import Activity
from django_program.settings import get_config
from pretalx_client.adapters.normalization import localized as _localized
from pretalx_client.client import PretalxClient

if TYPE_CHECKING:
    from collections.abc import Iterator

    from django.contrib.auth.models import AbstractBaseUser
    from django.db.models import QuerySet

    from django_program.conference.models import Conference

logger = logging.getLogger(__name__)

_PROGRESS_CHUNK = 10

# Maps Pretalx submission_type names (case-insensitive) to ActivityType values.
_SUBMISSION_TYPE_TO_ACTIVITY: dict[str, str] = {
    "tutorial": Activity.ActivityType.TUTORIAL,
    "workshop": Activity.ActivityType.WORKSHOP,
    "lightning talk": Activity.ActivityType.LIGHTNING_TALK,
    "sprint": Activity.ActivityType.SPRINT,
    "summit": Activity.ActivityType.SUMMIT,
    "open space": Activity.ActivityType.OPEN_SPACE,
}
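
# The mapping above is keyed by lowercased names; callers lowercase the
# incoming Pretalx value before looking it up (illustrative):
#
#     _SUBMISSION_TYPE_TO_ACTIVITY.get("Lightning Talk".lower())
#     # -> Activity.ActivityType.LIGHTNING_TALK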


class PretalxSyncService:
    """Synchronizes speaker, talk, and schedule data from Pretalx to Django models.

    Builds a :class:`~pretalx_client.client.PretalxClient` from the
    conference's ``pretalx_event_slug`` and the global Pretalx configuration,
    then provides methods to sync each entity type individually or all at
    once.

    Args:
        conference: The conference whose Pretalx data should be synced.

    Raises:
        ValueError: If the conference has no ``pretalx_event_slug`` configured.
    """

    def __init__(self, conference: Conference) -> None:
        """Initialize the sync service for the given conference.

        Args:
            conference: The conference whose Pretalx data should be synced.

        Raises:
            ValueError: If the conference has no ``pretalx_event_slug`` configured.
        """
        if not conference.pretalx_event_slug:
            msg = f"Conference '{conference.slug}' has no pretalx_event_slug configured"
            raise ValueError(msg)
        self.conference = conference
        config = get_config()
        base_url = config.pretalx.base_url
        api_token = config.pretalx.token or ""
        self.client = PretalxClient(
            conference.pretalx_event_slug,
            base_url=base_url,
            api_token=api_token,
        )
        self._schedule_delete_guard_enabled = config.pretalx.schedule_delete_guard_enabled
        self._schedule_delete_guard_min_existing_slots = config.pretalx.schedule_delete_guard_min_existing_slots
        self._schedule_delete_guard_max_fraction_removed = float(
            config.pretalx.schedule_delete_guard_max_fraction_removed
        )
        self._rooms: dict[int, Room] | None = None
        self._room_names: dict[int, str] | None = None
        self._submission_types: dict[int, str] | None = None
        self._tracks: dict[int, str] | None = None
        self._tags: dict[int, str] | None = None
        self.profile = resolve_pretalx_profile(
            event_slug=conference.pretalx_event_slug,
            conference_slug=conference.slug,
        )
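
    # Construction sketch (illustrative; "ep2026" is a placeholder slug and
    # ``conference`` an already-saved Conference instance):
    #
    #     conference = Conference.objects.get(slug="ep2026")
    #     service = PretalxSyncService(conference)  # raises ValueError when
    #                                               # pretalx_event_slug is empty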

    def _ensure_mappings(self) -> None:
        """Pre-fetch room, submission type, and track ID-to-name mappings.

        Fetches each mapping once and caches it on the instance so that
        subsequent sync methods can resolve integer IDs from the real Pretalx
        API into human-readable names.  Safe to call multiple times; only
        fetches on the first call.
        """
        if self._rooms is None:
            logger.debug("Fetching room mappings for %s", self.conference.slug)
            self._rooms = {
                room.pretalx_id: room
                for room in Room.objects.filter(conference=self.conference)
                if room.pretalx_id is not None
            }
            self._room_names = {pid: str(room.name) for pid, room in self._rooms.items()}
        if self._submission_types is None:
            logger.debug("Fetching submission type mappings for %s", self.conference.slug)
            self._submission_types = self.client.fetch_submission_types()
        if self._tracks is None:
            logger.debug("Fetching track mappings for %s", self.conference.slug)
            self._tracks = self.client.fetch_tracks()
        if self._tags is None:
            logger.debug("Fetching tag mappings for %s", self.conference.slug)
            try:
                self._tags = self.client.fetch_tags()
            except RuntimeError:
                logger.warning(
                    "Could not fetch tag mappings for %s; continuing without tags",
                    self.conference.slug,
                )
                self._tags = {}

    def sync_rooms(self) -> int:
        """Fetch rooms from Pretalx and upsert into the database.

        Returns:
            The number of rooms synced.
        """
        api_rooms = self.client.fetch_rooms_full()
        now = timezone.now()
        count = 0
        for raw_room in api_rooms:
            room_id = raw_room.get("id")
            if room_id is None:
                continue
            name = _localized(raw_room.get("name"))
            description = _localized(raw_room.get("description"))
            capacity = raw_room.get("capacity")
            position = raw_room.get("position")
            Room.objects.update_or_create(
                conference=self.conference,
                pretalx_id=int(room_id),
                defaults={
                    "name": name,
                    "description": description,
                    "capacity": capacity,
                    "position": position,
                    "synced_at": now,
                },
            )
            count += 1
        logger.info("Synced %d rooms for %s", count, self.conference.slug)
        self._rooms = None
        self._room_names = None
        self._ensure_mappings()
        return count

    def sync_speakers(self) -> int:
        """Fetch speakers from Pretalx and upsert into the database.

        Uses bulk operations for performance and delegates to
        :meth:`sync_speakers_iter` which yields progress dicts.

        Returns:
            The number of speakers synced.
        """
        count = 0
        for progress in self.sync_speakers_iter():
            if "count" in progress:
                count = progress["count"]
        return count

    def sync_speakers_iter(self) -> Iterator[dict[str, int | str]]:
        """Bulk sync speakers from Pretalx, yielding progress updates.

        Yields:
            A ``{"phase": "fetching"}`` dict before the API call, dicts with
            ``current``/``total`` keys during processing, and a final dict
            with ``count`` when complete.
        """
        yield {"phase": "fetching"}
        api_speakers = self.client.fetch_speakers()
        total = len(api_speakers)
        if total == 0:
            yield {"count": 0}
            return
        now = timezone.now()
        yield {"current": 0, "total": total}
        existing = {s.pretalx_code: s for s in Speaker.objects.filter(conference=self.conference)}
        emails = {s.email.lower() for s in api_speakers if s.email}
        user_model: type[AbstractBaseUser] = get_user_model()  # type: ignore[assignment]
        users_by_email: dict[str, object] = {}
        if emails:
            for u in user_model.objects.annotate(
                email_lower=Lower("email"),
            ).filter(email_lower__in=emails):
                users_by_email[u.email_lower] = u
        to_create: list[Speaker] = []
        to_update: list[Speaker] = []
        for i, api_speaker in enumerate(api_speakers):
            target = to_update if api_speaker.code in existing else to_create
            target.append(
                _build_speaker(api_speaker, self.conference, existing, users_by_email, now),
            )
            if (i + 1) % _PROGRESS_CHUNK == 0 or (i + 1) == total:
                yield {"current": i + 1, "total": total}
        if to_create:
            Speaker.objects.bulk_create(to_create, batch_size=500)
        if to_update:
            Speaker.objects.bulk_update(
                to_update,
                fields=["name", "biography", "avatar_url", "email", "synced_at", "user"],
                batch_size=500,
            )
        count = len(to_create) + len(to_update)
        logger.info(
            "Synced %d speakers (%d new, %d updated) for %s",
            count,
            len(to_create),
            len(to_update),
            self.conference.slug,
        )
        yield {"count": count}
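
    # Consuming the progress stream (illustrative; ``service`` is a
    # PretalxSyncService instance):
    #
    #     for progress in service.sync_speakers_iter():
    #         if "phase" in progress:
    #             print("Fetching speakers from Pretalx...")
    #         elif "count" in progress:
    #             print(f"Done: {progress['count']} speakers synced")
    #         else:
    #             print(f"{progress['current']}/{progress['total']}")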

    def sync_talks(self) -> int:
        """Fetch talks from Pretalx and upsert into the database.

        Uses bulk operations for performance and delegates to
        :meth:`sync_talks_iter` which yields progress dicts.

        Returns:
            The number of talks synced.
        """
        count = 0
        for progress in self.sync_talks_iter():
            if "count" in progress:
                count = progress["count"]
        return count

    def _bulk_set_talk_speakers(self, m2m_map: dict[str, list[int]]) -> None:
        """Replace M2M speaker relationships for synced talks in bulk.

        Clears all existing speaker associations for the synced talks, then
        re-creates only the relationships present in *m2m_map*.  Talks with
        empty speaker lists will have their associations cleared.

        Args:
            m2m_map: Mapping of talk pretalx_code to lists of speaker PKs.
        """
        synced_codes = set(m2m_map.keys())
        synced_talk_pks = dict(
            Talk.objects.filter(
                conference=self.conference,
                pretalx_code__in=synced_codes,
            ).values_list("pretalx_code", "pk")
        )
        TalkSpeaker = Talk.speakers.through  # noqa: N806
        TalkSpeaker.objects.filter(
            talk_id__in=synced_talk_pks.values(),
        ).delete()
        through_entries = []
        for talk_code, spk_pks in m2m_map.items():
            talk_pk = synced_talk_pks.get(talk_code)
            if talk_pk:
                through_entries.extend(
                    TalkSpeaker(talk_id=talk_pk, speaker_id=spk_pk) for spk_pk in spk_pks
                )
        if through_entries:
            TalkSpeaker.objects.bulk_create(
                through_entries,
                ignore_conflicts=True,
                batch_size=500,
            )

    def sync_talks_iter(self) -> Iterator[dict[str, int | str]]:
        """Bulk sync talks from Pretalx, yielding progress updates.

        Yields:
            A ``{"phase": "fetching"}`` dict before the API call, dicts with
            ``current``/``total`` keys during processing, and a final dict
            with ``count`` when complete.
        """
        self._ensure_mappings()
        yield {"phase": "fetching"}
        api_talks = self.client.fetch_talks(
            submission_types=self._submission_types,
            tracks=self._tracks,
            tags=self._tags,
            rooms=self._room_names,
        )
        total = len(api_talks)
        if total == 0:
            yield {"count": 0}
            return
        now = timezone.now()
        yield {"current": 0, "total": total}
        existing = {t.pretalx_code: t for t in Talk.objects.filter(conference=self.conference)}
        speaker_pk_map = {s.pretalx_code: s.pk for s in Speaker.objects.filter(conference=self.conference)}
        to_create: list[Talk] = []
        to_update: list[Talk] = []
        m2m_map: dict[str, list[int]] = {}
        for i, api_talk in enumerate(api_talks):
            room = self._resolve_room(api_talk.room)
            fields = {
                "title": api_talk.title,
                "abstract": api_talk.abstract,
                "description": api_talk.description,
                "submission_type": api_talk.submission_type,
                "track": self.profile.sync_track(api_talk),
                "tags": self.profile.sync_tags(api_talk),
                "duration": api_talk.duration,
                "state": api_talk.state,
                "room": room,
                "slot_start": _parse_iso_datetime(api_talk.slot_start),
                "slot_end": _parse_iso_datetime(api_talk.slot_end),
                "synced_at": now,
            }
            if api_talk.code in existing:
                talk = existing[api_talk.code]
                for k, v in fields.items():
                    setattr(talk, k, v)
                to_update.append(talk)
            else:
                to_create.append(
                    Talk(
                        conference=self.conference,
                        pretalx_code=api_talk.code,
                        **fields,
                    )
                )
            m2m_map[api_talk.code] = [
                speaker_pk_map[code] for code in api_talk.speaker_codes if code in speaker_pk_map
            ]
            if (i + 1) % _PROGRESS_CHUNK == 0 or (i + 1) == total:
                yield {"current": i + 1, "total": total}
        if to_create:
            Talk.objects.bulk_create(to_create, batch_size=500)
        if to_update:
            Talk.objects.bulk_update(
                to_update,
                fields=[
                    "title",
                    "abstract",
                    "description",
                    "submission_type",
                    "track",
                    "tags",
                    "duration",
                    "state",
                    "room",
                    "slot_start",
                    "slot_end",
                    "synced_at",
                ],
                batch_size=500,
            )
        self._bulk_set_talk_speakers(m2m_map)
        count = len(to_create) + len(to_update)
        logger.info(
            "Synced %d talks (%d new, %d updated) for %s",
            count,
            len(to_create),
            len(to_update),
            self.conference.slug,
        )
        self._sync_activities_from_talks(now)
        yield {"count": count}

    def _sync_activities_from_talks(self, now: datetime) -> None:
        """Auto-create or update Activities for Pretalx submission types.

        For each unique ``submission_type`` on synced talks that maps to a
        known :class:`~django_program.programs.models.Activity.ActivityType`,
        creates or updates an Activity linked to that submission type.  Also
        populates the ``talks`` M2M with matching talks and enriches the
        activity with scheduling data derived from those talks.
        """
        sub_types = (
            Talk.objects.filter(conference=self.conference)
            .exclude(submission_type="")
            .values_list("submission_type", flat=True)
            .distinct()
        )
        for sub_type in sub_types:
            activity_type = _SUBMISSION_TYPE_TO_ACTIVITY.get(sub_type.lower())
            if activity_type is None:
                continue
            base_slug = slugify(sub_type) or "activity"
            matching_talks = Talk.objects.filter(
                conference=self.conference,
                submission_type=sub_type,
            )
            enrichment = self._build_activity_enrichment(matching_talks)
            activity, created = Activity.objects.update_or_create(
                conference=self.conference,
                pretalx_submission_type=sub_type,
                defaults={
                    "name": f"{sub_type}s",
                    "activity_type": activity_type,
                    "synced_at": now,
                    **enrichment,
                },
            )
            if created:
                activity.slug = self._unique_activity_slug(base_slug)
                activity.save(update_fields=["slug"])
            activity.talks.set(matching_talks)
            verb = "Created" if created else "Updated"
            logger.info(
                "%s activity '%s' for submission type '%s' (%d talks)",
                verb,
                activity.name,
                sub_type,
                matching_talks.count(),
            )

    def _build_activity_enrichment(self, talks: QuerySet[Talk]) -> dict[str, object]:
        """Derive activity metadata from a set of linked talks.

        Computes a description summary, earliest/latest times, and a shared
        room (if all talks are in the same room).

        Args:
            talks: QuerySet of Talk instances linked to this activity.

        Returns:
            A dict of field names to values suitable for Activity defaults.
        """
        enrichment: dict[str, object] = {}
        talk_count = talks.count()
        if talk_count == 0:
            return enrichment
        rooms = talks.exclude(room__isnull=True).order_by().values_list("room__name", flat=True).distinct()
        room_names = list(rooms)
        room_count = len(room_names)
        if room_count == 1:
            enrichment["description"] = f"{talk_count} talk{'s' if talk_count != 1 else ''} in {room_names[0]}"
        elif room_count > 1:
            enrichment["description"] = (
                f"{talk_count} talk{'s' if talk_count != 1 else ''} across {room_count} rooms"
            )
        else:
            enrichment["description"] = f"{talk_count} talk{'s' if talk_count != 1 else ''}"
        agg = talks.exclude(slot_start__isnull=True).aggregate(
            earliest=Min("slot_start"),
            latest=Max("slot_end"),
        )
        if agg["earliest"]:
            enrichment["start_time"] = agg["earliest"]
        if agg["latest"]:
            enrichment["end_time"] = agg["latest"]
        if room_count == 1:
            shared_room = talks.exclude(room__isnull=True).first()
            if shared_room:
                enrichment["room"] = shared_room.room
        return enrichment

    def _unique_activity_slug(self, base: str) -> str:
        """Generate a unique Activity slug within this conference."""
        candidate = base
        counter = 1
        while Activity.objects.filter(conference=self.conference, slug=candidate).exists():
            counter += 1
            candidate = f"{base}-{counter}"
        return candidate

    def _check_schedule_deletion_safety(
        self,
        *,
        existing_count: int,
        stale_count: int,
        allow_large_deletions: bool,
    ) -> None:
        """Raise when schedule deletion volume looks anomalous.

        The guard is intended to prevent accidental local schedule wipes when
        Pretalx returns an unexpectedly small/empty payload.
        """
        if allow_large_deletions or not self._schedule_delete_guard_enabled:
            return
        if existing_count < self._schedule_delete_guard_min_existing_slots:
            return
        if existing_count <= 0 or stale_count <= 0:
            return
        removed_fraction = stale_count / existing_count
        if removed_fraction < self._schedule_delete_guard_max_fraction_removed:
            return
        msg = (
            "Aborting schedule sync: would remove "
            f"{stale_count}/{existing_count} existing slots ({removed_fraction:.1%}), "
            "which exceeds the configured safety threshold. "
            "Retry with allow_large_deletions=True only if this is intentional."
        )
        raise RuntimeError(msg)

    def sync_schedule(self, *, allow_large_deletions: bool = False) -> tuple[int, int]:
        """Fetch schedule slots from Pretalx and upsert into the database.

        Slots that no longer appear in the Pretalx schedule are deleted after
        the sync completes.

        Args:
            allow_large_deletions: When ``True``, bypasses the schedule-drop
                safety guard and permits large stale-slot deletions.

        Returns:
            A tuple of ``(synced_count, unscheduled_count)`` where
            *unscheduled_count* is the number of talks that still have no
            scheduled slot after the sync.
        """
        with transaction.atomic():
            self._ensure_mappings()
            api_slots = self.client.fetch_schedule(rooms=self._room_names)
            existing_count = ScheduleSlot.objects.filter(conference=self.conference).count()
            now = timezone.now()
            count = 0
            for api_slot in api_slots:
                talk = None
                slot_type = _classify_slot(api_slot.title, api_slot.code)
                title = api_slot.title
                if api_slot.code:
                    try:
                        talk = Talk.objects.get(
                            conference=self.conference,
                            pretalx_code=api_slot.code,
                        )
                        title = title or talk.title
                    except Talk.DoesNotExist:
                        # Some schedule slots legitimately have no local Talk
                        # record (for example external events).
                        pass
                start_dt = api_slot.start_dt or _parse_iso_datetime(api_slot.start)
                end_dt = api_slot.end_dt or _parse_iso_datetime(api_slot.end)
                if start_dt is None or end_dt is None:
                    logger.warning("Skipping slot with unparsable times: %s", api_slot)
                    continue
                room = self._resolve_room(api_slot.room)
                ScheduleSlot.objects.update_or_create(
                    conference=self.conference,
                    start=start_dt,
                    room=room,
                    defaults={
                        "talk": talk,
                        "title": title,
                        "end": end_dt,
                        "slot_type": slot_type,
                        "synced_at": now,
                    },
                )
                logger.debug("Synced slot %s at %s in %s", title, start_dt, api_slot.room)
                count += 1
            stale_qs = ScheduleSlot.objects.filter(
                conference=self.conference,
            ).exclude(synced_at=now)
            stale_count = stale_qs.count()
            self._check_schedule_deletion_safety(
                existing_count=existing_count,
                stale_count=stale_count,
                allow_large_deletions=allow_large_deletions,
            )
            stale_count, _ = stale_qs.delete()
            if stale_count:
                logger.info("Removed %d stale schedule slots for %s", stale_count, self.conference.slug)
            logger.info("Synced %d schedule slots for %s", count, self.conference.slug)
            self._backfill_talks_from_schedule()
            unscheduled = Talk.objects.filter(
                conference=self.conference,
                slot_start__isnull=True,
            ).count()
            if unscheduled:
                logger.info("%d talks remain unscheduled for %s", unscheduled, self.conference.slug)
        return count, unscheduled

    def _backfill_talks_from_schedule(self) -> None:
        """Populate talk room/slot fields from linked schedule slots.

        When talks are synced from the ``/submissions/`` fallback endpoint,
        room and slot times are absent.  This method fills them from the
        schedule slots that reference each talk.
        """
        slots = ScheduleSlot.objects.filter(conference=self.conference, talk__isnull=False).select_related(
            "talk", "room"
        )
        to_update: list[Talk] = []
        for slot in slots:
            talk = slot.talk
            changed = False
            if talk.room_id != slot.room_id:
                talk.room = slot.room
                changed = True
            if talk.slot_start != slot.start:
                talk.slot_start = slot.start
                changed = True
            if talk.slot_end != slot.end:
                talk.slot_end = slot.end
                changed = True
            if changed:
                to_update.append(talk)
        if to_update:
            Talk.objects.bulk_update(to_update, fields=["room", "slot_start", "slot_end"], batch_size=500)
            logger.info("Back-filled room/slot data for %d talks from schedule", len(to_update))

    def _resolve_room(self, room_name: str) -> Room | None:
        """Look up a Room instance by its display name.

        Args:
            room_name: The room display name as resolved from the Pretalx API.

        Returns:
            The matching ``Room`` instance, or ``None`` if the name is empty
            or no match is found.
        """
        if not room_name or self._rooms is None:
            return None
        for room in self._rooms.values():
            if room.name == room_name:
                return room
        return None

    def apply_type_defaults(self) -> int:
        """Apply SubmissionTypeDefault records to unscheduled talks.

        For each configured submission type default, finds talks of that type
        that have no room assigned and applies the default room and time
        slot.

        Returns:
            The number of talks that were modified by type defaults.
        """
        defaults = SubmissionTypeDefault.objects.filter(conference=self.conference).select_related("default_room")
        if not defaults.exists():
            return 0
        try:
            conf_tz = zoneinfo.ZoneInfo(str(self.conference.timezone))
        except (zoneinfo.ZoneInfoNotFoundError, KeyError):  # fmt: skip
            logger.warning(
                "Invalid timezone '%s' for conference %s; falling back to UTC for type defaults.",
                self.conference.timezone,
                self.conference.slug,
            )
            conf_tz = UTC
        to_update: list[Talk] = []
        update_fields: set[str] = set()
        for type_default in defaults:
            talks = Talk.objects.filter(
                conference=self.conference,
                submission_type=type_default.submission_type,
                room__isnull=True,
            )
            for talk in talks:
                changed = False
                if type_default.default_room is not None:
                    talk.room = type_default.default_room
                    update_fields.add("room")
                    changed = True
                if type_default.default_date and type_default.default_start_time and talk.slot_start is None:
                    talk.slot_start = datetime.combine(
                        type_default.default_date,
                        type_default.default_start_time,
                        tzinfo=conf_tz,
                    )
                    update_fields.add("slot_start")
                    changed = True
                if type_default.default_date and type_default.default_end_time and talk.slot_end is None:
                    talk.slot_end = datetime.combine(
                        type_default.default_date,
                        type_default.default_end_time,
                        tzinfo=conf_tz,
                    )
                    update_fields.add("slot_end")
                    changed = True
                if changed:
                    to_update.append(talk)
        if to_update and update_fields:
            Talk.objects.bulk_update(to_update, fields=list(update_fields), batch_size=500)
            logger.info(
                "Applied type defaults to %d talks for %s",
                len(to_update),
                self.conference.slug,
            )
        return len(to_update)
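
    # Illustrative: a SubmissionTypeDefault with submission_type="Sprint",
    # default_room=<Room: Hall A>, default_date=2026-07-24, and
    # default_start_time=09:00 places every room-less "Sprint" talk into
    # Hall A at 09:00 on that date, interpreted in the conference timezone
    # (falling back to UTC when the timezone is invalid).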

    def sync_all(self, *, allow_large_deletions: bool = False) -> dict[str, int]:
        """Run all sync operations in dependency order.

        Returns:
            A mapping of entity type to the number synced.  The
            ``schedule_slots`` key contains only the synced count;
            ``unscheduled_talks`` is added when any talks lack a slot.
            ``type_defaults_applied`` is added when type defaults modify any
            talks.
        """
        result: dict[str, int] = {
            "rooms": self.sync_rooms(),
            "speakers": self.sync_speakers(),
            "talks": self.sync_talks(),
        }
        schedule_count, unscheduled = self.sync_schedule(allow_large_deletions=allow_large_deletions)
        result["schedule_slots"] = schedule_count
        if unscheduled:
            result["unscheduled_talks"] = unscheduled
        type_defaults_applied = self.apply_type_defaults()
        if type_defaults_applied:
            result["type_defaults_applied"] = type_defaults_applied
        return result
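
    # Typical sync_all() result shape (illustrative counts):
    #
    #     {"rooms": 4, "speakers": 120, "talks": 95, "schedule_slots": 110,
    #      "unscheduled_talks": 2, "type_defaults_applied": 5}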

def _build_speaker(
    api_speaker: object,
    conference: Conference,
    existing: dict[str, Speaker],
    users_by_email: dict[str, object],
    now: datetime,
) -> Speaker:
    """Build or update a Speaker instance from an API speaker DTO."""
    if api_speaker.code in existing:
        speaker = existing[api_speaker.code]
        speaker.name = api_speaker.name
        speaker.biography = api_speaker.biography
        speaker.avatar_url = api_speaker.avatar_url
        speaker.email = api_speaker.email
        speaker.synced_at = now
        if api_speaker.email and speaker.user is None:
            matched = users_by_email.get(api_speaker.email.lower())
            if matched:
                speaker.user = matched
        return speaker
    user = users_by_email.get(api_speaker.email.lower()) if api_speaker.email else None
    return Speaker(
        conference=conference,
        pretalx_code=api_speaker.code,
        name=api_speaker.name,
        biography=api_speaker.biography,
        avatar_url=api_speaker.avatar_url,
        email=api_speaker.email,
        synced_at=now,
        user=user,
    )


def _parse_iso_datetime(value: str | None) -> datetime | None:
    """Parse an ISO 8601 string into a datetime, returning ``None`` on failure."""
    if not value:
        return None
    try:
        return datetime.fromisoformat(value)
    except (ValueError, TypeError):  # fmt: skip
        return None


def _classify_slot(title: str, code: str) -> str:
    """Determine the slot type from a Pretalx slot's title and code."""
    if code:
        return ScheduleSlot.SlotType.TALK
    lower_title = title.lower()
    if "break" in lower_title or "lunch" in lower_title:
        return ScheduleSlot.SlotType.BREAK
    if "social" in lower_title or "party" in lower_title:
        return ScheduleSlot.SlotType.SOCIAL
    return ScheduleSlot.SlotType.OTHER
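
# Helper behavior sketches (illustrative inputs):
#
#     _parse_iso_datetime("2026-07-24T09:00:00+02:00")  # -> aware datetime
#     _parse_iso_datetime("not-a-date")                 # -> None
#     _classify_slot("Lunch Break", "")                 # -> SlotType.BREAK
#     _classify_slot("My Talk", "ABC123")               # -> SlotType.TALK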