Parse Baekjoon data using Airflow
지난번에는 Leetcode에서 맞춘 문제의 설명과 내 코드를 개인 repository에 자동으로 정리해주는 airflow 코드를 공유했다. 이번에는 한국인들이 많이 활용하는 Baekjoon 문제 풀이를 정리해주는 코드를 공유한다. 지난번 코드와 달라져야하는 지점은
- url 구조가 다르다.
문제 source code 페이지 url은 제출번호를 이용하는 것으로 비슷하지만, 문제 설명 페이지는 문제 이름이 아닌 문제 번호를 이용하고 있다.
https://www.acmicpc.net/source/{submission_id} https://www.acmicpc.net/problem/{problem_id} - 필요한 Cookie 정보가 다르다.
당연하게도 Leetcode의 인증정보를 담고 있는 cookie와 Baekjoon 사이트의 인증정보 cookie가 다르다. Baekjoon페이지로 보내는 request에서 아래 cookie 정보를 확인하자.
bojautologin FCNEC OnlineJudge - Submission을 읽어오는 api가 따로 없다.
정확히는 내가 그걸 찾아보지 않고 Submission을 확인할 수 있는 페이지에서 직접 데이터를 파싱하기로 했다.
그 주소는 아래와 같다.
https://www.acmicpc.net/status?user_id={user_id}&result_id=4&top={last_subid}여기서
result_id는 제출의 결과를 나타내며, 4는 정답 코드를 의미한다. 즉 맞춘 제출들만 리스트업해주는 페이지다.
그 결과 이번 DAG는 아래와 같은 수순으로 이뤄진다.
- Local DB에서 기존에 처리한 제출 기록을 모두 가져옴.
- Baekjoon의 최근 제출 기록 중 Local DB에 없는 데이터가 나올 때까지 파싱
- Baekjoon 페이지에서 Problem description과 제출 코드 parsing
- 제출 기록을 토대로 source code와 problem description push
- 처리한 제출 기록을 DB에 기록.
아래에는 세부적인 차이들을 기록해본다.
DB setup
Root Readme에 적힐 링크나, 문제 디렉토리들은 문제 제목을 활용하는 것이 나중에 보기가 편할 것 같다. Leetcode는 애초에 문제 id를 제목으로 만들어두어서 상관이 없었으나 Baekjoon은 문제 번호와 제목을 따로 관리하고 있어 매번 새로 parsing하지 않도록 이 부분 역시 DB에 저장해둘 필요가 있었다.
CREATE TABLE leetcodee(
submission_id INTEGER PRIMARY KEY,
problem_id INTEGER,
problem_name TEXT
);
class BaekjoonTable(Base):
__tablename__ = "baekjoon"
submission_id = Column(Integer, primary_key=True)
problem_id = Column(Integer)
problem_name = Column(String)
Parsing Submission record
Leetcode와는 다르게 Baekjoon 사이트에는 내 제출기록이 꽤 많았다. 물론 한 번 파싱한 후에는 웬만해서는 쌓일 일이 없겠지만, 적어도 몇 페이지 정도의 기록은 한 번에 처리할 수 있어야 했다.
@task()
def get_acsub(session, user_id, all_sub):
id_in_db = set([sub[0] for sub in all_sub])
try:
meet_end = False
response = session.get(
f"https://www.acmicpc.net/status?user_id={user_id}&result_id=4"
)
soup = bs(response.text, "html.parser")
content = soup.find_all("tr", id=lambda x: x and x.startswith("solution-"))
result = []
for data in content:
sub = Submission(data.find_all("td"))
if sub.id in id_in_db:
meet_end = True
break
result.append(sub)
while not meet_end:
last_subid = result[-1].id
response = session.get(
f"https://www.acmicpc.net/status?user_id={user_id}&result_id=4&top={last_subid}"
)
soup = bs(response.text, "html.parser")
content = soup.find_all("tr", id=lambda x: x and x.startswith("solution-"))
if len(content) == 1:
meet_end = True
break
for data in content[1:]:
sub = Submission(data.find_all("td"))
if sub.id in id_in_db:
meet_end = True
break
result.append(sub)
return result
except KeyError:
time.sleep(1)
return get_acsub(session, all_sub)
Parse code
지난번에는 code를 api에서 바로 가져올 수 있었지만 이번에는 특정 코드에서 parsing할 필요가 있었다.
def get_code(session, submission_id: int) -> str:
response = session.get(f"https://www.acmicpc.net/source/{submission_id}")
soup = bs(response.text, "html.parser")
code = soup.find("textarea", {"name": "source"}).text
return code
Parse problem description
제출한 문제의 설명을 긁어오는 코드도 변화가 있었다. 무엇보다 html에서 markdown으로 변화시키기 위해 함수를 만들어야했다.
def html_to_markdown(html_content) -> str:
soup = bs(html_content, "html.parser")
markdown = []
# Problem title
title = soup.find("span", id="problem_title")
if title:
markdown.append(f"# {title.text.strip()}\n")
# Problem info table
info_table = soup.find("table", id="problem-info")
if info_table:
headers = info_table.find_all("th")
values = info_table.find_all("td")
for header, value in zip(headers, values):
markdown.append(f"**{header.text.strip()}:** {value.text.strip()}")
markdown.append("")
# Problem sections
sections = ["description", "input", "output", "hint", "source"]
for section in sections:
section_div = soup.find("section", id=section)
if section_div:
headline = section_div.find("div", class_="headline")
if headline:
markdown.append(f"## {headline.text.strip()}")
content = section_div.find("div", class_="problem-text")
if content:
markdown.append(content.get_text(strip=True, separator="\n"))
markdown.append("")
# Sample input/output
samples = soup.find_all("section", id=re.compile(r"sampleinput|sampleoutput"))
for sample in samples:
headline = sample.find("div", class_="headline")
if headline:
markdown.append(f"## {headline.h2.text.strip()}")
pre = sample.find("pre", class_="sampledata")
if pre:
markdown.append("```")
markdown.append(pre.text.strip())
markdown.append("```")
markdown.append("")
# Problem tags
tags_section = soup.find("section", id="problem_tags")
if tags_section:
markdown.append("## 알고리즘 분류")
tags = tags_section.find_all("a", class_="spoiler-link")
for tag in tags:
markdown.append(f"- {tag.text.strip()}")
markdown.append("")
# Language restrictions
lang_restrict = soup.find("section", id="language_restrict")
if lang_restrict:
markdown.append("## 제출할 수 없는 언어")
markdown.append(lang_restrict.find("div", class_="problem-text").text.strip())
markdown.append("")
# Time and memory limits
for limit_type in ["time", "memory"]:
limit_section = soup.find("section", id=f"problem-{limit_type}-limit")
if limit_section:
markdown.append(f"## {limit_section.h2.text.strip()}")
limits = limit_section.find_all("li")
for limit in limits:
markdown.append(f"- {limit.text.strip()}")
markdown.append("")
return "\n".join(markdown)
def get_description(session, problem_id: int) -> str:
response = session.get(f"https://www.acmicpc.net/problem/{problem_id}")
return html_to_markdown(response.text)
Root Readme
이제는 Root directory의 Readme에 LeetCode와 Baekjoon 결과가 모두 담겨져있다. 새로 추가되는 데이터 뿐만 아니라 기존 데이터도 잘 보존해야하므로 어떤 사이트 데이터인지 확인해 다른 사이트 데이터는 그대로 보존하도록 코드를 수정했다.
@task()
def write_root_readme(baekjoon_markdown, repo_dir):
root_readme_path = os.path.join(repo_dir, "Readme.md")
new_readme_path = os.path.join(repo_dir, "New_Readme.md")
with open(root_readme_path, "r") as f, open(new_readme_path, "w") as g:
is_baekjoon = False
meet_baekjoon = False
for line in f:
if line.startswith("#"):
if line.lstrip("#").strip() == "Baekjoon":
is_baekjoon = True
meet_baekjoon = True
g.write(baekjoon_markdown)
g.write("\n\n")
else:
is_baekjoon = False
if is_baekjoon:
continue
g.write(line)
if not meet_baekjoon:
g.write(baekjoon_markdown)
g.write("\n\n")
os.rename(new_readme_path, root_readme_path)
repo = Repo(repo_dir)
repo.index.add(["Readme.md"])
repo.index.commit("Update Root Readme.md")