How to write your own small parser in Python?
Hi all. I think some of you have wondered how to write your own small parser in Python. In this article I want to look at a fairly simple case: collecting data from a ready-made source, stackexchange.com. Later on you will be able to move on to more complex tasks. Creating a parser can be divided into two stages: preparation and coding.
Stage I. Preparation
First, we need to create a folder for our parser. It will store our code and the file with the parsed data. Personally, I will use Visual Studio Code, but any editor will do.
Add the created folder to your editor's workspace and create a file in it (for example, parser.py).
Open a terminal and check that the correct interpreter is selected and that you are in your working folder (by default this is usually the case, but it's worth checking). Then install the libraries with pip:
pip install requests
pip install tqdm
It is important that these libraries are installed in your virtual environment (venv).
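If you don't have a virtual environment yet, here is a minimal sketch of creating and activating one (Linux/macOS; on Windows, run venv\Scripts\activate instead):
python -m venv venv
source venv/bin/activate
pip install requests tqdm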
The json and time libraries are built into Python, so there is no need to install them.
This completes the first stage.
Stage II. Code
Open the file and start coding. First we need to import the libraries:
import requests
import time
import json
from tqdm import tqdm
Here, requests sends HTTP requests to the URL we specify, time is used to format timestamps, and json is for working with .json files. The tqdm library is there purely for convenience and looks: it shows a progress bar and the approximate time remaining.
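A minimal sketch of how tqdm works (not part of the parser, just a demonstration):
import time
from tqdm import tqdm

# tqdm wraps any iterable and draws a live progress bar over it
for _ in tqdm(range(50), desc="Demo"):
    time.sleep(0.05)  # simulate work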
Next, we define the URLs of the API endpoints from which we will collect information:
quest_url = "https://api.stackexchange.com/2.3/questions"
ans_url = "https://api.stackexchange.com/2.3/questions/{question_id}/answers"
Here quest_url points to the endpoint that stores user questions, and ans_url to the endpoint that stores the answers to them.
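For example, substituting a hypothetical question ID shows what the final answers URL looks like:
print(ans_url.format(question_id=11227809))  # illustrative question ID
# https://api.stackexchange.com/2.3/questions/11227809/answers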
Let's write our main function, fetch_stackexchange_questions, which will fetch the questions:
def fetch_stackexchange_questions(site="stackoverflow", page=1, pagesize=100):
    params = {
        "order": "desc",
        "sort": "activity",
        "site": site,
        "pagesize": pagesize,
        "page": page,
        "filter": "withbody"  # include the question body in the response
    }
    response = requests.get(quest_url, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Request failed: {response.status_code}")
        return None
In params we store the request parameters:
site – a site from the StackExchange network (for example, "stackoverflow").
page – the page number (default 1).
pagesize – the number of questions per page (the default is 100, and it's better to stick with it: if you increase the number, you can run into error 400. In short, 100 is better).
filter – our filter (the withbody value lets us capture the body of the question. Yes, we may grab information we don't need, but it's better to take too much and then use what we need than to miss something).
Next, a GET request is executed, and if the response status is successful (code 200), the data is returned in JSON format. Otherwise, an error message with the status code is printed.
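A small usage sketch (the "items" and "has_more" keys are part of the standard Stack Exchange API response wrapper):
data = fetch_stackexchange_questions(site="stackoverflow", page=1, pagesize=10)
if data:
    print(len(data["items"]))  # how many questions were received
    print(data["has_more"])    # True if more pages are available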
Let's write a function, fetch_answers_for_question, which gets the list of answers for a specific question by its ID:
def fetch_answers_for_question(question_id, site="stackoverflow"):
    params = {
        "order": "desc",
        "sort": "activity",
        "site": site,
        "filter": "withbody"  # include the answer body in the response
    }
    response = requests.get(ans_url.format(question_id=question_id), params=params)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Request for question {question_id} failed: {response.status_code}")
        return None
A similar GET request is made to the API, with question_id substituted into the URL. The answers are returned in JSON format.
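Usage looks the same; the question ID below is purely illustrative:
answers = fetch_answers_for_question(11227809)  # illustrative question ID
if answers:
    for answer in answers["items"]:
        print(answer["answer_id"], answer["score"])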
Let's write a function, parse_questions_with_answers, which processes the questions and attaches the corresponding answers to them:
def parse_questions_with_answers(data, site="stackoverflow"):
    parsed_data = []
    if "items" in data:
        for item in tqdm(data["items"], desc="Parsing questions and answers"):
            question = {
                "question_id": item.get("question_id"),
                "title": item.get("title"),
                "body": item.get("body"),
                "tags": item.get("tags"),
                "link": item.get("link"),
                # creation_date is a Unix timestamp; convert it to a readable string
                "creation_date": time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(item.get("creation_date"))),
                "score": item.get("score"),
                "answers": []
            }
            # a separate request for the answers to this question
            answers_data = fetch_answers_for_question(item.get("question_id"), site)
            if answers_data and "items" in answers_data:
                for answer in answers_data["items"]:
                    answer_info = {
                        "answer_id": answer.get("answer_id"),
                        "body": answer.get("body"),
                        "creation_date": time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(answer.get("creation_date"))),
                        "score": answer.get("score"),
                        "is_accepted": answer.get("is_accepted")
                    }
                    question["answers"].append(answer_info)
            parsed_data.append(question)
    return parsed_data
It goes through the list of questions and, for each one, collects metadata such as the title, body, tags, link, creation date, and score. Then, for each question, an additional request is sent to obtain the answers, which are attached to the question. Progress is displayed using tqdm.
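The API returns creation_date as a Unix timestamp (seconds since 1970-01-01 UTC), which is why the code converts it with time.gmtime and time.strftime. A quick sketch of the conversion:
import time
ts = 1700000000  # an arbitrary Unix timestamp
print(time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(ts)))  # 2023-11-14 22:13:20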
Let's write a function, save_to_json, which saves the received and processed data to a JSON file. If the file stackexchange_data_with_answers.json does not yet exist in the directory, it will be created automatically.
def save_to_json(data, filename="stackexchange_data_with_answers.json"):
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)
    print(f"Data saved to {filename}")
Let's write the main logic of the program:
if __name__ == "__main__":
    site = "stackoverflow"
    page = 1
    pagesize = 100
    questions_data = fetch_stackexchange_questions(site=site, page=page, pagesize=pagesize)
    if questions_data:
        parsed_questions = parse_questions_with_answers(questions_data, site)
        save_to_json(parsed_questions, f"{site}_questions_with_answers.json")
In the if __name__ == "__main__" block, the steps run in order: the questions are fetched with fetch_stackexchange_questions, the questions and their answers are parsed with parse_questions_with_answers, and the data is saved to a JSON file with save_to_json.
Full code:
import requests
import time
import json
from tqdm import tqdm

quest_url = "https://api.stackexchange.com/2.3/questions"
ans_url = "https://api.stackexchange.com/2.3/questions/{question_id}/answers"

def fetch_stackexchange_questions(site="stackoverflow", page=1, pagesize=100):
    params = {
        "order": "desc",
        "sort": "activity",
        "site": site,
        "pagesize": pagesize,
        "page": page,
        "filter": "withbody"  # include the question body in the response
    }
    response = requests.get(quest_url, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Request failed: {response.status_code}")
        return None

def fetch_answers_for_question(question_id, site="stackoverflow"):
    params = {
        "order": "desc",
        "sort": "activity",
        "site": site,
        "filter": "withbody"  # include the answer body in the response
    }
    response = requests.get(ans_url.format(question_id=question_id), params=params)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Request for question {question_id} failed: {response.status_code}")
        return None

def parse_questions_with_answers(data, site="stackoverflow"):
    parsed_data = []
    if "items" in data:
        for item in tqdm(data["items"], desc="Parsing questions and answers"):
            question = {
                "question_id": item.get("question_id"),
                "title": item.get("title"),
                "body": item.get("body"),
                "tags": item.get("tags"),
                "link": item.get("link"),
                "creation_date": time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(item.get("creation_date"))),
                "score": item.get("score"),
                "answers": []
            }
            answers_data = fetch_answers_for_question(item.get("question_id"), site)
            if answers_data and "items" in answers_data:
                for answer in answers_data["items"]:
                    answer_info = {
                        "answer_id": answer.get("answer_id"),
                        "body": answer.get("body"),
                        "creation_date": time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(answer.get("creation_date"))),
                        "score": answer.get("score"),
                        "is_accepted": answer.get("is_accepted")
                    }
                    question["answers"].append(answer_info)
            parsed_data.append(question)
    return parsed_data

def save_to_json(data, filename="stackexchange_data_with_answers.json"):
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)
    print(f"Data saved to {filename}")

if __name__ == "__main__":
    site = "stackoverflow"
    page = 1
    pagesize = 100
    questions_data = fetch_stackexchange_questions(site=site, page=page, pagesize=pagesize)
    if questions_data:
        parsed_questions = parse_questions_with_answers(questions_data, site)
        save_to_json(parsed_questions, f"{site}_questions_with_answers.json")
How does our parser actually work?
Now we need to understand what our code does:
The code first makes a request to the StackExchange API to get a list of questions. Then, for each question, another request is sent to obtain answers. Next, all data is structured and written to a JSON file.
What is the main purpose of parsing information?
Information scraping can be useful in different areas and for different purposes. Let me give you a few examples:
For data analysis
For competitive analysis
To collect information during research
To create content on websites or blogs
For automation
For user analytics
For niche research
Useful information and additional materials
This article uses fairly standard libraries, but not everyone is familiar with them, so I suggest reading their documentation and introductory articles separately.
That's all. I will be glad to see your comments.