Skip to content

Assign urls to edx contentfiles when possible #2420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 125 additions & 3 deletions learning_resources/etl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
from pathlib import Path
from subprocess import check_call
from tempfile import TemporaryDirectory
from typing import Optional

import boto3
import rapidjson
import requests
from defusedxml import ElementTree
from django.conf import settings
from django.utils.dateparse import parse_duration
from django.utils.text import slugify
Expand Down Expand Up @@ -341,7 +343,7 @@ def documents_from_olx(
"mime_type": mimetype,
"archive_checksum": archive_checksum,
"file_extension": extension_lower,
"source_path": f"{path}/{filename}",
"source_path": f"{path}/{filename.replace(' ', '_')}",
},
)

Expand Down Expand Up @@ -407,7 +409,126 @@ def text_from_sjson_content(content: str):
return " ".join(data.get("text", []))


def get_root_url_for_source(etl_source: str) -> str | None:
    """
    Get the content base URL configured for an ETL source

    Args:
        etl_source (str): The ETL source value (compared against
            ETLSource.<member>.value)

    Returns:
        str | None: The base URL setting for the source, or None when the
            source has no entry in the mapping
    """
    # Built on every call so the current settings values are always read.
    mapping = {
        ETLSource.mitxonline.value: settings.CONTENT_BASE_URL_MITXONLINE,
        ETLSource.xpro.value: settings.CONTENT_BASE_URL_XPRO,
        ETLSource.oll.value: settings.CONTENT_BASE_URL_OLL,
        ETLSource.mit_edx.value: settings.CONTENT_BASE_URL_EDX,
    }
    return mapping.get(etl_source)


def is_valid_uuid(uuid_string: str) -> bool:
    """
    Check if a string is a valid UUID

    Args:
        uuid_string (str): The candidate value; any form accepted by
            ``uuid.UUID`` (with or without dashes) counts as valid

    Returns:
        bool: True if the value parses as a UUID, False otherwise
            (including for None and other non-string input)
    """
    try:
        uuid.UUID(uuid_string)
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed hex string.
        # TypeError: None was passed (uuid.UUID requires some argument).
        # AttributeError: a non-string was passed (no .replace method).
        return False
    return True


def get_url_from_module_id(
    module_id: str,
    run: LearningResourceRun,
    video_srt_metadata: Optional[dict] = None,
) -> Optional[str]:
    """
    Get the URL for a module based on its ID

    Args:
        module_id (str): The module ID; only ids starting with "asset" or
            "block" can be resolved to a URL
        run (LearningResourceRun): The run associated with the module
        video_srt_metadata (dict): Optional mapping of transcript asset module
            ids to the module id of the video they belong to

    Returns:
        str | None: The URL for the module, or None when the module id is
            empty, the source has no base URL, or the id is not recognized
    """
    if not module_id:
        return None
    etl_source = run.learning_resource.etl_source
    root_url = get_root_url_for_source(etl_source)
    if not root_url:
        # Unknown source or unset base URL: return no URL rather than a
        # bogus "None/courses/..." link built by the f-strings below.
        return None
    # OLL needs to have 'course-v1:' added to the run_id
    run_id = (
        f"course-v1:{run.run_id}" if etl_source == ETLSource.oll.value else run.run_id
    )
    if module_id.startswith("asset"):
        parent_video_id = (video_srt_metadata or {}).get(module_id)
        if parent_video_id:
            # Link to the parent video
            video_block_id = parent_video_id.split("@")[-1]
            return f"{root_url}/courses/{run_id}/jump_to/{video_block_id}"
        return f"{root_url}/{module_id}"
    block_id = module_id.split("@")[-1]
    if module_id.startswith("block") and is_valid_uuid(block_id):
        return f"{root_url}/courses/{run_id}/jump_to_id/{block_id}"
    return None


def parse_video_transcripts_xml(
    run: LearningResourceRun, xml_content: str, path: Path
) -> dict:
    """
    Build a transcript -> video lookup from one video XML document.

    Each <transcript> element with a "src" attribute yields one entry: the key
    is the edx_module_id of the transcript (resolved as "static/<src>") and
    the value is the edx_module_id derived from ``path``.

    Args:
        run (LearningResourceRun): The run passed through to get_edx_module_id
        xml_content (str): The video XML text
        path (Path): Location of the video XML; stringified for the lookup
            and used in the error log

    Returns:
        dict: transcript edx_module_id -> video edx_module_id. Empty when the
            root element has no url_name; on a parse error, whatever was
            collected so far (logged, not raised)
    """
    video_ids_by_transcript = {}
    try:
        video_element = ElementTree.fromstring(xml_content)

        # The root video element must carry a url_name to be usable
        if not video_element.get("url_name"):
            log.warning("No url_name found in video XML")
            return {}

        # Collect the src attribute of every transcript element
        for node in video_element.findall(".//transcript"):
            src = node.get("src")
            if not src:
                continue
            video_id = get_edx_module_id(str(path), run)
            transcript_id = get_edx_module_id(f"static/{src}", run)
            video_ids_by_transcript[transcript_id] = video_id
    except ElementTree.ParseError:
        log.exception("Error parsing video XML for %s: %s", run, path)
    return video_ids_by_transcript


def get_video_metadata(olx_path: str, run: LearningResourceRun) -> dict:
    """
    Get metadata for video SRT/VTT files in an OLX path

    Walks the "video" directory under ``olx_path`` and merges the transcript
    mappings parsed from every ``.xml`` file found there.

    Args:
        olx_path (str): Path to the extracted OLX directory
        run (LearningResourceRun): The run passed through to the XML parser

    Returns:
        dict: Merged result of parse_video_transcripts_xml for each video XML
            file; empty if there is no "video" directory
    """
    video_transcript_mapping = {}
    video_path = Path(olx_path, "video")
    if not video_path.exists():
        log.debug("No video directory found in OLX path: %s", olx_path)
        return video_transcript_mapping
    for root, _, files in os.walk(str(video_path)):
        for filename in files:
            if Path(filename).suffix.lower() != ".xml":
                continue
            xml_path = Path(root, filename)
            with xml_path.open("rb") as f:
                video_xml = f.read().decode("utf-8")

            # Parse the XML and get transcript mappings. Pass the file's path
            # (what the parser's `path: Path` parameter expects), not the
            # already-closed file handle.
            video_transcript_mapping.update(
                parse_video_transcripts_xml(run, video_xml, xml_path)
            )

    return video_transcript_mapping


def _process_olx_path(olx_path: str, run: LearningResourceRun, *, overwrite):
video_srt_metadata = get_video_metadata(olx_path, run)
for document, metadata in documents_from_olx(olx_path):
source_path = metadata.get("source_path")
edx_module_id = get_edx_module_id(source_path, run)
Expand Down Expand Up @@ -465,6 +586,7 @@ def _process_olx_path(olx_path: str, run: LearningResourceRun, *, overwrite):
"file_extension": file_extension,
"source_path": source_path,
"edx_module_id": edx_module_id,
"url": get_url_from_module_id(edx_module_id, run, video_srt_metadata),
**content_dict,
}
)
Expand Down Expand Up @@ -741,7 +863,7 @@ def parse_certification(offeror, runs_data):
)


def iso8601_duration(duration_str: str) -> str or None:
def iso8601_duration(duration_str: str) -> str | None:
"""
Parse the duration from a string and return it in ISO-8601 format

Expand Down Expand Up @@ -821,7 +943,7 @@ def calculate_weeks(num: int, from_unit: str) -> int:
return num


def transform_interval(interval_txt: str) -> str or None:
def transform_interval(interval_txt: str) -> str | None:
"""
Transform any interval units to standard English units
Only languages currently supported are English and Spanish
Expand Down
Loading
Loading