Parse LeetCode data using Airflow

Problem-solving sites such as Baekjoon and LeetCode have Chrome extensions that automatically push your code, along with its runtime and memory usage, to a specified repository whenever a submission is accepted. However, since I mainly use Safari and host my code on my own Gitea instance rather than GitHub, I couldn’t use them.

Since I don’t need real-time processing, I built a system that periodically checks for new submissions and processes them instead.

Scheme

The code has three main objectives:

  1. Store accepted solution code, with the ability to store multiple submissions per problem since solutions may differ
  2. Save each problem description as a Readme.md
  3. Maintain links to each problem directory in the repository root

I plan to implement this in the following way:

  1. Fetch all previously processed submission records from local DB
  2. Parse recent submission history from LeetCode and filter for unprocessed records
  3. Parse problem descriptions from LeetCode
  4. Push source code and problem descriptions based on submission records
  5. Record processed submissions in DB

Configuration

Two things need to be configured: the DB, and the session and token values needed for LeetCode API calls.

DB setup

For the database, I decided to use the same PostgreSQL Docker container that Airflow uses. For convenience, let’s store the DB SQLAlchemy connection URI as an Airflow variable. This allows us to access the value in code as shown below:

from airflow.models import Variable
db_url = Variable.get("DB_URL")

The table is set up as follows:

CREATE TABLE leetcode(
    submission_id INTEGER PRIMARY KEY,
    title_slug TEXT
);

Here submission_id stores the ID that LeetCode assigns to each submission, so the column is not declared as SERIAL.
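
To see what using LeetCode's own ID as the primary key buys us, here is a minimal sketch. It uses an in-memory SQLite database from the standard library as a stand-in for PostgreSQL, and the ID value and the `is_processed` helper are made up for illustration.

```python
import sqlite3

# In-memory SQLite stands in for the PostgreSQL container (assumption for the demo)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE leetcode (submission_id INTEGER PRIMARY KEY, title_slug TEXT)"
)

# The ID comes from LeetCode, so we insert it explicitly (hypothetical value)
conn.execute("INSERT INTO leetcode VALUES (?, ?)", (1234567890, "two-sum"))


def is_processed(conn, submission_id):
    """Return True if this submission ID is already recorded."""
    row = conn.execute(
        "SELECT 1 FROM leetcode WHERE submission_id = ?", (submission_id,)
    ).fetchone()
    return row is not None


# Inserting the same submission twice violates the primary key
try:
    conn.execute("INSERT INTO leetcode VALUES (?, ?)", (1234567890, "two-sum"))
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

print(is_processed(conn, 1234567890), duplicate_rejected)
```

Because the key is the submission ID itself, the database guards against recording the same submission twice.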

Leetcode session, CSRFToken

To check the submission history on LeetCode, we call endpoints under https://leetcode.com/api. These require authentication, so we need the session and token values issued to my account. Fortunately, they stay valid for a while once issued, so we can fetch them once and reuse them until they expire.

Many search results suggest ways of reading these values from cookies, but a lot of those suggestions have stopped working over time. Instead, I opened Developer Tools > Network, navigated from the submission history page to a submission detail page, and inspected that request directly. The LEETCODE_SESSION and csrftoken values can be found there. Let’s set them as default parameters of the Airflow DAG.

Submission record parser

Now, let’s check the submission history on Leetcode. Two tasks need to be performed: getting the latest submission history from Leetcode and checking if it has been processed.

Web session configuration

First, let’s get the latest submission history from Leetcode. Let’s set the session to send requests to Leetcode.

import requests
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param

@task()
def set_web_session_cookie(web_session, leetcode_session, csrftoken):
    web_session.cookies.set(
        "LEETCODE_SESSION",
        leetcode_session,
        domain="leetcode.com",
    )
    web_session.cookies.set(
        "csrftoken",
        csrftoken,
        domain="leetcode.com",
    )

    web_session.headers.update(
        {
            "X-CSRFToken": csrftoken,
            "Referer": "leetcode.com",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        }
    )

    return web_session

with DAG('...',
    params={
        "csrftoken": Param("", type="string"),
        "leetcode_session": Param("", type="string"),
        "repo_dir": Param("", type="string"),
    },
):
    web_session = requests.Session()
    web_session = set_web_session_cookie(
        web_session, "", ""
    )

DAG params are resolved only at run time, inside a task, so the cookies and headers have to be set inside a task rather than at the top level of the DAG file.
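
As a rough sketch of that idea, the helper below turns a params dictionary into the cookies and headers used above. The function name `build_session_settings` and the sample values are hypothetical. Inside a real Airflow task, the dictionary could be obtained with `get_current_context()["params"]` from `airflow.operators.python`; here a plain dict is used so the example runs on its own.

```python
def build_session_settings(params: dict) -> dict:
    """Turn DAG params into the cookies and headers for LeetCode requests."""
    leetcode_session = params.get("leetcode_session", "")
    csrftoken = params.get("csrftoken", "")

    # Fail early instead of sending unauthenticated requests
    if not leetcode_session or not csrftoken:
        raise ValueError("leetcode_session and csrftoken must both be set")

    return {
        "cookies": {
            "LEETCODE_SESSION": leetcode_session,
            "csrftoken": csrftoken,
        },
        "headers": {"X-CSRFToken": csrftoken},
    }


# Hypothetical values, for illustration only
settings = build_session_settings({"leetcode_session": "abc", "csrftoken": "xyz"})
print(settings["headers"]["X-CSRFToken"])
```

Checking for empty values up front makes an expired or missing token show up as a clear error in the task log.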

Parse submission record

Now, let’s use that session to get data from Leetcode.

import time
from dataclasses import dataclass

@dataclass
class Submission:
    id: int
    lang: str
    timestamp: int
    runtime: str
    memuse: str
    code: str
    title_slug: str

    def __init__(self, data):
        self.id = data["id"]
        self.lang = data["lang"]
        self.timestamp = data["timestamp"]
        self.runtime = data["runtime"]
        self.memuse = data["memory"]
        self.code = data["code"]
        self.title_slug = data["title_slug"]

@task()
def get_acsub(session, max_retries: int = 3):
    # The response may lack "submissions_dump"; retry a bounded number of
    # times instead of recursing without limit.
    for _ in range(max_retries):
        response = session.get("https://leetcode.com/api/submissions")
        content = response.json().get("submissions_dump")
        if content is not None:
            # Keep only accepted submissions
            return [
                Submission(sub)
                for sub in content
                if sub["status_display"] == "Accepted"
            ]
        time.sleep(1)

    raise RuntimeError("submissions_dump missing from LeetCode response")


# In dag block:
    list_sub = get_acsub(web_session)

For readability, I defined a dataclass whose constructor converts a submission record JSON object directly into a Submission instance.
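
One possible variation, sketched below, keeps the `__init__` that `@dataclass` generates and moves the JSON conversion into a classmethod. The name `from_api` and the sample record are my own illustration, not part of the LeetCode API; only the key names are taken from the code above.

```python
from dataclasses import dataclass


@dataclass
class Submission:
    id: int
    lang: str
    timestamp: int
    runtime: str
    memuse: str
    code: str
    title_slug: str

    @classmethod
    def from_api(cls, data: dict) -> "Submission":
        """Build a Submission from one entry of the submissions list."""
        return cls(
            id=data["id"],
            lang=data["lang"],
            timestamp=data["timestamp"],
            runtime=data["runtime"],
            memuse=data["memory"],  # the API key is "memory", the field is memuse
            code=data["code"],
            title_slug=data["title_slug"],
        )


# Hypothetical record, shaped like the fields used above
sample = {
    "id": 101,
    "lang": "python3",
    "timestamp": 1700000000,
    "runtime": "40 ms",
    "memory": "16.5 MB",
    "code": "class Solution: ...",
    "title_slug": "two-sum",
}
sub = Submission.from_api(sample)
print(sub.title_slug, sub.memuse)
```

With this form, the generated constructor and equality check stay available, which is handy when comparing records in tests.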

Get already processed record from DB

Now, let’s get the data already processed from the DB.

from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class LeetCodeTable(Base):
    __tablename__ = "leetcode"

    submission_id = Column(Integer, primary_key=True)
    title_slug = Column(String)

@task()
def get_all_sub(db_session):
    # select() takes the columns as separate arguments (SQLAlchemy 1.4+)
    query = select(LeetCodeTable.submission_id, LeetCodeTable.title_slug)

    # Execute the query and return the rows as plain tuples
    existing_records = [tuple(row) for row in db_session.execute(query).fetchall()]

    return existing_records

# In dag block:
    db_url = Variable.get("DB_URL")
    engine = create_engine(db_url)
    PostgresSession = sessionmaker(bind=engine)
    db_session = PostgresSession()
    all_subs = get_all_sub(db_session)

Remove already processed

Now, let’s filter the fetched submissions down to those that are not yet recorded in the DB.

@task()
def check_already_copied(list_submission: list[Submission], all_subs: list):
    # Collect the already processed submission IDs in a set for fast lookup
    existing_set = {record[0] for record in all_subs}

    # Keep only the submissions that have not been processed yet
    results = []
    if list_submission:
        for submission in list_submission:
            if submission.id not in existing_set:
                results.append(submission)

    return results

# In dag block:
    list_new_sub = check_already_copied(list_sub, all_subs)
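
The filtering step boils down to a set lookup. Here is a small stand-alone sketch with made-up IDs; `filter_new` is a hypothetical helper that works on bare IDs instead of Submission objects.

```python
def filter_new(submission_ids, processed_rows):
    """Return the IDs that do not appear in the (id, title_slug) rows."""
    processed = {row[0] for row in processed_rows}
    return [sid for sid in submission_ids if sid not in processed]


# Rows as they would come back from the DB (hypothetical data)
processed_rows = [(101, "two-sum"), (102, "add-two-numbers")]
# IDs fetched from the submission history (hypothetical data)
fetched_ids = [103, 102, 101, 104]

print(filter_new(fetched_ids, processed_rows))  # [103, 104]
```

The order of the fetched submissions is preserved, so they are committed in the order the API returned them.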

Problem description parser

Now that we have separated the new submission records, we need to parse and save the description of the corresponding problem. We parse the description from the submission detail page based on the submission_id in the submission record.

from html import unescape

from bs4 import BeautifulSoup as bs

# Get description for new problems
def get_description(session, submission_id: int) -> str:
    response = session.get(f"https://leetcode.com/submissions/detail/{submission_id}")

    # The description is taken from <meta name="description" content="...">
    soup = bs(response.text, "html.parser")
    for tag in soup.find_all("meta"):
        if tag.get("name") == "description":
            return unescape(tag.get("content") or "")

    return ""
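
To show the parsing step in isolation, here is a minimal sketch that pulls the same meta tag out of a page using only the standard library's `html.parser` as a stand-in for BeautifulSoup. The class name, the helper name, and the sample HTML are all made up for illustration.

```python
from html.parser import HTMLParser


class MetaDescriptionParser(HTMLParser):
    """Remember the content of the first <meta name="description"> tag."""

    def __init__(self):
        super().__init__()
        self.description = ""

    def handle_starttag(self, tag, attrs):
        attr = dict(attrs)
        is_description = tag == "meta" and attr.get("name") == "description"
        if is_description and not self.description:
            # HTMLParser has already decoded character references here
            self.description = attr.get("content") or ""


def extract_description(page_html: str) -> str:
    parser = MetaDescriptionParser()
    parser.feed(page_html)
    return parser.description


# Hypothetical page, much smaller than a real submission detail page
sample_html = (
    '<html><head><meta charset="utf-8">'
    '<meta name="description" content="Given an array &amp; a target, return indices.">'
    "</head><body></body></html>"
)
print(extract_description(sample_html))
```

If no matching tag is found, the helper returns an empty string, mirroring the fallback in `get_description`.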

Git push

Now, let’s commit and push the parsed code and description.

import os

from git import Remote, Repo

# Git clone
@task()
def git_clone(url: str, repo_dir: str):
    # Clone on the first run, otherwise pull the latest changes
    if not os.path.exists(repo_dir):
        Repo.clone_from(url, repo_dir)
    else:
        repo = Repo(repo_dir)
        remote = Remote(repo, "origin")
        remote.pull()

# Commit my submission
def commit_problem_description(title_slug, description, repo_dir):
    prob_dir = os.path.join(repo_dir, "leetcode", title_slug)
    if os.path.exists(prob_dir):
        return

    os.makedirs(prob_dir)
    with open(os.path.join(prob_dir, "Readme.md"), "w") as f:
        f.write(description)

    repo = Repo(repo_dir)
    repo.index.add([os.path.join("leetcode", title_slug, "Readme.md")])
    repo.index.commit(f"Problem description for {title_slug}")

# Commit my solution
def commit_problem_solution(submission, repo_dir):
    prob_dir = os.path.join(repo_dir, "leetcode", submission.title_slug)
    # `extension` maps a LeetCode language name to a file extension (not shown here)
    file_name = f"solution_{submission.id}.{extension[submission.lang]}"

    file_path = os.path.join(prob_dir, file_name)
    if not os.path.exists(file_path):
        with open(file_path, "w") as f:
            f.write(submission.code)

        repo = Repo(repo_dir)
        repo.index.add([os.path.join("leetcode", submission.title_slug, file_name)])
        repo.index.commit(
            f"Update solution for {submission.title_slug} w/ id {submission.id}\nTime: {submission.runtime}\nMemUsage: {submission.memuse}"
        )



@task()
def sub_to_repo(session, list_submission: list[Submission], repo_dir):
    for sub in list_submission:
        desc = get_description(session, sub.id)
        commit_problem_description(sub.title_slug, desc, repo_dir)
        commit_problem_solution(sub, repo_dir)

@task()
def get_leetcode_root_readme(list_new_sub, all_sub, repo_url):
    distinct_probs = set()
    for sub in list_new_sub:
        distinct_probs.add(sub.title_slug)

    for sub in all_sub:
        distinct_probs.add(sub[1])

    distinct_probs = sorted(list(distinct_probs))

    markdown = ["## LeetCode"]
    repo_root = repo_url.replace("CodeTest.git", "CodeTest") + "/src/branch/main"
    for title_slug in distinct_probs:
        # Build the URL with "/" explicitly; os.path.join is meant for file paths
        url = f"{repo_root}/leetcode/{title_slug}"
        markdown.append(f"- [{title_slug}]({url})")

    return "\n".join(markdown)


@task()
def write_root_readme(leetcode_markdown, repo_dir):
    root_readme_path = os.path.join(repo_dir, "Readme.md")
    with open(root_readme_path, "w") as f:
        f.write("# CodeTest\n\nSolved problem code in LeetCode, Baekjoon.\n")
        f.write(leetcode_markdown)
        f.write("\n\n")

    repo = Repo(repo_dir)
    repo.index.add(["Readme.md"])
    repo.index.commit("Update Root Readme.md")


# Git push
@task()
def git_push(repo_dir):
    repo = Repo(repo_dir)
    remote = Remote(repo, name="origin")
    remote.push()

# dag
    code_test_url = Variable.get("CODE_REPO_URL")
    task_clone = git_clone(code_test_url, "")
    task_write = sub_to_repo(web_session, list_new_sub, "")
    task_push = git_push("")

    leetcode_markdown = get_leetcode_root_readme(list_new_sub, all_subs, code_test_url)
    task_root = write_root_readme(leetcode_markdown, "")

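    # write_root_readme also commits to the repo, so run it after the clone and
    # after sub_to_repo so that two tasks never touch the git index at once
    task_clone >> task_write >> task_root >> task_push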
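
The `commit_problem_solution` function above looks up `extension[submission.lang]`, but the mapping itself does not appear in the snippets. A minimal sketch of what it could look like follows. The language keys are my assumption of the names LeetCode reports, and the `txt` fallback and the `solution_file_name` helper are illustrative choices.

```python
# Hypothetical mapping from LeetCode language names to file extensions.
# The keys are assumptions; adjust them to the values seen in real records.
EXTENSION = {
    "python": "py",
    "python3": "py",
    "cpp": "cpp",
    "c": "c",
    "java": "java",
    "golang": "go",
    "rust": "rs",
    "javascript": "js",
    "typescript": "ts",
}


def solution_file_name(submission_id: int, lang: str) -> str:
    """Build the file name used for one solution file."""
    # Fall back to .txt so an unknown language does not raise KeyError
    ext = EXTENSION.get(lang, "txt")
    return f"solution_{submission_id}.{ext}"


print(solution_file_name(101, "python3"))
print(solution_file_name(102, "some-new-language"))
```

Including the submission ID in the file name is what allows several accepted solutions to sit side by side in one problem directory.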

Add submission record to DB

Now that we have finished pushing, let’s add the processed results to the DB.

# Add record to db
@task()
def submission_to_db(db_session, list_submission: list[Submission]):
    for sub in list_submission:
        record = LeetCodeTable(submission_id=sub.id, title_slug=sub.title_slug)
        db_session.add(record)

    db_session.commit()

# dag
    sub_to_db = submission_to_db(db_session, list_new_sub)
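
If the DAG is retried after the push but before the DB write finishes, the same submissions may be recorded twice. One way to make this step safe to re-run is an insert that skips existing IDs. The sketch below uses in-memory SQLite as a stand-in for PostgreSQL; both accept `ON CONFLICT ... DO NOTHING`, although this is not what the ORM code above does. The `record_submissions` helper and the data are hypothetical.

```python
import sqlite3


def record_submissions(conn, submissions):
    """Insert (submission_id, title_slug) pairs, skipping IDs already stored."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.executemany(
            "INSERT INTO leetcode (submission_id, title_slug) VALUES (?, ?) "
            "ON CONFLICT(submission_id) DO NOTHING",
            submissions,
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE leetcode (submission_id INTEGER PRIMARY KEY, title_slug TEXT)"
)

batch = [(101, "two-sum"), (102, "add-two-numbers")]
record_submissions(conn, batch)
record_submissions(conn, batch)  # re-running the same batch changes nothing

count = conn.execute("SELECT COUNT(*) FROM leetcode").fetchone()[0]
print(count)  # 2
```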

Whole DAG result

Putting everything together, the complete DAG looks like this.

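# Imports not shown in the earlier snippets
from datetime import datetime, timedelta

from pendulum.tz.timezone import Timezone

with DAG(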
    dag_id="code_solution_parser",
    description="Parses solutions submitted to LeetCode",
    default_args={
        "owner": "",
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    },
    params={
        "csrftoken": Param(
            "",
            type="string",
        ),
        "leetcode_session": Param(
            "",
            type="string",
        ),
        "repo_dir": Param("", type="string"),
    },
    start_date=datetime(2024, 1, 1, tzinfo=Timezone("Asia/Seoul")),
    schedule_interval="@weekly",
    tags=["programming"],
) as dag:
    db_url = os.path.join(Variable.get("DB_URL"), "codesite")
    engine = create_engine(db_url)
    PostgresSession = sessionmaker(bind=engine)
    db_session = PostgresSession()

    web_session = requests.Session()

    web_session = set_web_session_cookie(
        web_session, "", ""
    )

    list_sub = get_acsub(web_session)
    all_subs = get_all_sub(db_session)

    list_new_sub = check_already_copied(list_sub, all_subs)

    code_test_url = Variable.get("CODE_REPO_URL")
    task_clone = git_clone(code_test_url, "")
    task_write = sub_to_repo(web_session, list_new_sub, "")
    task_push = git_push("")
    sub_to_db = submission_to_db(db_session, list_new_sub)

    leetcode_markdown = get_leetcode_root_readme(list_new_sub, all_subs, code_test_url)
    task_root = write_root_readme(leetcode_markdown, "")

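    # write_root_readme also commits to the repo, so run it after the clone and
    # after sub_to_repo so that two tasks never touch the git index at once
    task_clone >> task_write >> task_root >> task_push >> sub_to_db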