Parse Baekjoon data using Airflow

Updated:

지난번에는 Leetcode에서 맞춘 문제의 설명과 내 코드를 개인 repository에 자동으로 정리해주는 airflow 코드를 공유했다. 이번에는 한국인들이 많이 활용하는 Baekjoon 문제 풀이를 정리해주는 코드를 공유한다. 지난번 코드와 달라져야하는 지점은

  1. url 구조가 다르다. 문제 source code 페이지 url은 제출번호를 이용하는 것으로 비슷하지만, 문제 설명 페이지는 문제 이름이 아닌 문제 번호를 이용하고 있다.
    https://www.acmicpc.net/source/{submission_id}
    https://www.acmicpc.net/problem/{problem_id}
    
  2. 필요한 Cookie 정보가 다르다. 당연하게도 Leetcode의 인증정보를 담고 있는 cookie와 Baekjoon 사이트의 인증정보 cookie가 다르다. Baekjoon페이지로 보내는 request에서 아래 cookie 정보를 확인하자.
    bojautologin
    FCNEC
    OnlineJudge
    
  3. Submission을 읽어오는 api가 따로 없다. 정확히는 내가 그걸 찾아보지 않고 Submission을 확인할 수 있는 페이지에서 직접 데이터를 파싱하기로 했다. 그 주소는 아래와 같다.
    https://www.acmicpc.net/status?user_id={user_id}&result_id=4&top={last_subid}
    

    여기서 result_id는 제출의 결과를 나타내며, 4는 정답 코드를 의미한다. 즉 맞춘 제출들만 리스트업해주는 페이지다.

그 결과 이번 DAG는 아래와 같은 수순으로 이뤄진다.

  1. Local DB에서 기존에 처리한 제출 기록을 모두 가져옴.
  2. Baekjoon의 최근 제출 기록 중 Local DB에 없는 데이터가 나올 때까지 파싱
  3. Baekjoon 페이지에서 Problem description과 제출 코드 parsing
  4. 제출 기록을 토대로 source code와 problem description push
  5. 처리한 제출 기록을 DB에 기록.

아래에는 세부적인 차이들을 기록해본다.

DB setup

Root Readme에 적힐 링크나, 문제 디렉토리들은 문제 제목을 활용하는 것이 나중에 보기가 편할 것 같다. Leetcode는 애초에 문제 id를 제목으로 만들어두어서 상관이 없었으나 Baekjoon은 문제 번호와 제목을 따로 관리하고 있어 매번 새로 parsing하지 않도록 이 부분 역시 DB에 저장해둘 필요가 있었다.

CREATE TABLE leetcodee(
    submission_id INTEGER PRIMARY KEY,
    problem_id INTEGER,
    problem_name TEXT
);

class BaekjoonTable(Base):
    __tablename__ = "baekjoon"

    submission_id = Column(Integer, primary_key=True)
    problem_id = Column(Integer)
    problem_name = Column(String)

Parsing Submission record

Leetcode와는 다르게 Baekjoon 사이트에는 내 제출기록이 꽤 많았다. 물론 한 번 파싱한 후에는 웬만해서는 쌓일 일이 없겠지만, 적어도 몇 페이지 정도의 기록은 한 번에 처리할 수 있어야 했다.

@task()
def get_acsub(session, user_id, all_sub):
    id_in_db = set([sub[0] for sub in all_sub])
    try:
        meet_end = False
        response = session.get(
            f"https://www.acmicpc.net/status?user_id={user_id}&result_id=4"
        )
        soup = bs(response.text, "html.parser")
        content = soup.find_all("tr", id=lambda x: x and x.startswith("solution-"))

        result = []
        for data in content:
            sub = Submission(data.find_all("td"))
            if sub.id in id_in_db:
                meet_end = True
                break
            result.append(sub)

        while not meet_end:
            last_subid = result[-1].id
            response = session.get(
                f"https://www.acmicpc.net/status?user_id={user_id}&result_id=4&top={last_subid}"
            )
            soup = bs(response.text, "html.parser")
            content = soup.find_all("tr", id=lambda x: x and x.startswith("solution-"))

            if len(content) == 1:
                meet_end = True
                break

            for data in content[1:]:
                sub = Submission(data.find_all("td"))
                if sub.id in id_in_db:
                    meet_end = True
                    break
                result.append(sub)

        return result
    except KeyError:
        time.sleep(1)
        return get_acsub(session, all_sub)

Parse code

지난번에는 code를 api에서 바로 가져올 수 있었지만 이번에는 특정 코드에서 parsing할 필요가 있었다.

def get_code(session, submission_id: int) -> str:
    response = session.get(f"https://www.acmicpc.net/source/{submission_id}")
    soup = bs(response.text, "html.parser")
    code = soup.find("textarea", {"name": "source"}).text

    return code

Parse problem description

제출한 문제의 설명을 긁어오는 코드도 변화가 있었다. 무엇보다 html에서 markdown으로 변화시키기 위해 함수를 만들어야했다.

def html_to_markdown(html_content) -> str:
    soup = bs(html_content, "html.parser")
    markdown = []

    # Problem title
    title = soup.find("span", id="problem_title")
    if title:
        markdown.append(f"# {title.text.strip()}\n")

    # Problem info table
    info_table = soup.find("table", id="problem-info")
    if info_table:
        headers = info_table.find_all("th")
        values = info_table.find_all("td")
        for header, value in zip(headers, values):
            markdown.append(f"**{header.text.strip()}:** {value.text.strip()}")
        markdown.append("")

    # Problem sections
    sections = ["description", "input", "output", "hint", "source"]
    for section in sections:
        section_div = soup.find("section", id=section)
        if section_div:
            headline = section_div.find("div", class_="headline")
            if headline:
                markdown.append(f"## {headline.text.strip()}")
            content = section_div.find("div", class_="problem-text")
            if content:
                markdown.append(content.get_text(strip=True, separator="\n"))
            markdown.append("")

    # Sample input/output
    samples = soup.find_all("section", id=re.compile(r"sampleinput|sampleoutput"))
    for sample in samples:
        headline = sample.find("div", class_="headline")
        if headline:
            markdown.append(f"## {headline.h2.text.strip()}")
        pre = sample.find("pre", class_="sampledata")
        if pre:
            markdown.append("```")
            markdown.append(pre.text.strip())
            markdown.append("```")
        markdown.append("")

    # Problem tags
    tags_section = soup.find("section", id="problem_tags")
    if tags_section:
        markdown.append("## 알고리즘 분류")
        tags = tags_section.find_all("a", class_="spoiler-link")
        for tag in tags:
            markdown.append(f"- {tag.text.strip()}")
        markdown.append("")

    # Language restrictions
    lang_restrict = soup.find("section", id="language_restrict")
    if lang_restrict:
        markdown.append("## 제출할 수 없는 언어")
        markdown.append(lang_restrict.find("div", class_="problem-text").text.strip())
        markdown.append("")

    # Time and memory limits
    for limit_type in ["time", "memory"]:
        limit_section = soup.find("section", id=f"problem-{limit_type}-limit")
        if limit_section:
            markdown.append(f"## {limit_section.h2.text.strip()}")
            limits = limit_section.find_all("li")
            for limit in limits:
                markdown.append(f"- {limit.text.strip()}")
            markdown.append("")

    return "\n".join(markdown)

def get_description(session, problem_id: int) -> str:
    response = session.get(f"https://www.acmicpc.net/problem/{problem_id}")
    return html_to_markdown(response.text)

Root Readme

이제는 Root directory의 Readme에 LeetCode와 Baekjoon 결과가 모두 담겨져있다. 새로 추가되는 데이터 뿐만 아니라 기존 데이터도 잘 보존해야하므로 어떤 사이트 데이터인지 확인해 다른 사이트 데이터는 그대로 보존하도록 코드를 수정했다.

@task()
def write_root_readme(baekjoon_markdown, repo_dir):
    root_readme_path = os.path.join(repo_dir, "Readme.md")
    new_readme_path = os.path.join(repo_dir, "New_Readme.md")

    with open(root_readme_path, "r") as f, open(new_readme_path, "w") as g:
        is_baekjoon = False
        meet_baekjoon = False
        for line in f:
            if line.startswith("#"):
                if line.lstrip("#").strip() == "Baekjoon":
                    is_baekjoon = True
                    meet_baekjoon = True
                    g.write(baekjoon_markdown)
                    g.write("\n\n")
                else:
                    is_baekjoon = False

            if is_baekjoon:
                continue

            g.write(line)

        if not meet_baekjoon:
            g.write(baekjoon_markdown)
            g.write("\n\n")

    os.rename(new_readme_path, root_readme_path)

    repo = Repo(repo_dir)
    repo.index.add(["Readme.md"])
    repo.index.commit("Update Root Readme.md")