feat: auto import course structure on course publish (#43)

This commit is contained in:
Danyal Faheem 2024-09-02 18:11:13 +05:00 committed by GitHub
parent 76fcf04cbb
commit e1fd75464c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 187 additions and 2 deletions

View File

@ -132,7 +132,9 @@ To restrict a given user to one or more courses or organizations, select the cou
Refreshing course block data Refreshing course block data
~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Course block IDs and names are loaded from the Open edX modulestore into the datalake. After making changes to your course, you might want to refresh the course structure stored in the datalake. To do so, run:: Cairn has a ``cairn-watchcourses`` service that looks for changes to the course structure and refreshes the course structure in the datalake automatically. However, the changes may take up to 5 minutes to show up in superset as this service utilizes batch processing.
If you would like to manually refresh the course structure, run::
tutor local do init --limit=cairn tutor local do init --limit=cairn

View File

@ -0,0 +1 @@
- [Improvement] Auto import course structure to clickhouse on course publish by parsing CMS logs. (by @Danyal-Faheem)

View File

@ -327,3 +327,56 @@ spec:
persistentVolumeClaim: persistentVolumeClaim:
claimName: cairn-postgresql claimName: cairn-postgresql
{% endif %} {% endif %}
---
# Deployment for the cairn-watchcourses sidecar: a small aiohttp server
# (scripts/server.py, run inside the Open edX image) that receives
# course-published notifications from Vector and re-imports course
# structure data into ClickHouse.
# NOTE(review): indentation was stripped by the diff renderer; restore
# standard k8s manifest indentation when applying.
apiVersion: apps/v1
kind: Deployment
metadata:
name: cairn-watchcourses
labels:
app.kubernetes.io/name: cairn-watchcourses
spec:
selector:
matchLabels:
app.kubernetes.io/name: cairn-watchcourses
template:
metadata:
labels:
app.kubernetes.io/name: cairn-watchcourses
spec:
containers:
- name: cairn-watchcourses
# Reuses the Open edX image so edx-platform settings and deps are available.
image: {{ DOCKER_IMAGE_OPENEDX }}
command: ["/bin/bash"]
args: ["-c", "python /openedx/scripts/server.py"]
# Mounts mirror the prod docker-compose service for this component.
volumeMounts:
- mountPath: /openedx/edx-platform/lms/envs/tutor/
name: settings-lms
- mountPath: /openedx/edx-platform/cms/envs/tutor/
name: settings-cms
- mountPath: /openedx/config
name: config
- mountPath: /openedx/scripts
name: scripts
# subPath mounts auth.json as a single file, not a directory.
- mountPath: /openedx/clickhouse-auth.json
name: clickhouse-auth
subPath: auth.json
securityContext:
allowPrivilegeEscalation: false
ports:
# Must match the port server.py listens on and the Service/sink URI.
- containerPort: 9282
volumes:
- name: settings-lms
configMap:
name: openedx-settings-lms
- name: settings-cms
configMap:
name: openedx-settings-cms
- name: config
configMap:
name: openedx-config
- name: scripts
configMap:
name: cairn-openedx-scripts
- name: clickhouse-auth
configMap:
name: cairn-clickhouse-auth

View File

@ -43,3 +43,15 @@ spec:
protocol: TCP protocol: TCP
selector: selector:
app.kubernetes.io/name: cairn-superset app.kubernetes.io/name: cairn-superset
---
# Service exposing the cairn-watchcourses deployment on port 9282 so the
# Vector "watchcourse" HTTP sink can reach it by DNS name.
# NOTE(review): NodePort also exposes this unauthenticated endpoint on every
# cluster node; ClusterIP would suffice for the in-cluster Vector sink —
# confirm external access is actually needed.
apiVersion: v1
kind: Service
metadata:
name: cairn-watchcourses
spec:
type: NodePort
ports:
- port: 9282
protocol: TCP
selector:
app.kubernetes.io/name: cairn-watchcourses

View File

@ -13,3 +13,11 @@ cairn-superset-worker-beat:
environment: environment:
FLASK_ENV: development FLASK_ENV: development
# Dev-mode override: inherits the shared openedx service definition and
# publishes port 9282 so the watchcourses HTTP endpoint is reachable.
cairn-watchcourses:
<<: *openedx-service
ports:
- "9282:9282"
networks:
default:
aliases:
# Stable DNS name used by the Vector "watchcourse" sink URI.
- "cairn-watchcourses"

View File

@ -85,3 +85,13 @@ cairn-postgresql:
depends_on: depends_on:
- permissions - permissions
{% endif %} {% endif %}
# Production service running the watchcourses HTTP server inside the
# Open edX image; mounts give server.py access to platform settings and
# the ClickHouse credentials it needs to import course data.
cairn-watchcourses:
image: {{ DOCKER_IMAGE_OPENEDX }}
command: "python /openedx/scripts/server.py"
restart: unless-stopped
volumes:
- ../apps/openedx/settings/lms:/openedx/edx-platform/lms/envs/tutor:ro
- ../apps/openedx/settings/cms:/openedx/edx-platform/cms/envs/tutor:ro
- ../apps/openedx/config:/openedx/config:ro
- ../plugins/cairn/apps/openedx/scripts:/openedx/scripts:ro
- ../plugins/cairn/apps/clickhouse/auth.json:/openedx/clickhouse-auth.json:ro

View File

@ -24,7 +24,8 @@ def main():
description="Import course block information into the datalake" description="Import course block information into the datalake"
) )
parser.add_argument( parser.add_argument(
"-c", "--course-id", action="append", help="Limit import to these courses" "-c", "--course-id", action="extend", nargs='*',
help="Limit import to these courses"
) )
args = parser.parse_args() args = parser.parse_args()

View File

@ -0,0 +1,62 @@
"""
This module provides an HTTP service for importing course data into ClickHouse.
It defines a single HTTP endpoint that allows for the submission of course IDs,
which are then processed and used to trigger a subprocess for data import.
Functions:
- import_courses_to_clickhouse(request): Handles POST requests to '/courses/published/'.
Validates the input data, verifies course IDs, and triggers an external Python script
to import the data into ClickHouse.
Usage:
- python server.py
- Run this module to start the HTTP server. It listens on port 9282 and processes
requests sent to the '/courses/published/' endpoint.
"""
import logging
import subprocess
from aiohttp import web
from opaque_keys.edx.locator import CourseLocator
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)
async def import_courses_to_clickhouse(request):
    """Handle POST requests to '/courses/published/'.

    Expects a JSON list of objects each carrying a "course_id" key.
    Validates and de-duplicates the course ids, then runs the
    importcoursedata.py script to (re-)import those courses' structure
    into ClickHouse.

    Returns:
        400 with a JSON ``{"error": ...}`` body on malformed input;
        204 on success (including when there is nothing to import).
    """
    data = await request.json()
    if not isinstance(data, list):
        return web.json_response({"error": f"Incorrect data format. Expected list, got {data.__class__}."}, status=400)
    course_ids = []
    seen = set()
    # Single pass: validate, de-duplicate (first-seen order) and collect.
    for course in data:
        # Items may be arbitrary JSON values; a non-dict item has no
        # course_id, so treat it like a missing/invalid one instead of
        # letting `.get` raise AttributeError (which surfaced as a 500).
        course_id = course.get("course_id") if isinstance(course, dict) else None
        if not isinstance(course_id, str):
            return web.json_response({"error": f"Incorrect course_id format. Expected str, got {course_id.__class__}."}, status=400)
        if course_id in seen:
            continue
        seen.add(course_id)
        # Verify course_id parses as a valid course locator.
        try:
            CourseLocator.from_string(course_id)
        except Exception as e:
            log.exception(f"An error occured: {str(e)}")
            return web.json_response({"error": f"Incorrect arguments. Expected valid course_id, got {course_id}."}, status=400)
        course_ids.append(course_id)
    # Don't invoke the importer with a bare "-c" (empty id list): an empty
    # POST body should be a no-op rather than trigger an import run.
    if course_ids:
        subprocess.run(["python", "/openedx/scripts/importcoursedata.py", "-c", *course_ids], check=True)
    return web.Response(status=204)
# Wire up the single endpoint and start the (blocking) aiohttp server.
app = web.Application()
app.router.add_post('/courses/published/', import_courses_to_clickhouse)
# Listen on all interfaces; port 9282 matches the cairn-watchcourses
# service/deployment definitions and the Vector sink URI.
web.run_app(app, host='0.0.0.0', port=9282)

View File

@ -38,6 +38,21 @@ source = '''
.message = parse_json!(.message) .message = parse_json!(.message)
''' '''
# Parse CMS logs for course publishing event
[transforms.course_published]
type="remap"
inputs = ["openedx_containers"]
source = '''
# Extract the course id from the CMS "Updating course overview for <id>" line.
parsed, err_regex = parse_regex(.message, r'Updating course overview for (?P<course_id>\S+?)(?:\s|\.|$)')
if err_regex != null {
# NOTE(review): this runs for every non-matching log line from the
# openedx containers, so it logs at "error" very noisily — confirm the
# intended level (drop_on_abort already discards the event).
log("Unable to parse course_id from log message: " + err_regex, level: "error")
abort
}
# Keep only the course_id; downstream sinks serialize it as JSON.
. = {"course_id": parsed.course_id}
'''
drop_on_error = true
drop_on_abort = true
### Sinks ### Sinks
# Log all events to stdout, for debugging # Log all events to stdout, for debugging
@ -58,4 +73,25 @@ database = "{{ CAIRN_CLICKHOUSE_DATABASE }}"
table = "_tracking" table = "_tracking"
healthcheck = true healthcheck = true
# Log course_published event to stdout for debugging
[sinks.course_published_out]
type = "console"
inputs = ["course_published"]
encoding.codec = "json"
encoding.only_fields = ["course_id"]
# Send course_id to watchcourses
[sinks.watchcourse]
type = "http"
method = "post"
# JSON codec on an http sink posts the batch as a JSON array, which is the
# list-of-objects shape server.py's endpoint expects.
encoding.codec = "json"
inputs = ["course_published"]
# Batch events together to reduce the number of times
# the importcoursedata script is run
# Vector will wait 300 secs (5 mins) from the first event
# or until there are 10 events to trigger the watchcourses service
batch.timeout_secs = 300
batch.max_events = 10
# Service DNS name of cairn-watchcourses; port must match server.py.
uri = "http://cairn-watchcourses:9282/courses/published/"
{{ patch("cairn-vector-common-toml") }} {{ patch("cairn-vector-common-toml") }}