In this article, we build the exam-taking part of the system. The architecture is composed of:
Kubernetes service for the exam-taking UI.
Knative service for taking exams.
MongoDB for storing student answers.
Kafka Connect to capture changed data on MongoDB and move it to a Kafka topic.
Knative service for calculating the scoreboard and sending it to a topic.
We will follow the same approach as in the previous article and tackle dependency-free services first. The UI app requires the Knative service and MongoDB to work, so we start with those two, then move on to the UI, and finally to the Kafka Connect integration with MongoDB and Knative.
We download the take_exam.py function file and create a FastAPI wrapper, but this time there is no event; the function is called directly from the UI.
If we inspect the take_exam.py code, we can see it expects a queryStringParameters dict inside the event, containing a string called object_name that references the exam file.
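A minimal sketch of such a wrapper is shown below; the handler name lambda_handler and the GET route are assumptions, not details confirmed by the repo.

```python
# Hypothetical FastAPI wrapper around the downloaded function
from fastapi import FastAPI
from take_exam import lambda_handler  # assumed handler name

app = FastAPI()

@app.get("/")
def take_exam(object_name: str):
    # Rebuild the Lambda-style event the function expects
    event = {"queryStringParameters": {"object_name": object_name}}
    return lambda_handler(event, None)
```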
We then reference the image in the chart values, not forgetting the Minio environment values:
```yaml
image:
  repository: gitea.enkinineveh.space/gitea_admin/exam-take-fn
  pullPolicy: IfNotPresent
  # Overrides the image tag whose default is the chart appVersion.
  tag: "v1"
...
env:
  normal:
    BUCKET_NAME: "exams"
  secret:
    AWS_ACCESS_KEY_ID: "minio"
    AWS_SECRET_ACCESS_KEY: "minio123"
    AWS_ENDPOINT_URL_S3: "http://minions-hl.minio-tenant:9000"
```
After adding the Helm chart to the helmfile releases and applying it, we inspect the Knative service to get the ingress URL, which the frontend will use later.
```bash
kubectl get kservice exam-taking-fn -n exam

NAME             URL                                            LATESTCREATED          LATESTREADY            READY   REASON
exam-taking-fn   http://exam-taking-fn.exam.svc.cluster.local   exam-taking-fn-00001   exam-taking-fn-00001   True
```
The UI also expects a token from which to decode the authenticated user's email. We will skip this part for now, as it requires an authentication service; instead, we pass a static email for every student.
```python
# headers = _get_websocket_headers()
# token = headers.get('X-Amzn-Oidc-Data')
# parts = token.split('.')
# if len(parts) > 1:
#     payload = parts[1]
#     # Decode the payload
#     decoded_bytes = base64.urlsafe_b64decode(payload + '==')  # Padding just in case
#     decoded_str = decoded_bytes.decode('utf-8')
#     decoded_payload = json.loads(decoded_str)
#     # Extract the email
#     email = decoded_payload.get('email', 'Email not found')
#     print(email)
# else:
#     print("Invalid token")

email = "tunis@gmail.com"
```
To save the answers, the original code used DynamoDB. We replaced it with MongoClient and passed the host and table name as environment variables.
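A sketch of the swap, assuming MONGO_HOST and TABLE_NAME as the variable names and exams as the database name (taken from the CDC payload shown later); the actual code may differ.

```python
import os
from pymongo import MongoClient

# Hypothetical variable names; adjust to the actual chart values
client = MongoClient(os.environ["MONGO_HOST"])
collection = client["exams"][os.environ["TABLE_NAME"]]

def save_answers(score_item: dict):
    # Persist the student's answers where DynamoDB's put_item used to run
    collection.insert_one(score_item)
```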
Next come the frontend chart values, which expose the app on port 8501 behind an Nginx ingress with WebSocket support:

```yaml
image:
  repository: gitea.enkinineveh.space/gitea_admin/exam-take-frontend
  pullPolicy: IfNotPresent
  # Overrides the image tag whose default is the chart appVersion.
  tag: "v1"

service:
  type: ClusterIP
  port: 8501

ingress:
  enabled: true
  className: "nginx"
  annotations:
    nginx.org/proxy-connect-timeout: "3600s"
    nginx.org/proxy-read-timeout: "3600s"
    nginx.org/client-max-body-size: "4m"
    nginx.org/proxy-buffering: "false"
    nginx.org/websocket-services: exam-taking-frontend-charts
  hosts:
    - host: exam-taking-frontend.enkinineveh.space
      paths:
        - path: /
          pathType: ImplementationSpecific
          backend:
            service:
              name: exam-taking-frontend-charts
              port:
                number: 8501
  tls:
    - secretName: enkinineveh.space-tls-prod
      hosts:
        - exam-taking-frontend.enkinineveh.space
```
Add the chart to the helmfile and apply it:
```yaml
releases:
  ...
  - name: exam-generation-frontend
    chart: ./frontend/exam-generation-app/charts
    namespace: "exam"
  - name: exam-taking-frontend   # we added this
    chart: ./frontend/exam-taking-app/charts
    namespace: "exam"
  - name: exam-generation-fn
    chart: ./knative/ExamGenFn/charts
    namespace: "exam"
  - name: exam-taking-fn   # we added this
    chart: ./knative/ExamTakeFn/charts
    namespace: "exam"
```
and we can now access the UI.
Now we try the whole thing: remove the old files and start fresh, upload the test PDF, and wait for generation. Checking the taking-exam app, we see the exam listed; we click it, it generates a form, we answer, and voilà.
Finally, let's check MongoDB for the answers: we list the documents inside the score collection, and everything looks good.
Here is a demo for a more visually pleasing experience.
One of the reasons I chose Kafka, besides its seamless integration with Knative, is the ability to capture data changes (CDC) from other sources and forward them to topics. This capability is provided by Kafka Connect.
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka® and other data systems. If we want to capture data the moment it gets added to MongoDB, we must configure Kafka Connect to use the MongoDB connector, which internally listens for MongoDB CDC events.
Kafka Connect will be deployed as a separate cluster, but under the same Kubernetes cluster, of course.
Here is the kafka-connect.yaml for creating the cluster:
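Below is a sketch reconstructed from the properties described next; the resource name, namespace, bootstrap address, and image reference are assumptions.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: mongo-connect-cluster              # hypothetical name
  namespace: exam                          # assumed namespace
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: gitea.enkinineveh.space/gitea_admin/kafka-connect-mongodb:v1   # assumed image reference
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap.kafka:9092               # assumed bootstrap address
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
```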
bootstrapServers for connecting to the Kafka server.
group.id, a unique id defining the group of workers.
The *.storage.topic entries are internal topics for managing connector and task status, configuration, and offset data.
And the most important property is image: the image must be based on the Strimzi Kafka image and must add the MongoDB connector under the plugins folder so connectors can use it later. I deployed my own image; you can find the code in the repo.
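As a rough idea of what such an image looks like (the base image tag is an assumption, and the connector plugin is assumed to be downloaded and extracted locally before the build):

```dockerfile
# Hypothetical build: add the Debezium MongoDB connector to the Strimzi Kafka image
FROM quay.io/strimzi/kafka:0.42.0-kafka-3.7.1
USER root:root
# Connector plugin extracted locally into ./plugins/ before the build
COPY ./plugins/debezium-connector-mongodb/ /opt/kafka/plugins/debezium-connector-mongodb/
USER 1001
```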
Setting use-connector-resources to true enables KafkaConnector resources to create, remove, and reconfigure connectors.
The last interesting part is adding the connector:
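The manifest is not reproduced here, so the following is a sketch; the connector name, cluster label, and MongoDB connection string are assumptions, while the class and topic prefix follow the description below.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mongodb-connector                       # hypothetical name
  namespace: exam
  labels:
    strimzi.io/cluster: mongo-connect-cluster   # must match the KafkaConnect cluster name
spec:
  class: io.debezium.connector.mongodb.MongoDbConnector
  tasksMax: 1
  config:
    mongodb.connection.string: "mongodb://mongodb.exam:27017/?replicaSet=rs0"  # assumed address
    topic.prefix: "mongo-trigger-topic"
    database.include.list: "exam"               # db/collection inferred from the topic name
    collection.include.list: "exam.score"
```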
We specify the class for the MongoDbConnector plugin, and the remaining properties are needed for MongoDB. The topic prefix here is "mongo-trigger-topic", but we must also create a topic named mongo-trigger-topic.exam.score. Why?
Because the Kafka connector forwards events to topics named <topic.prefix>.<db>.<collection>, which means that if we add a record to the score collection, the topic name should be <topic.prefix>.exam.score.
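Such a topic can be created with a Strimzi KafkaTopic resource; a sketch, assuming the Kafka cluster is named my-cluster in the kafka namespace:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: mongo-trigger-topic.exam.score
  namespace: kafka                  # assumed: same namespace as the Kafka cluster
  labels:
    strimzi.io/cluster: my-cluster  # assumed Kafka cluster name
spec:
  partitions: 1
  replicas: 1
```

With the topic in place, inserting a record into the score collection produces an event like the one below.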
```json
{
  "payload": {
    "before": null,
    "after": "{\"_id\": {\"$oid\": \"66a279b8f98a9d0379570575\"},\"email\": \"tunisia@gmail.com\",\"score\": 0,\"result\": \"failed\",\"details\": [{\"question\": \"Which country of those countries located in balkans ?\",\"user_answer\": \"Germany\",\"correct_answer\": \"Romania\",\"is_correct\": false}]}",
    "updateDescription": null,
    "source": {
      "version": "2.7.0.Final",
      "connector": "mongodb",
      "name": "email-topic",
      "ts_ms": 1721924024000,
      "snapshot": "false",
      "db": "exams",
      "sequence": null,
      "ts_us": 1721924024000000,
      "ts_ns": 1721924024000000000,
      "collection": "score",
      "ord": 1,
      "lsid": null,
      "txnNumber": null,
      "wallTime": null
    },
    "op": "c",
    "ts_ms": 1721924024568,
    "transaction": null
  }
}
```
As we're adding new data, the before property is empty and the after property contains the persisted data.
From the architecture, you may notice we need a function to consume the events stored in mongo-trigger-topic.exam.score. Its code uses SNS and DynamoDB; we replace them with Kafka and MongoDB.
We start by initializing the KafkaProducer for sending to the email topic. The dynamodb_to_json function will be replaced by mongodb_to_json.
```python
...
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVER"])

# Utility function to convert a MongoDB item to regular JSON
def mongodb_to_json(mongo_item):
    return json.loads(mongo_item)
...
```
Then we remove the for loop, as Knative doesn't use a batching stream, and replace it with code that reads the event data from the after property; at the end, we send the message to the email topic.
```python
...
def lambda_handler(event, context):
    topic_arn = os.environ['EMAIL_TOPIC_ARN']
    image = event['payload']['after']

    # Convert the MongoDB JSON string to regular JSON
    item_json = mongodb_to_json(image)

    # Format the message as a score card
    message = format_score_card(item_json)

    try:
        producer.send(topic_arn, message.encode())
    except Exception as e:
        print(f"Error sending Kafka notification: {e}")
        raise

    return {
        'statusCode': 200,
        'body': json.dumps('Lambda executed successfully!')
    }
...
```
As the Kafka connector persists the added data into a topic, we should create a KafkaSource to consume it and forward it to the mongo function.
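A sketch of such a source; the resource name, bootstrap address, and the Knative service name of the function are assumptions:

```yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: mongo-trigger-source                    # hypothetical name
  namespace: exam
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092     # assumed bootstrap address
  topics:
    - mongo-trigger-topic.exam.score
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: mongo-fn                            # assumed name of the function's Knative service
```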
Now let's test the entire process. We first open a side terminal to watch the email-topic data.
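One way to watch the topic is the console consumer shipped with the Strimzi image; the namespace, image tag, and bootstrap address below are assumptions:

```bash
kubectl -n kafka run kafka-consumer -ti --rm --restart=Never \
  --image=quay.io/strimzi/kafka:0.42.0-kafka-3.7.1 -- \
  bin/kafka-console-consumer.sh \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 \
  --topic email-topic --from-beginning
```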
Visit the frontend app, select an exam, answer the questions, wait a second, and here is the data sent to email-topic.
We have now finished both parts, exam generation and exam taking. Next, we will secure access to authenticated accounts and, later, send the educator notifications about student scores.