Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 148 additions & 4 deletions invenio_app_rdm/upgrade_scripts/migrate_13_0_to_14_0.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2025 CERN.
# Copyright (C) 2026 Graz University of Technology.
#
# Invenio-App-RDM is free software; you can redistribute it and/or modify
# it under the terms of the MIT License; see LICENSE file for more details.
Expand All @@ -18,14 +19,21 @@
2. Record with resource type publication-thesis with a DOI and no draft
3. Record with resource type publication-thesis with no DOI and an existing draft
4. Records with multiple versions

5. Update requests to make them ready for the commenting feature
"""

from click import secho
import time
from datetime import datetime, timezone

from click import echo, progressbar, secho
from invenio_access.permissions import system_identity
from invenio_db import db
from invenio_drafts_resources.resources.records.errors import DraftNotCreatedError
from invenio_rdm_records.proxies import current_rdm_records_service as records_service
from invenio_search.api import RecordsSearchV2
from invenio_requests.records.api import RequestEvent
from invenio_search import current_search_client
from invenio_search.api import RecordsSearchV2, build_alias_name


def run_upgrade(migrate_record, migrate_draft):
Expand Down Expand Up @@ -79,8 +87,15 @@ def run_upgrade(migrate_record, migrate_draft):
secho(f"Draft {draft_id} failed to update", fg="red")
errored_draft_ids.append((draft_id, error))

print(f"Errored record IDs:", *errored_record_ids, sep="\n")
print(f"Errored draft IDs:", *errored_draft_ids, sep="\n")
if len(errored_record_ids) > 0:
secho(f"Errored record IDs: {errored_record_ids}", fg="red")
else:
secho("records have been updated successfully", fg="green")

if len(errored_draft_ids) > 0:
secho(f"Errored draft IDs: {errored_draft_ids}", fg="red")
else:
secho("drafts have been updated successfully", fg="green")


def run_update_for_resource_type():
Expand Down Expand Up @@ -243,6 +258,135 @@ def migrate_resource_type_in_draft(draft_id):
secho("Resource type update has finished.", fg="green")


def run_update_for_request():
    """Tag existing parent request comments with the ``parent_child`` field.

    Runs an asynchronous ``update_by_query`` on the request events index that
    sets ``parent_child = {"name": "parent"}`` on every document that:

    * was created before this function started,
    * has no ``parent_id`` field (i.e. is not a child comment), and
    * does not have a ``parent_child`` field yet.

    The number of matching documents is then polled every 10 seconds and
    reported through a progress bar. Polling stops when no matching document
    is left, or when a poll shows no progress and the search cluster reports
    that the update task has finished (e.g. it was aborted), so that a failed
    task cannot make the upgrade wait forever.

    Documents that are already tagged are excluded by the query itself, so
    calling this function again only processes what is still pending.
    """
    secho("Starting parent_child field migration for parent comments...", fg="green")

    # Resolve the (prefixed) alias of the request events index
    index_name = build_alias_name(RequestEvent.index._name)
    echo(f"Target index: {index_name}")

    # Only documents created before this point are considered by the migration
    migration_start_time = datetime.now(timezone.utc).isoformat()
    echo(f"Migration timestamp: {migration_start_time}")

    # Single definition of "still needs updating", shared by the count and the
    # update so that both always target exactly the same documents.
    pending_query = {
        "bool": {
            "must": [
                # Created before migration started
                {"range": {"created": {"lt": migration_start_time}}},
            ],
            "must_not": [
                # Has parent_id (is a child)
                {"exists": {"field": "parent_id"}},
                # Already has parent_child field
                {"exists": {"field": "parent_child"}},
            ],
        }
    }

    def get_pending_count():
        """Get count of documents that still need updating."""
        return current_search_client.count(
            index=index_name,
            body={"query": pending_query},
        )

    def task_has_finished(search_task_id):
        """Return True if the cluster reports the update task as completed."""
        task_status = current_search_client.tasks.get(task_id=search_task_id)
        return bool(task_status.get("completed"))

    # Initial count
    initial_count = get_pending_count()["count"]
    echo(f"Found {initial_count} parent comments to update")

    # Trigger the update (async with wait_for_completion=False)
    echo("Triggering update_by_query...")
    update_response = current_search_client.update_by_query(
        index=index_name,
        body={
            "query": pending_query,
            "script": {
                "source": "ctx._source.parent_child = ['name': 'parent']",
                "lang": "painless",
            },
        },
        wait_for_completion=False,
        refresh=True,
    )

    task_id = update_response.get("task")
    if task_id:
        echo(f"Task ID: {task_id}")

    total_updated = 0
    poll_count = 0

    # Seconds to wait between two checks of how many parent comments are
    # still untagged
    poll_interval = 10

    echo(f"Polling cluster every {poll_interval}s until completion...")
    echo(f"Checking for documents created before {migration_start_time}")

    with progressbar(
        length=initial_count,
        label="Migrating parent comments",
        show_eta=True,
    ) as bar:
        bar.update(0)

        def record_progress(pending_count):
            """Advance the bar by the documents updated since the last check."""
            nonlocal total_updated
            newly_updated = (initial_count - total_updated) - pending_count
            if newly_updated > 0:
                bar.update(newly_updated)
                total_updated += newly_updated
            return newly_updated

        # Nothing to wait for when there was nothing to update
        while initial_count != 0:
            poll_count += 1
            time.sleep(poll_interval)

            # Check how many documents still need updating
            pending_count = get_pending_count()["count"]
            newly_updated = record_progress(pending_count)

            # Check if we're done
            if pending_count == 0:
                break

            # No visible progress: make sure the task is still running.
            # Without this check an aborted task (version conflict, script
            # failure, ...) would leave this loop polling forever.
            if newly_updated <= 0 and task_id and task_has_finished(task_id):
                echo("Update task has finished; stopping polling.")
                # The task may have finished between the count above and the
                # status check, so account for its last batch before leaving.
                record_progress(get_pending_count()["count"])
                break

            # Progress update every 10 polls
            if poll_count % 10 == 0:
                echo(f"Still processing... {pending_count} documents remaining")

    # Final verification: only report success when nothing is left pending
    final_pending = get_pending_count()["count"]

    if final_pending == 0:
        secho("✓ Migration complete!", fg="green")
        echo(f"Total updated: {initial_count} parent comments")
        secho("✓ Verification passed: All documents updated", fg="green")
    else:
        echo(f"Total updated: {initial_count - final_pending} parent comments")
        secho(f"⚠ Warning: {final_pending} documents still pending", fg="yellow")


def execute_upgrade():
"""Execute the upgrade from InvenioRDM 13.0 to 14.0.

Expand Down
Loading