From 426ee40f447375f0d8c38015bcc2ac902c3aa4d1 Mon Sep 17 00:00:00 2001 From: xinyinghou Date: Sun, 24 Aug 2025 19:31:05 +0000 Subject: [PATCH 01/13] Include codetailor-related modules to book_server_api --- bases/rsptx/book_server_api/routers/coach.py | 170 ++++++ .../routers/personalized_parsons/README.md | 22 + .../buggy_code_checker.py | 49 ++ .../personalized_parsons/end_to_end.py | 211 ++++++++ .../evaluate_fixed_code.py | 349 ++++++++++++ .../generate_parsons_blocks.py | 495 ++++++++++++++++++ .../get_parsons_code_distractors.py | 33 ++ .../get_personalized_solution.py | 234 +++++++++ .../personalize_parsons.py | 251 +++++++++ .../personalized_parsons/token_compare.py | 87 +++ 10 files changed, 1901 insertions(+) create mode 100644 bases/rsptx/book_server_api/routers/personalized_parsons/README.md create mode 100644 bases/rsptx/book_server_api/routers/personalized_parsons/buggy_code_checker.py create mode 100644 bases/rsptx/book_server_api/routers/personalized_parsons/end_to_end.py create mode 100644 bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py create mode 100644 bases/rsptx/book_server_api/routers/personalized_parsons/generate_parsons_blocks.py create mode 100644 bases/rsptx/book_server_api/routers/personalized_parsons/get_parsons_code_distractors.py create mode 100644 bases/rsptx/book_server_api/routers/personalized_parsons/get_personalized_solution.py create mode 100644 bases/rsptx/book_server_api/routers/personalized_parsons/personalize_parsons.py create mode 100644 bases/rsptx/book_server_api/routers/personalized_parsons/token_compare.py diff --git a/bases/rsptx/book_server_api/routers/coach.py b/bases/rsptx/book_server_api/routers/coach.py index 5d672728a..53e3e2e6f 100644 --- a/bases/rsptx/book_server_api/routers/coach.py +++ b/bases/rsptx/book_server_api/routers/coach.py @@ -21,6 +21,18 @@ # ------------------------- from rsptx.logging import rslogger +# CodeTailor related imports +# ------------------------- +from 
fastapi.responses import JSONResponse +from rsptx.logging import rslogger +from .personalized_parsons.end_to_end import get_parsons_help +from typing import Optional +import re +from fastapi import status + +from .assessment import get_question_source, SelectQRequest +# Import function for fetching api - comment out for DEV purposes +from rsptx.db.crud.crud import fetch_api_token # .. _APIRouter config: # @@ -65,3 +77,161 @@ async def python_check(request: Request): resultMessage = f"{filename}:{str(e.lineno)}:{str(e.offset)}: {e.args[0]}\n" return resultMessage + + + +# Starting here -- Added code for CodeTailor --- +DEV_API_KEY = "" +# for dev/test -- replace with your own key for local testing + +def extract_parsons_code(html_block): + """ + Given the full HTML/pre block for a Parsons problem extracted from DB, + return only the Parsons code part. + """ + # Remove all HTML tags and extract the code lines + text = re.sub(r"<.*?>", "", html_block, flags=re.DOTALL) + lines = text.strip().splitlines() + if "-----" in lines: + idx = lines.index("-----") + code_lines = lines[idx+1:] + else: + code_lines = lines + + clean_lines = [line for line in code_lines if line.strip() and line.strip() != "====="] + return "\n".join(clean_lines) + + +@router.get("/get_question_html") +async def get_question_html(request: Request, div_id: str): + """ + Fetch and return just the HTML for a single question (case 1). + No grading — points are set to 0. + Falls back to 'LLM-example' if the question is not found. 
+ """ + request_data = SelectQRequest( + selector_id=div_id, + questions=div_id, + points=0, + proficiency=None, + min_difficulty=None, + max_difficulty=None, + not_seen_ever=False, + autogradable=None, + primary=None, + AB=None, + toggleOptions=None, + timedWrapper=None, + limitBaseCourse=None, + ) + + result = await get_question_source(request, request_data) + + html = None + if isinstance(result, dict): + html = result.get("detail") + else: + html = getattr(result, "detail", None) + + # Handle missing or error cases + if not html or "No Questions" in html or "not in the database" in html: + return {"html": "LLM-example"} + + return {"html": html} + + +@router.post("/parsons_scaffolding") +async def parsons_scaffolding( + request: Request, + course: Optional[str] = None, +): + # Import api key and handles errors + api_token = None + try: + if course is None or course == "personalized_parsons" or course == "overview": # the test course for development + # Dev/Test mode testing + rslogger.info("CodeTailor: Using predefined dev API key") + api_token = DEV_API_KEY + else: + api_token = await fetch_api_token( # handles decryption already - comment out for DEV purposes + course_id=course.id, + provider='openai', # hardcoded as openai for now, prompt structures are different for different providers + # if we found instructors tend to use other platforms, we need to handle this later + ) + except Exception as e: + rslogger.error(f"Codetailor: Error fetching API tokens: {e}") + return JSONResponse( + content={"error": f"Error fetching API tokens: {str(e)}"}, + status_code=status.HTTP_400_BAD_REQUEST + ) + + if api_token is None: + return JSONResponse( + content={"error": "CodeTailor: No openai API found"}, + status_code=status.HTTP_400_BAD_REQUEST, + ) + + # Start to process the request from activecode.js + req_bytes = await request.body() + req = req_bytes.decode("utf-8") + data = await request.json() + + language = data.get("language") # Capture the question language 
from the front end + student_code = data.get("student_code") # Capture the student code from the front end + problem_id = data.get("problem_id") # Capture the problem name from the front end + personalization_level = data.get("personalization_level") # Capture the personalization level set by the instructor from the front end + parsonsexample = data.get("parsonsexample") # Capture whether the scaffolding puzzle is a pre-defined example or LLM-example + problem_description = data.get("problem_description") # Capture the problem description from the front end + internal_test_case = data.get("internal_test_case") # Capture the internal test case from the front end + # Never log api_token: it is the decrypted provider secret and must not reach stdout or log files + rslogger.debug(f"CodeTailor: start get_parsons_help language={language} personalization_level={personalization_level}") + + adaptive_attr = 'data-adaptive="true"' + no_indent_attr = 'data-noindent="false"' + language_attr = f'data-language="{language}"' + # this scaffolding_attr is used in the parsons.js to determine whether the Parsons puzzle is created as automatic scaffolding puzzle or not + scaffolding_attr = 'data-scaffolding="true"' + parsons_attrs = f"{language_attr} {adaptive_attr} {no_indent_attr} {scaffolding_attr}".strip() + + # extract the HTML of the example Parsons problem, otherwise it is "LLM-example" + parsonsexample_html = None + if parsonsexample != "LLM-example": + result = await get_question_html(request, div_id=parsonsexample) + parsonsexample_html = result["html"] + parsonsexample_code = extract_parsons_code(parsonsexample_html) + print("Fetched Parsons Example HTML:", parsonsexample_html[:200]) + else: + parsonsexample_code = "LLM-example" + + + def parsons_help(language, student_code, problem_id, problem_description, internal_test_case, parsonsexample_code, personalization_level): + """ + Call the get_parsons_help function to get the personalized Parsons puzzle and the solution code. 
+ """ + input_dict = { + "Problem Name": problem_id, + "Problem Description": problem_description, + "Unittest_Code": internal_test_case, + "Example": parsonsexample_code, # This is the html of the example Parsons problem + "CF (Code)": student_code + } + return get_parsons_help(api_token, language, input_dict, personalization_level) + + if personalization_level in ["Solution", "Multiple"]: + personalized_code_solution, personalized_Parsons_block, personalized_solution_generation_type, personalized_generation_result_type = parsons_help(language, student_code, problem_id, problem_description, internal_test_case, parsonsexample_code, personalization_level) + if personalized_code_solution == "": + return "emptyHelpCode" + "||split||" + "emptyHelpParsons" + "||split||" + personalization_level + "||split||" + personalized_generation_result_type + if personalized_Parsons_block == "Correct_Code": + return personalized_code_solution + "||split||" + "correctCode" + "||split||" + personalization_level + "||split||" + personalized_generation_result_type + else: + personalized_Parsons_block = re.sub(r'<(?=\S)', '< ', personalized_Parsons_block) + personalized_Parsons_html = f""" + + """ + print("personalized_Parsons_html", personalized_Parsons_html, "personalization_level", personalization_level, "personalized_generation_result_type", personalized_generation_result_type) + return personalized_code_solution + "||split||" + personalized_Parsons_html + "||split||" + personalization_level + "||split||" + personalized_generation_result_type + else: + # Handle the case where personalization_level is not valid + rslogger.error(f"Invalid personalization_level: {personalization_level}") + return JSONResponse(content={"error": "Invalid personalization_level"}, status_code=400) diff --git a/bases/rsptx/book_server_api/routers/personalized_parsons/README.md b/bases/rsptx/book_server_api/routers/personalized_parsons/README.md new file mode 100644 index 000000000..6e88b40ca --- /dev/null 
+++ b/bases/rsptx/book_server_api/routers/personalized_parsons/README.md @@ -0,0 +1,22 @@ +### CodeTailor - Backend + +The core logic of the CodeTailor backend involves capturing the student's current code from activecode and using it to generate a Parsons puzzle (in .rst), which is then sent back to the frontend. + +In coach.py: +code in editor (activecode) --> Backend captures code --> Puzzle Generation with an LLM --> Response to Frontend (a scaffolding puzzle) + +```text +book_server_api/ +├── routers/ +│ ├── personalized_parsons/ +│ │ ├── end_to_end.py +│ │ ├── buggy_code_checker.py +│ │ ├── get_personalized_solution.py +│ │ ├── evaluate_fixed_code.py +│ │ ├── personalize_parsons.py +│ │ ├── generate_parsons_blocks.py +│ │ ├── get_parsons_code_distractors.py +│ │ └── token_compare.py +│ ├── coach.py +│ └── assessment.py +``` diff --git a/bases/rsptx/book_server_api/routers/personalized_parsons/buggy_code_checker.py b/bases/rsptx/book_server_api/routers/personalized_parsons/buggy_code_checker.py new file mode 100644 index 000000000..5e11c5335 --- /dev/null +++ b/bases/rsptx/book_server_api/routers/personalized_parsons/buggy_code_checker.py @@ -0,0 +1,49 @@ +def clean_python_code(code): + """ + Cleans the given Python code by removing comments, empty lines, + and lines that start with 'def' or 'import' - to figure out if the student has contributed any code. + Input: code (str): The Python code to be cleaned. + Output: cleaned_code (str): The cleaned Python code. + """ + lines = [] + for line in code.split('\n'): + # Remove comments and whitespace + line = line.split('#')[0].strip() + # Skip empty lines and lines starting with 'def' or 'import' + if line and not line.startswith(('def ', 'import ', 'from ', 'class ')): + lines.append(line) + return '\n'.join(lines) + +def clean_java_code(code): + """ + Cleans the given Java code by removing comments, empty lines, + and lines that start with 'public' or 'import' - to figure out if the student has contributed any code. 
+ Input: code (str): The Java code to be cleaned. + Output: cleaned_code (str): The cleaned Java code. + """ + lines = [] + for line in code.split('\n'): + # Remove comments and whitespace + line = line.split('//')[0].strip() + # Skip empty lines and lines starting with 'def' or 'import' + if line and not line.startswith(('public ', 'import ')): + lines.append(line) + return '\n'.join(lines) + +def student_code_checker(language, buggy_code): + """ + Check if the buggy code contains any student-contributed code. + Input: language (str): The programming language of the code ('python' or 'java'). + buggy_code (str): The buggy code to be checked. + Output: has_contributed_code (bool): True if the student has contributed code, False otherwise. + """ + if language == "java": + cleaned_buggy = clean_java_code(buggy_code) + else: + cleaned_buggy = clean_python_code(buggy_code) + + # This is used to check if students contributed any code + if len(cleaned_buggy) == 0: + return False + + return True \ No newline at end of file diff --git a/bases/rsptx/book_server_api/routers/personalized_parsons/end_to_end.py b/bases/rsptx/book_server_api/routers/personalized_parsons/end_to_end.py new file mode 100644 index 000000000..009fad9c7 --- /dev/null +++ b/bases/rsptx/book_server_api/routers/personalized_parsons/end_to_end.py @@ -0,0 +1,211 @@ +# import necessary packages +from .buggy_code_checker import * +from .get_personalized_solution import * +from .evaluate_fixed_code import * +from .generate_parsons_blocks import * +from .personalize_parsons import * +from .token_compare import * +from .get_parsons_code_distractors import * + + +def get_parsons_help(api_token, language, dict_buggy_code, personalize_level): + """ + The main API to connect with coach.py when students request help. + Get personalized Parsons scaffolding artifacts based on the provided buggy code and other parameters. 
+ Args: + api_token: The API token for GenerativeAI -- now only support OpenAI + language: The programming language of the code + dict_buggy_code: A dictionary containing the current buggy code, problem description, unittest code, and a predefined example + personalize_level: The level of personalization to apply (e.g., "Solution", "Multiple") + Returns: + A tuple containing the final fixed code, the final Parsons block, the solution generation type, and the generation result type. + """ + problem_description = dict_buggy_code["Problem Description"].replace("\\n", "\n") + buggy_code = dict_buggy_code["CF (Code)"].replace("\\n", "\n") + # can use a function to extract them in the future + default_start_code = "" + default_test_code = "" + unittest_code = dict_buggy_code["Unittest_Code"].replace("\\n", "\n") + # This can be "LLM-example" or a predefined example code + predefined_example = dict_buggy_code["Example"].replace("\\n", "\n") + # check if language is java, keep it as it is, otherwise, assign it to "python" as default + if language.lower() == "java" or language.lower() == "python": + language = language.lower() + else: + language = "python" # Default to python if not specified + + if personalize_level == "Solution" or personalize_level == "Multiple": + cleaned_fixed_code, generation_result_type = generate_personalized_fixed_code(api_token, language, problem_description, buggy_code, default_start_code, default_test_code, predefined_example, unittest_code) + final_fixed_code = cleaned_fixed_code.lstrip() + else: + return "Error: invalid personalize_level" + + if personalize_level == "Solution": + # generate Parsons puzzles with personalization only at the solution level with paired distractors + final_Parsons_block = generate_multi_personalized_Parsons_blocks("Solution", language, problem_description, buggy_code, final_fixed_code, default_start_code, default_test_code, unittest_code) + elif personalize_level == "Multiple": + # generate Parsons puzzles with 
personalization at both solution and multiple levels with paired distractors and potentially settled block lines + final_Parsons_block = generate_multi_personalized_Parsons_blocks("Multiple", language, problem_description, buggy_code, final_fixed_code, default_start_code, default_test_code, unittest_code) + else: + return "Error: invalid personalize_level" + + return final_fixed_code, final_Parsons_block, "personalization", generation_result_type + +def request_fixed_code_from_openai(api_token, language, problem_description, buggy_code, default_start_code, default_test_code, example_solution, unittest_code, solution_generation, old_fixed_code, attempt_type, situation, failure_reason, unittest_result): + """ + Request a fixed code from GenerativeAI based on the buggy code and other parameters. + Inputs: + api_token (str): The API token for GenerativeAI -- now only support OpenAI + language (str): The programming language of the code - Python or Java + problem_description (str): textual question description + buggy_code (str): The buggy code provided by the student + default_start_code (str): The default starting code for the question + default_test_code (str): The default test code for the question + example_solution (str): An example solution for the question + unittest_code (str): The unittest code for the question + solution_generation (int): The current generation attempt number -- starts from 2, ends at 0 + old_fixed_code (str): The previously generated fixed code, if any + attempt_type (str): The type of attempt, e.g., "new", "repeat" - the LLM request might be different based on the attempt_type + situation (str): The situation for the request, e.g., "a correct answer" + failure_reason (str): The reason for the failure, e.g., "not correct" + unittest_result (bool): The result of the unittest evaluation for the buggy code + Outputs: + cleaned_fixed_code (str): The cleaned fixed code generated by the LLM + generation_result_type (str): The type of generation 
result, e.g., "AI_personalized", "example_more_personalized", "example_solution" + """ + cleaned_buggy_code = clean_student_code(buggy_code, default_test_code) + if solution_generation <= 0: + return example_solution.lstrip(), "example_solution" + + # For solution_generation >= 1, fix the code and run tests + fixed_code = get_fixed_code(api_token, language, problem_description, buggy_code, unittest_code, example_solution, attempt_type=attempt_type, situation=situation, old_fixed_code=old_fixed_code) + unittest_result, cleaned_fixed_code = unittest_evaluation(language, fixed_code, default_start_code, default_test_code, unittest_case=unittest_code) + + print("this-round-result:", unittest_result, cleaned_fixed_code) + if not unittest_result: + return example_solution.lstrip(), "example_solution" + + if unittest_result == True: + similarity_personalized = code_similarity_score(cleaned_buggy_code, cleaned_fixed_code, language) + similarity_most_common = code_similarity_score(cleaned_buggy_code, example_solution, language) + + # For other cases, return the more similar one as the personalized result + if similarity_personalized >= similarity_most_common: + return cleaned_fixed_code.lstrip(), "AI_personalized" + else: + return example_solution.lstrip(), "example_more_personalized" + + else: + print("not correct, retrying ... 
current solution_generation=", solution_generation) + # If the unittest result is not correct, we will retry with the same solution_generation_type, but will provide the incorrect code as part of the system message (attachment) + solution_generation -= 1 + # api_token is the first positional parameter of this function and must be forwarded on the retry; + # omitting it shifts every argument by one and the call fails with a TypeError + return request_fixed_code_from_openai(api_token, language, problem_description, buggy_code, default_start_code, default_test_code, example_solution, unittest_code, solution_generation, old_fixed_code=cleaned_fixed_code, attempt_type="repeat", situation="a correct answer", failure_reason="not correct", unittest_result = False) + +def generate_example_solution(api_token, language, problem_description, unittest_code, predefined_example): + """ + Generate or retrieve an example solution based on the provided parameters. + Inputs: + api_token (str): The API token for GenerativeAI -- now only support OpenAI + language (str): The programming language of the code - Python or Java + problem_description (str): textual question description + unittest_code (str): The unittest code for the question + predefined_example (str): A predefined example solution or the keyword "LLM-example" to generate one using LLM + Outputs: + example_solution (str): The example solution, either generated by LLM or the predefined one + """ + if predefined_example == "LLM-example": + example_solution = get_example_solution(api_token, language, problem_description, unittest_code) + else: + example_solution = predefined_example + return example_solution + + +def generate_personalized_fixed_code(api_token, language, problem_description, buggy_code, default_start_code, default_test_code, predefined_example, unittest_code, API_attempt=0): + """ + Generate a personalized fixed code based on the buggy code and other parameters. 
+ Inputs: + api_token (str): The API token for GenerativeAI -- now only support OpenAI + language (str): The programming language of the code - python or java + problem_description (str): textual question description + buggy_code (str): The buggy code provided by the student + default_start_code (str): The default starting code for the question, current "" + default_test_code (str): The default test code for the question, current "" + predefined_example (str): A predefined example solution or the keyword "LLM-example" to generate one using LLM + unittest_code (str): The unittest code for the question + API_attempt (int): The current API attempt number -- starts from 0 (reserved in case we want to use different API keys in the future) + Outputs: + cleaned_fixed_code (str): The cleaned fixed code generated by the LLM + generation_result_type (str): The type of generation result, e.g., "AI_personalized", "example_more_personalized", "example_solution", "written_code_correct", "empty_beginning" + """ + # check if students contributed any code -- If it is True, then check the correctness + if bool(student_code_checker(language, buggy_code)): + # check whether the existing code is already correct + unittest_result, cleaned_buggy_but_correct_code = unittest_evaluation(language, buggy_code, default_start_code, default_test_code, unittest_case=unittest_code) + if unittest_result != True: + # If the code is not correct, we will TRY to get an example solution first + example_solution = generate_example_solution(api_token, language, problem_description, unittest_code, predefined_example) + + try: + # The first personalized fixed code attempt + result_personalized = request_fixed_code_from_openai( + api_token, language, problem_description, buggy_code, + default_start_code, default_test_code, example_solution, + unittest_code, 2, "", + attempt_type="new", situation="", failure_reason="", unittest_result="" + ) + + return result_personalized + + except Exception as e: + # When 
there is an error from the LLM API, we will return the example solution + # Every branch returns the documented (code, generation_result_type) pair: the caller unpacks exactly two values + return example_solution.lstrip(), "example_solution" + # If the code is correct, we will return the cleaned_buggy_but_correct_code and raise a message + else: + return cleaned_buggy_but_correct_code.lstrip(), "written_code_correct" + # If the code is empty, directly return the example solution + else: + example_solution = generate_example_solution(api_token, language, problem_description, unittest_code, predefined_example) + + return example_solution.lstrip(), "empty_beginning" + + +def generate_multi_personalized_Parsons_blocks(personalize_level, language, problem_description, buggy_code, fixed_code, default_start_code, default_test_code, unittest_code): + """ + Generate personalized Parsons blocks based on the student buggy code and fixed code. + Inputs: + personalize_level (str): The level of personalization to apply (e.g., "Solution", "Multiple") + language (str): The programming language of the code - python or java + problem_description (str): textual question description + buggy_code (str): The buggy code provided by the student + fixed_code (str): The fixed code generated by the LLM + default_start_code (str): The default starting code for the question, current "" + default_test_code (str): The default test code for the question, current "" + unittest_code (str): The unittest code for the question + Outputs: + personalized_Parsons_block (str): The generated personalized Parsons block code + """ + buggy_code_for_blocks = clean_student_code(buggy_code, default_test_code) + # add paired distractors on their code when there are some meaningful comparison (one line similarity > a threshold) + code_comparison_pairs, fixed_lines, removed_lines, unchanged_lines, total_similarity = compare_code(buggy_code_for_blocks, fixed_code, default_start_code, language) + + # decide the types of Parsons problems and generate corresponding 
distractors + Parsons_type, distractors = personalize_Parsons_block(language, problem_description, code_comparison_pairs, buggy_code, fixed_lines, removed_lines, unchanged_lines, total_similarity) + unittest_flag = True + if len(distractors) > 0: + for distractor in distractors.copy().items(): + distractor_correct_line = distractor[0] + # Prepare the code with distractors for unittest evaluation - should not pass the tests this time + code_with_distrator = generate_code_with_distrator(unchanged_lines, fixed_lines, distractor) + unittest_flag, cleaned_code_with_distractors = code_distractor_unittest_evaluation(language, code_with_distrator, default_start_code, default_test_code, unittest_code) + # If the code with distractors passes the unittest, we will remove the distractor from the distractors list + if unittest_flag == True: + distractors.pop(distractor_correct_line) + + if Parsons_type == "Correct": + # if the student code is already correct, no need to generate Parsons problems, just return a message + return "Correct_Code" + else: + # If the student code is incorrect, we will generate Parsons blocks + personalized_Parsons_block = generate_Parsons_block(personalize_level, language, Parsons_type, problem_description, fixed_code, unchanged_lines, fixed_lines, distractors) + + return personalized_Parsons_block diff --git a/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py b/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py new file mode 100644 index 000000000..9c4b23a76 --- /dev/null +++ b/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py @@ -0,0 +1,349 @@ +import unittest +import re +import difflib +from types import ModuleType +import difflib +import threading +import signal +import subprocess +import tempfile +import os +import shutil + +class NullOutput: + def write(self, _): + pass + def flush(self): + pass + +class TimeoutError(Exception): + pass + +def handler(signum, 
frame): + raise TimeoutError("Test execution exceeded time limit") + +def load_and_run_java_tests(java_code, test_code, time_limit=15): + """ + Compile and run Java code with test cases. + Inputs: + java_code (str): The Java code to be tested. + test_code (str): The Java test cases. The test code should contain a public class with a main method to run the tests. + The test code is automatically reformatted based on the unittest_code provided by instructors in the RST file. + time_limit (int): Seconds allowed for each of the compile step and the run step. + Output: bool: True if all tests pass, False otherwise (including compilation errors, timeouts and any other exception). + """ + def extract_class_name(code): + match = re.search(r'public\s+class\s+(\w+)', code) + if match: + return match.group(1) + else: + raise ValueError("Could not find a public class declaration.") + + temp_dir = tempfile.mkdtemp() + try: + # Extract class names from the code + class_name = extract_class_name(java_code) + test_class_name = extract_class_name(test_code) + + # Write main Java file + code_path = os.path.join(temp_dir, f"{class_name}.java") + with open(code_path, "w") as f: + f.write(java_code) + + # Write test Java file + test_path = os.path.join(temp_dir, f"{test_class_name}.java") + with open(test_path, "w") as f: + f.write(test_code) + + # Compile both + compile_result = subprocess.run( + ["javac", f"{class_name}.java", f"{test_class_name}.java"], + cwd=temp_dir, + capture_output=True, + text=True, + timeout=time_limit + ) + if compile_result.returncode != 0: + print("Compilation error:\n", compile_result.stderr) + return False + + # Run the test class + # The code under test may loop forever; the timeout makes subprocess.run kill the child and raise + # subprocess.TimeoutExpired, which the generic handler below reports as a failed run + run_result = subprocess.run( + ["java", test_class_name], + cwd=temp_dir, + capture_output=True, + text=True, + timeout=time_limit + ) + + if run_result.returncode == 0: + return True + else: + return False + + except Exception as e: + print("Error while running Java tests:", str(e)) + return False + finally: + shutil.rmtree(temp_dir) + +def load_and_run_tests(unittest_case, code_to_test, time_limit=6): + """ + Load and run Python test cases against the provided code. + Inputs: + unittest_case (str): The Python test cases. 
The test code is automatically reformatted based on the unittest_code provided by instructors in the RST file. + code_to_test (str): The Python code to be tested. + time_limit (int): The time limit for running the tests in seconds. + Output: unittest.TestResult: The result of the test run, or False when the TimeoutError raised by the alarm handler is caught here. + """ + # Set the alarm signal for timeout + if threading.current_thread() is threading.main_thread(): + signal.signal(signal.SIGALRM, handler) + signal.alarm(time_limit) + + try: + # Create a dummy module to hold the test cases + test_module = ModuleType("test_module") + test_module.unittest = unittest + + # NOTE(security): exec() runs instructor test code and LLM/student code inside this process; + # it is not sandboxed here -- confirm the deployment isolates this service + # Execute the test cases string within the dummy module's namespace + exec(unittest_case, test_module.__dict__) + # Execute the code to test within the desired scope + exec(code_to_test, test_module.__dict__) + # Retrieve the loaded test cases + test_suite = unittest.TestLoader().loadTestsFromModule(test_module) + print("test_suite",test_suite) + # Run the test suite + test_results = unittest.TextTestRunner(verbosity=0, failfast=True, stream=NullOutput()).run(test_suite) + print("test_results",test_results) + + except TimeoutError: + # test_results is only bound after the runner returns, so it must not be referenced here: + # when the alarm fires during exec() or test loading the name does not exist yet + print("test_results", "timed out") + return False + finally: + signal.alarm(0) + + return test_results + +def fix_indentation(text): + lines = text.split('\n') + + # Remove leading whitespace for the first line + if lines: + lines[0] = lines[0].lstrip() + + # Find the indentation of the first def/class line + indentation = 0 + for line in lines: + if line.strip().startswith(('def ', 'class ')): + indentation = len(line) - len(line.lstrip()) + break + + # Remove indentation for subsequent lines + for i in range(1, len(lines)): + lines[i] = lines[i][indentation:] + + return '\n'.join(lines) + +def contain_default_starting_code(default_code, code, similarity_threshold=0.95): + if default_code is None or default_code != default_code: # NaN check + return True + else: + # Remove leading whitespace from each line in the subset and code + default_code = 
'\n'.join(line.lstrip() for line in default_code.split('\n') if line.strip()) + code_stripped = '\n'.join(line.lstrip() for line in code.split('\n') if line.strip()) + # Remove whitespace within strings + default_code_no_whitespace = re.sub(r'(?<=\S)\s+(?=\S)', '', default_code) + code_stripped_no_whitespace = re.sub(r'(?<=\S)\s+(?=\S)', '', code_stripped) + + if default_code_no_whitespace in code_stripped_no_whitespace: + return True + else: + default_lines = [line.lstrip() for line in default_code.split('\n') if line.strip()] + code_lines = [line.lstrip() for line in code.split('\n') if line.strip()] + code_before_last_default_code_line = code_lines[: code_lines.index(default_lines[-1])] + # Calculate similarity ratio using difflib + similarity_ratio = difflib.SequenceMatcher(None, default_lines, code_before_last_default_code_line).ratio() + return similarity_ratio >= similarity_threshold + +def extract_code_line(code): + fixed_pattern = r'\[fixed-code\]:\s*([\s\S]*)' + + if re.findall(fixed_pattern, code, re.DOTALL): + fixed_code = re.findall(fixed_pattern, code, re.DOTALL)[0].strip() + # if the content inside of [fixed-code] does not start with def + else: + fixed_code = code + + fenced_pattern = r'```(.*?)```' + + if re.findall(fenced_pattern, fixed_code, flags=re.DOTALL): + extracted_content = re.findall(fenced_pattern, fixed_code, flags=re.DOTALL) + fixed_code = '\n'.join(extracted_content) + else: + fixed_code = fixed_code + + if not fixed_code.startswith(('def', 'import', 'class')): + if re.findall(r'(?:def|public|class|import)(.*)', fixed_code, re.DOTALL): + match = re.search(r"(public|class|import|def)", fixed_code) + if match: + fixed_code = fixed_code[match.start():] + else: + fixed_code = fixed_code + else: + fixed_code = fixed_code + + return fixed_code + + +def remove_default_testline(code, default_test_code): + # Split the lines to remove by newline + default_test_code_list = default_test_code.strip().split('\n') + # remove the leading whitespace 
def remove_python_comments(code):
    """
    Remove '#' comments from Python code without touching '#' inside string literals.
    Inputs:
        code (str): The Python source text.
    Output:
        str: The code with every comment cut off. Whitespace before a comment is kept,
        and the number of lines is unchanged.
    """
    # Local imports keep this helper self-contained.
    import io
    import tokenize

    lines = code.split('\n')
    try:
        # Materialize inside the try: the tokenizer reports errors lazily.
        tokens = list(tokenize.generate_tokens(io.StringIO(code).readline))
    except (tokenize.TokenError, SyntaxError, ValueError):
        # The code cannot be tokenized (e.g. unbalanced brackets in a buggy
        # submission): fall back to cutting each line at its first '#'.
        return '\n'.join(line.split('#', 1)[0] for line in lines)

    for token in tokens:
        if token.type == tokenize.COMMENT:
            row, col = token.start
            if row - 1 < len(lines):
                lines[row - 1] = lines[row - 1][:col]

    return '\n'.join(lines)
def python_unittest_evaluation(fixed_code, starting_code, default_test_code, unittest_case):
    """
    Clean the candidate Python code and run the unit tests against it.
    Inputs:
        fixed_code (str): The Python code to be tested.
        starting_code (str): The default starting code provided to the student.
        default_test_code (str): The default test code provided to the student.
        unittest_case (str): The Python test cases.
    Output:
        tuple: (outcome, cleaned_code). outcome is True/False for a completed test run,
        or a message string ("No enough code-...", "No starting code", "We got errors, ...")
        when the code could not be prepared or executed.
    """
    def _passed(results):
        # load_and_run_tests returns False (not a TestResult) when the run timed out;
        # treat that as a failed run instead of calling .wasSuccessful() on a bool.
        return results is not False and results.wasSuccessful()

    try:
        # Fails fast with AttributeError when fixed_code is not a string.
        fixed_code.split('\n')
        fixed_code = extract_code_line(fixed_code)
        fixed_code = remove_default_testline(fixed_code, default_test_code)
        fixed_code = remove_empty_lines(fixed_code)
        fixed_code = remove_python_comments(fixed_code)
    except Exception as e:
        return f"No enough code-{e}", fixed_code

    # First attempt: run the cleaned code as-is.
    try:
        results = load_and_run_tests(unittest_case, fixed_code)
        print("results.wasSuccessful()\n", _passed(results))
        return _passed(results), fixed_code
    except Exception as e:
        print("Exception", e)

    # Second attempt: the code could not be executed (typically an indentation
    # problem in the LLM output), so retry once after normalizing indentation.
    try:
        fixed_code = fix_indentation(fixed_code)
        results = load_and_run_tests(unittest_case, fixed_code)
        if contain_default_starting_code(starting_code, fixed_code):
            return _passed(results), fixed_code
        return "No starting code", fixed_code
    except Exception as e:
        return f"We got errors, {e}", fixed_code
using unit tests. + Inputs: + language (str): The programming language of the code ('python' or 'java'). + code_with_distrator (str): The code with distractors to be evaluated. + starting_code (str): The default starting code provided to the student. Now it's "" + default_test_code (str): The default test code provided to the student. Now it's "". + unittest_case (str): The Python/Java test cases. The test code is automatically reformatted based on the unittest_code provided by instructors in the RST file. + Output: + bool: True if all tests pass, False otherwise. + str: The cleaned code with distractors after removing comments and empty lines. + """ + if language == "java": + try: + java_test_result = load_and_run_java_tests(code_with_distrator, unittest_case) + return java_test_result, code_with_distrator + except Exception as e: + return f"We got errors, {e}", code_with_distrator + else: + try: + results = load_and_run_tests(unittest_case, code_with_distrator) + if contain_default_starting_code(starting_code, code_with_distrator): + return results.wasSuccessful(), code_with_distrator + else: + return "No starting code", code_with_distrator + except: + try: + code_with_distrator = fix_indentation(code_with_distrator) + results = load_and_run_tests(unittest_case, code_with_distrator) + if contain_default_starting_code(starting_code, code_with_distrator): + return results.wasSuccessful(), code_with_distrator + else: + return "No starting code", code_with_distrator + except Exception as e: + return False, code_with_distrator + + +def clean_student_code(student_code, default_test_code): + """ + Clean the student's code by removing default test lines, empty lines, and comments. + Inputs: + student_code (str): The student's code to be cleaned. + default_test_code (str): The default test code provided to the student. Now it's "". + Output: + str: The cleaned student code. 
def check_indentation_level(line):
    """Return the number of leading whitespace characters of a line."""
    # Everything removed by lstrip() is exactly the leading whitespace run.
    return len(line) - len(line.lstrip())
def break_and_indent(text, max_line_length, indent=4):
    """
    Break text into lines of a specified maximum length and indent them.
    Inputs:
        text (str): The text to wrap. Any run of whitespace separates words.
        max_line_length (int): Maximum length of a line, not counting the indentation.
        indent (int): Number of spaces put in front of every line.
    Output:
        str: The wrapped lines joined with newlines. A single word longer than
        max_line_length is kept whole on its own line. Empty text gives "".
    """
    prefix = " " * indent
    lines = []
    current_line = ""

    for word in text.split():
        if not current_line:
            # Start a new line with this word. Checking this first avoids flushing
            # an empty line when the very first word is already too long.
            current_line = word
        elif len(current_line) + len(word) + 1 > max_line_length:
            lines.append(prefix + current_line)
            current_line = word
        else:
            current_line += " " + word

    if current_line:
        lines.append(prefix + current_line)

    return '\n'.join(lines)
blocks.append('\n'.join(block_lines)) + + Parsons_puzzle = convert_code_to_block(blocks) + return Parsons_puzzle + + +def generate_full_Parsons(personalize_level, language, fixed_code, question_info): + """ + Generate a fully movable Parsons puzzle without any distractors. - happens when the total similarity is too low (< 0.2) + Inputs: + personalize_level (str): The level of personalization ("Multiple" or "Solution"). + language (str): The programming language of the code ("python" or "java"). + fixed_code (str): The fixed code. + question_info (str): The problem description or question information. + Output: Parsons_puzzle (str): The generated Parsons puzzle in string format. + """ + fixed_code = re.sub(r'\n\s*\n', '\n', fixed_code) + question_info = break_and_indent(question_info, max_line_length=80) + + if language == 'python': + # Extract code blocks based on indentation patterns + matches = re.findall(r'(?<=\n)(\s*)(.*)', fixed_code) + matches = [('',fixed_code.split('\n')[0])] + matches + # if no indentation, add to matches as ("", code_line) + # Remove empty and whitespace-only blocks and add four indentation spaces to each block + blocks = [' ' + block[0] + block[1] for block in matches if block[1].strip()] + blocks = handle_imports_and_splitting(blocks) + blocks = aggregate_code_to_full_Parsons_block(blocks) + Parsons_puzzle = convert_code_to_block(blocks) + return Parsons_puzzle + elif language == 'java': + # for java, we do not want to split empty {} to individual blocks, so we need to do preprocessing + java_Parsons_puzzle = split_java_code_into_blocks(fixed_code) + return java_Parsons_puzzle + + +def generate_partial_Parsons(personalize_level, language, Parsons_type, question_info, unchanged_lines, fixed_lines, distractor_tuple_dict): + """ + Generate a Parsons puzzle with some unmovable blocks and distractors. + Inputs: + personalize_level (str): The level of personalization ("Multiple" or "Solution"). 
+ language (str): The programming language of the code ("python" or "java") - does not affect the current implementation. + Parsons_type (str): The type of Parsons puzzles ("Full" or "Partial"). + question_info (str): The problem description or question information. + unchanged_lines (list): List of unchanged lines (tuples of (line number, length, code)). + fixed_lines (list): List of fixed lines (tuples of (line number, length, code)). + distractor_tuple_dict (dict): Dictionary of distractor code snippets with their corresponding correct lines as keys. + Output: Parsons_puzzle (str): The generated Parsons puzzle in string format. + """ + fixed_lines = [(line[0], line[1], line[2].rstrip()+ '\n') for line in fixed_lines if line[2].strip()] + matched_fixed_lines = [] + if (Parsons_type == "Partial" and personalize_level == "Multiple"): + unchanged_lines = [(line[0], line[1], line[2].rstrip() + ' #settled\n') for line in unchanged_lines if line[2].strip()] + # some unchanged lines might be moved to the distractor list + if (Parsons_type == "Partial" and personalize_level == "Solution"): + unchanged_lines = [(line[0], line[1], line[2].rstrip() + '\n') for line in unchanged_lines if line[2].strip()] + + blocks = fixed_lines + unchanged_lines + matched_fixed_lines + for fixed_line_key in distractor_tuple_dict.keys(): + blocks = [(line[0], line[1], line[2].rstrip() + ' #matched-fixed\n') if line[2].strip() == fixed_line_key[2].strip() else (line[0], line[1], line[2]) for line in blocks] + fixed_line_code = fixed_line_key[2] + line_indentation = fixed_line_code[:len(fixed_line_code) - len(fixed_line_code.lstrip())] + if type(distractor_tuple_dict[fixed_line_key]) == tuple: + distractor_tuple_dict[fixed_line_key] = (fixed_line_key[0]+0.5, fixed_line_key[0], line_indentation + distractor_tuple_dict[fixed_line_key][2].strip() + " #paired") + elif type(distractor_tuple_dict[fixed_line_key]) == str: + distractor_tuple_dict[fixed_line_key] = (fixed_line_key[0]+0.5, 
def keep_last_hash_tag_lines(input_string, hash_tag):
    """
    Keep the given hash tag only on the last line that contains it.
    The tag is removed from every earlier line; all other lines are left unchanged.
    """
    rows = input_string.split('\n')
    # Positions of all lines carrying the tag, in document order.
    tagged_positions = [pos for pos, row in enumerate(rows) if hash_tag in row]
    # Every tagged line except the final one loses the tag.
    for pos in tagged_positions[:-1]:
        rows[pos] = rows[pos].replace(hash_tag, "")
    return '\n'.join(rows)
def reset_distractor_flag(distractor_block):
    """
    Remove all '#paired' and '#settled' tags from the distractor block list.
    If any item carried a '#paired' tag, a single ' #paired' tag is put back
    at the end of the last item.
    Inputs:
        distractor_block (list): List of code strings; empty/None items are dropped.
    Output:
        list: The cleaned list of code strings.
    """
    items = list(filter(None, distractor_block))
    has_paired = any("#paired" in item for item in items)

    # Strip both tags from every item. An item carrying both tags must lose both,
    # otherwise a stray '#settled' survives in the final block.
    items = [item.replace("#paired", "").replace("#settled", "") for item in items]

    if has_paired:
        items[-1] = items[-1].replace("\n", "") + " #paired" + "\n"

    return items
def same_indentation_code_to_Parsons_block(blocks):
    """
    Map each block's line key to its code text, one Parsons block per code line.
    Inputs:
        blocks (list): Tuples whose first element is the line key and whose
        fourth element is the code text.
    Output:
        dict: {line key: code text ending with a newline}. If two tuples share a key,
        the later one wins.
    """
    all_Parsons_blocks = {}
    for block in blocks:
        code = str(block[3])
        # Append the newline to the code text; the block itself is a tuple and
        # cannot be extended with a string.
        if not code.endswith('\n'):
            code += '\n'
        all_Parsons_blocks[block[0]] = code

    return all_Parsons_blocks
all_Parsons_blocks[block_stack[0][0]] = ''.join(str(block[1]) for block in block_stack) + block_stack = [(index, block[3])] + current_indent_in_block_stack = this_indent + # distractor_indent != "" means that detected that this is an end of a distractor block stack or a start of a distractor block stack -- how to distinguish? + # this_indent != current_indent_in_block_stack means that we have finished building a block stack + # so we have stored all the related lines in this distractor block stack + # now we need to do some special processing for the distractor block stack + elif (distractor_indent != "") & (this_indent != current_indent_in_block_stack): + # if this, then we have finished building a distractor block stack + count_fixed= sum(1 for block in block_stack if "#matched-fixed" in block[1]) + if (count_fixed == 0): + #then this is just a start of a distractor block stack, moved what we have stored in the block stack to the all_Parsons_blocks first + all_Parsons_blocks[block_stack[0][0]] = block_stack + # then continue + fixed_line_block, distractor_block = extract_distractor_Parsons_block(block_stack) + # the numbers 0.20 and 0.22 are used to ensure that the distractor block comes right after the fixed line block + all_Parsons_blocks[block_stack[0][0]+0.20] = fixed_line_block + all_Parsons_blocks[block_stack[0][0]+0.22] = distractor_block + # prepare for the next loop + distractor_indent = "" + block_stack = [(index, block[3])] + current_indent_in_block_stack = this_indent + # if it is the last item, then no loop anymore, just store the last block stack + + if index == len(blocks)-1: + if (distractor_indent == ""): + # use the first line number of the block as the line sequence number + all_Parsons_blocks[block_stack[0][0]] = ''.join(str(block[1]) for block in block_stack) + elif (distractor_indent != ""): + count_fixed= sum(1 for block in block_stack if "#matched-fixed" in block[1]) + if (count_fixed == 0): + #then this is just a start of a distractor 
def aggregate_code_to_full_Parsons_block(blocks):
    """
    Aggregate code lines into full Parsons blocks.
    1. Function definitions (def) and return statements are treated as separate blocks.
    2. Consecutive import statements (import ... / from ... import ...) are grouped
       together into a single block.
    3. Other consecutive lines with the same indentation level are grouped together.
    4. If the indentation level changes, a new block is started.
    5. Each line in a block ends with a newline character.
    Inputs:
        blocks (list): List of code lines (str).
    Output:
        list: List of Parsons blocks (str). Empty input gives an empty list.
    """
    if not blocks:
        return []

    # Keywords are matched as whole words so that names such as "default",
    # "returned" or "important" are not mistaken for def/return/import.
    import_keyword = re.compile(r'(?:import|from)\b')
    standalone_keyword = re.compile(r'(?:def|return)\b')

    def indent_of(text):
        return len(text) - len(text.lstrip())

    current_indent = indent_of(blocks[0])
    all_Parsons_blocks = []
    Parsons_block = ""
    import_block = ""

    for block in blocks:
        if not block.endswith('\n'):
            block += '\n'

        this_indent = indent_of(block)
        stripped = block.strip()

        if import_keyword.match(stripped):
            import_block += block  # keep collecting consecutive imports
            continue

        if import_block:
            # First non-import line after a run of imports: emit the run.
            all_Parsons_blocks.append(import_block)
            import_block = ""

        if standalone_keyword.match(stripped):
            if Parsons_block:  # emit whatever was accumulated so far
                all_Parsons_blocks.append(Parsons_block)
                Parsons_block = ""
            all_Parsons_blocks.append(block)  # def/return is its own block
            current_indent = this_indent
        elif this_indent == current_indent:
            Parsons_block += block
        else:
            if Parsons_block:
                all_Parsons_blocks.append(Parsons_block)
            Parsons_block = block
            current_indent = this_indent

    # Emit whatever is still pending. An import run at the very end of the input
    # would otherwise be lost.
    if Parsons_block:
        all_Parsons_blocks.append(Parsons_block)
    if import_block:
        all_Parsons_blocks.append(import_block)

    return all_Parsons_blocks
def generate_code_with_distrator(unchanged_lines, fixed_lines, distractor_tuple):
    """
    Generate code in which one fixed line is replaced by its distractor line.
    Inputs:
        unchanged_lines (list): List of tuples in the format (line_number, similarity_score, code).
        fixed_lines (list): List of tuples in the format (line_number, similarity_score, code).
        distractor_tuple (tuple): ((line_number, similarity_score, code), distractor_line), i.e.
        the key fixed line and the distractor that replaces it.
    Output:
        str: The code with the distractor line in place of the key fixed line.
    """
    key_fixed_line, distractor_line = distractor_tuple[0], distractor_tuple[1]
    key_number, key_code = key_fixed_line[0], key_fixed_line[2]

    # All non-blank lines of the fixed solution.
    candidates = [
        (line[0], line[1], line[2])
        for line in list(unchanged_lines) + list(fixed_lines)
        if line[2].strip()
    ]

    # Build a new list instead of popping while iterating (which skips elements).
    # Prefer an exact match on line number + code, so that an identical line
    # elsewhere in the program is not removed in place of the key line.
    remaining = [
        line for line in candidates
        if not (line[0] == key_number and line[2] == key_code)
    ]
    if len(remaining) == len(candidates):
        # No entry carries the key's line number: fall back to matching on code text.
        remaining = [line for line in candidates if line[2] != key_code]

    # The distractor inherits the indentation of the line it replaces and is placed
    # right after that line's position (line_number + 0.5).
    line_indentation = key_code[:len(key_code) - len(key_code.lstrip())]
    blocks = remaining + [(key_number + 0.5, key_number, line_indentation + distractor_line.strip())]
    blocks = sorted(blocks, key=lambda x: x[0])

    return '\n'.join(t[-1] for t in blocks)
b/bases/rsptx/book_server_api/routers/personalized_parsons/get_personalized_solution.py @@ -0,0 +1,234 @@ +from openai import OpenAI +import re + +from .evaluate_fixed_code import * + +# Below is the system message as part of the prompt to generate the fixed code +system_message = """ +Fix the provided {programming_language} [user-code] based on the provided [task-description] and [sample-solution] and generate the [fixed-code]. +The [fixed-code] should pass the provided [unittest-code] and be more similar to the [user-code] than the [sample-solution]. +When possible, the [fixed-code] should follow the existing solving strategy and solution path in [user-code], use the same type of [control_structures], use the same variable names as [user-code]. +The [fixed-code] should require the least amount of edits from the [user-code]. +For example, the [user-code] uses [control_structures], the [fixed-code] should also use these [control_structures] when establishing the solution. +The [fixed-code] should follow the {programming_language} style guide. +[task-description]: '{question_description}' +[end-task-description] + +[sample-solution]: '{example_solution}' +[end-solution] + +[unittest-code]: '{unittest_code}' +[end-unittest-code] + +[control-structures]: '{control_structures}' +[end-control-structures] +""" + +def find_control_structures_java(buggy_code): + """ + Find control structures in Java code. + Include them in the prompt to encourage the LLM to use similar structures in the fixed code. 
def find_control_structures_python(buggy_code):
    """
    Find control structures in Python code.
    Include them in the prompt to encourage the LLM to use similar structures in the fixed code.
    Inputs:
        buggy_code (str): The student's Python code.
    Output:
        list: Labels of the detected structures, in a fixed order. Possible labels are
        "for-loop", "while-loop", "for-range-loop", "for-items-loop",
        "if-or-elif-condition" and "if-or-else-condition".
    """
    # (label, pattern, flags). The while/if/elif patterns accept any condition up to
    # the colon on the same line (e.g. "while i < n:", "if x > 0:"), not just a single
    # bare name, and are anchored to the start of a line because these keywords only
    # open a statement there.
    checks = (
        ("for-loop", r'for\s+\w+\s+in\s+\w+\s*:', 0),
        ("while-loop", r'^[ \t]*while\b[^\n]*:', re.MULTILINE),
        ("for-range-loop", r'for\s+\w+\s+in\s+range\(', 0),
        ("for-items-loop", r'for\s+\w+\s*,\s*\w+\s+in\s+\w+\.items\(\)', 0),
        ("if-or-elif-condition", r'^[ \t]*(?:if|elif)\b[^\n]*:', re.MULTILINE),
        ("if-or-else-condition", r'^[ \t]*(?:if\b[^\n]*:|else\s*:)', re.MULTILINE),
    )

    return [
        label
        for label, pattern, flags in checks
        if re.search(pattern, buggy_code, flags)
    ]
def get_fixed_code(api_token, language, problem_description, buggy_code, unittest_code, example_solution, attempt_type, situation, old_fixed_code):
    """
    Get the personalized fixed code for the student's buggy code.
    It builds the prompt with build_code_prompt and calls generate_personalized_fix to get the fixed code from the LLM.
    Inputs:
        api_token (str): The API token for the LLM.
        language (str): The programming language of the code ("python" or "java").
        problem_description (str): The description of the coding problem.
        buggy_code (str): The student's buggy code.
        unittest_code (str): The unittest code to validate the fixed code.
        example_solution (str): The example solution code.
        attempt_type (str): The type of attempt ("new" or "repeat").
        situation (str): The situation of the attempt ("a correct answer"). This can be extended to other nuanced situations in the future.
        old_fixed_code (str): The old fixed code from the previous attempt (if any).
    Output:
        str: The generated personalized fixed code.
    Raises:
        ValueError: If attempt_type is neither "new" nor "repeat".
    """
    # Reject unknown attempt types before doing any work. Previously such a value
    # skipped prompt construction, so no fix could be produced for it.
    if attempt_type not in ("new", "repeat"):
        raise ValueError(
            f"Unsupported attempt_type {attempt_type!r}; expected 'new' or 'repeat'"
        )

    # Function-scope import keeps this block self-contained; student code is logged
    # at debug level instead of being printed to stdout.
    import logging
    logger = logging.getLogger(__name__)

    cleaned_buggy_code = clean_student_code(buggy_code, "")
    logger.debug("get_fixed_code-cleaned_buggy_code %s", cleaned_buggy_code)
    # TODO: if the solution cache is brought back (store_solution_cache.py), look up
    # cleaned_buggy_code here and return the cached solution when one exists.

    prompt_messages = build_code_prompt(
        language,
        problem_description,
        buggy_code,
        unittest_code,
        example_solution,
        system_message,
        attempt_type,
    )
    fixed_code_response = generate_personalized_fix(
        api_token, prompt_messages, attempt_type, situation, old_fixed_code
    )
    logger.debug("fixed_code_response %s", fixed_code_response)
    return fixed_code_response
+ """ + # call an LLM to get the example solution + client = OpenAI(api_key=api_token) + + example_solution_system_message = f""" + You are a helpful {language} coding assistant for CS1/CS2 classes. You only provide correct example solutions that uses the entry-level {language} programming language features. + Respond ONLY with valid code. Include imports, classes, or functions if needed. Do NOT include explanations, comments, markdown, or test cases. + """ + + example_solution_user_message = f""" + Generate a correct example solution for the following question in {language}: {problem_description}. + Here is the unittest code: {unittest_code}. The solution should be correct and pass the unittest." + """ + + raw_completion_response = client.chat.completions.create( + model="gpt-5-nano", + messages=[ + {"role": "system", "content": example_solution_system_message}, + {"role": "user", "content": example_solution_user_message} + ] + ) + + completion = raw_completion_response.choices[0].message + LLM_example_code = completion.content + # test if the LLM_example_code is correct remove all potential # + LLM_example_code = LLM_example_code.lstrip("#").rstrip("#").strip() + unittest_result, cleaned_LLM_example_code = unittest_evaluation(language, LLM_example_code, "", "", unittest_case=unittest_code) + if unittest_result == True: + # LLM_example_code is correct + return cleaned_LLM_example_code + else: + # LLM_example_code is not correct, return empty string + return "" diff --git a/bases/rsptx/book_server_api/routers/personalized_parsons/personalize_parsons.py b/bases/rsptx/book_server_api/routers/personalized_parsons/personalize_parsons.py new file mode 100644 index 000000000..4f8e8f867 --- /dev/null +++ b/bases/rsptx/book_server_api/routers/personalized_parsons/personalize_parsons.py @@ -0,0 +1,251 @@ +import difflib +from collections import namedtuple +from .generate_parsons_blocks import * +from .token_compare import * +import random + +# Compare the similarity between 
the student code and the fixed code +# It returns the difference between the two code snippets line by line using a loop. It also returns the similarity ratio. +# The difflib module compares lines based on their content, so it may not capture more complex differences - but we can use it for our case +CodeComparison = namedtuple('CodeComparison', ['student_removed', 'fixed_modified', 'line_similarity']) + + +def compare_code(buggy_code, fixed_code, default_start_code, language): + """ + Compare the buggy code with the fixed code and return the differences. + Inputs: + buggy_code (str): The student's buggy code. + fixed_code (str): The correct fixed code. + default_start_code (str): The default starting code to be ignored in the comparison. It's "" right now. + language (str): The programming language of the code ('python' or 'java'). + Outputs: + code_comparison_pairs (list): A list of CodeComparison namedtuples representing the differences between the buggy and fixed code. + fixed_lines (list): A list of tuples representing the fixed lines in the format (line_number, similarity_score, code). + removed_lines (list): A list of tuples representing the removed lines in the format (line_number, similarity_score, code). + unchanged_lines (list): A list of tuples representing the unchanged lines in the format (line_number, similarity_score, code). + total_similarity (float): The overall similarity score between the buggy code and the fixed code. 
+ """ + + code_comparison_pairs = [] + + # split the code into lines + student_lines = buggy_code.splitlines(keepends=True) + fixed_lines = fixed_code.splitlines(keepends=True) + + # strip trailing spaces but keep leading spaces + student_lines = [line.rstrip() for line in student_lines] + fixed_lines = [line.rstrip() for line in fixed_lines] + + # perform a line-based comparison + diff = list(difflib.Differ().compare(student_lines, fixed_lines)) + + # calculate similarity ratio without the starting line if provided + if default_start_code.strip() == "": + # this means the total_similarity is calculated based on the whole code and can be unexpectedly high if the starting code is long + buggy_code_no_starting = buggy_code + fixed_code_no_starting = fixed_code + else: + buggy_code_no_starting = '\n'.join([line for line in buggy_code.split('\n') if line != default_start_code]) + fixed_code_no_starting = '\n'.join([line for line in fixed_code.split('\n') if line != default_start_code]) + + total_similarity = code_similarity_score(buggy_code_no_starting, fixed_code_no_starting, language) + + # Get the line similarity pairs + line_similarity_pairs = [] + fixed_lines = [] + removed_lines = [] + unchanged_lines = [] + discarded_lines = [] + for i, line in enumerate(diff): + if line.startswith('+'): + fixed_lines.append((i, len(line[1:].strip()), line[2:])) + elif line.startswith('-'): + removed_lines.append((i, len(line[1:].strip()), line[2:])) + elif line.startswith('?'): + discarded_lines.append((i, len(line[1:].strip()), line[2:])) + else: + unchanged_lines.append((i, len(line[1:].strip()), line[2:])) + + # pair up the added and removed lines + max_len = max(len(fixed_lines), len(removed_lines)) + + for i in range(max_len): + try: + line_similarity_pairs.append((['student', removed_lines[i]], ['fixed', fixed_lines[i]])) + except IndexError: + if len(fixed_lines) > len(removed_lines): + line_similarity_pairs.append((['student', (0, '', '')], ['fixed', fixed_lines[i]])) + 
def normalize_and_compare_lines(line1, line2, line_similarity):
    """
    Decide whether two lines of code count as the same line.
    Inputs:
        line1 (str): The first line of code.
        line2 (str): The second line of code.
        line_similarity (float): The similarity score between the two lines.
    Output:
        bool: True if the similarity score is exactly 1, or if the lines have the same
        leading indentation and the same text once all whitespace is removed.
        False otherwise (an indentation-only difference is reported as different).
    """
    if line_similarity == 1:
        return True

    def _indent_and_text(line):
        # leading whitespace is kept separately so indentation differences are visible
        leading = re.match(r'^(\s*)', line).group(1)
        squeezed = re.sub(r'\s+', '', line)
        return leading, squeezed

    return _indent_and_text(line1) == _indent_and_text(line2)
# Keywords need word boundaries so that e.g. "notify" does not count as "if".
# Operators must NOT be wrapped in \b: a boundary only exists next to a word
# character, so r'\b==\b' never matched normally spaced code such as "a == b".
# The look-behind keeps the arrow in "->" from counting as a comparison.
_CONTROL_FLOW_PATTERN = re.compile(r'\b(?:if|else|elif|for|while)\b|==|!=|<|(?<!-)>')


def find_control_flow_lines(distractor_candidate_depot):
    """
    Find control flow lines in the distractor candidate depot.
    Input:
        distractor_candidate_depot (list): Tuples in the format (location, length, actual code).
    Output:
        flow_lines (list): The depot elements, in their original order, whose code contains
        a control flow keyword (if, else, elif, for, while) or a comparison operator
        (==, !=, <, >, which also covers <= and >=). Each element appears at most once.
    """
    return [
        element
        for element in distractor_candidate_depot
        if _CONTROL_FLOW_PATTERN.search(element[2])
    ]
def personalize_Parsons_block(language, problem_description, code_comparison_pairs, buggy_code, fixed_lines, removed_lines, unchanged_lines, total_similarity):
    """
    Decide which type of Parsons puzzle to generate and build the matching distractors.
    Inputs:
        language (str): The programming language of the code ('python' or 'java').
        problem_description (str): The description of the problem.
        code_comparison_pairs (list): 3-item entries (student line tuple, fixed line tuple, line similarity).
        buggy_code (str): The student's buggy code.
        fixed_lines (list): Line tuples for the fixed lines.
        removed_lines (list): Line tuples for the removed student lines; searched for distractors.
        unchanged_lines (list): Line tuples for the unchanged lines.
        total_similarity (float): The overall similarity score between the buggy code and the fixed code.
    Outputs:
        puzzle_type (str): "Full" when total_similarity is below 0.20, "Correct" when it is
            at least 0.99, "Partial" otherwise.
        distractors (dict): Empty for "Full" and "Correct"; for "Partial", maps fixed line
            tuples to distractor lines taken from the student's own code.
    """
    # too different from the fix: no distractors, the student gets the whole puzzle
    if total_similarity < 0.20:
        return "Full", {}
    if total_similarity >= 0.99:
        return "Correct", {}

    candidates = {}
    for student_entry, fixed_entry, similarity in code_comparison_pairs:
        same_line = normalize_and_compare_lines(student_entry[2], fixed_entry[2], similarity)
        if same_line is not False:
            # just another way to write the same code, nothing to distract with
            continue
        # the student line is actually wrong: reuse the student's own code as distractor
        score, student_line = find_distractor(fixed_entry[2], removed_lines, language)
        if student_line is not False:
            candidates[fixed_entry] = (score, student_line)

    # identical distractors may be paired with several fixed lines; keep only the
    # best-scoring pairing and leave the other lines as plain movable lines
    if candidates:
        return "Partial", generate_unique_distractor_dict(candidates)
    return "Partial", {}
def following_tokenize_code(code, type, language):
    """
    Tokenize the code using a tokenization mode that was already chosen for another snippet.
    Inputs:
        code (str): The code snippet to tokenize.
        type (str): The tokenization mode: "Token" (language tokenizer), "Segment"
            (whitespace split) or anything else for the whole snippet as one token.
            The name shadows the builtin but is kept because it is part of the signature.
        language (str): The programming language of the code ('python' or 'java').
    Output:
        list: The tokens, or None when the language is neither 'python' nor 'java'.

    Unlike tokenize_code, this function does not catch tokenizer errors in "Token" mode;
    they propagate to the caller.
    """
    if language not in ("python", "java"):
        return None

    if type == "Segment":
        return code.split()
    if type != "Token":
        # Return a one-element list, matching the [code] that tokenize_code produces
        # for this mode. Returning the bare string made SequenceMatcher compare a
        # list against individual characters.
        return [code]

    if language == "python":
        source = io.BytesIO(normalize_indentation(code).encode('utf-8'))
        return [token.string for token in tokenize.tokenize(source.readline)]
    return [t.value for t in javalang.tokenizer.tokenize(code)]
+ """ + + try: + tokens1, type1 = tokenize_code(code1, language) + tokens2 = following_tokenize_code(code2, type1, language) + except: + tokens2, type2 = tokenize_code(code2, language) + tokens1 = following_tokenize_code(code1, type2, language) + + # Levenshtein cons: sensitive to small changes, even a single-character edit affects the ratio significantly. + # SequenceMatcher.ratio cons: Focuses on sequential matches, so it may produce higher similarity even if there are reordered substrings. + # Choose SequenceMatcher for now, as it better captures the overall structure of the code. This can be improved later. + # Create a SequenceMatcher object + matcher = difflib.SequenceMatcher(None, tokens1, tokens2) + + similarity_ratio = matcher.ratio() + + return similarity_ratio + From 5bde9e5025b2844ab66d36605d1ec08ff030bf19 Mon Sep 17 00:00:00 2001 From: xinyinghou Date: Sun, 24 Aug 2025 19:44:08 +0000 Subject: [PATCH 02/13] Include codetailor-related modules to activecode component --- .../runestone/activecode/activecode.py | 26 +- .../runestone/activecode/css/activecode.css | 81 ++- .../runestone/activecode/js/activecode.js | 552 ++++++++++++++++++ 3 files changed, 656 insertions(+), 3 deletions(-) diff --git a/bases/rsptx/interactives/runestone/activecode/activecode.py b/bases/rsptx/interactives/runestone/activecode/activecode.py index ff3d72bf1..6beea3441 100644 --- a/bases/rsptx/interactives/runestone/activecode/activecode.py +++ b/bases/rsptx/interactives/runestone/activecode/activecode.py @@ -100,7 +100,7 @@ def setup(app): TEMPLATE_END = """