Everything works, but you still need to rewrite it: why your code does not make it into production
I will walk through a simple task, fetching an RSS feed, and show how code that merely receives the feed differs from code that actually runs in production.
I hope the material will be useful to novice programmers and will show roughly how development should be carried out with an eye to producing a result usable in real projects.
But it works, so what's the problem?
Let's start with the conflict: the solution to the problem of fetching an RSS feed looks very simple, like this:
import requests
import feedparser

response = requests.get('https://lenta.ru/rss')
lenta = feedparser.parse(response.text)
for item in lenta['entries']:
    print(item["title"])
For a one-off satisfaction of curiosity this is enough, but what if we need to receive several RSS feeds on a tight schedule (for example, every minute)?
Perhaps the main difference of production code is that it is not run by hand (out of curiosity: "I wonder what we'll get?"), but to obtain a stable, predictable result at scale. In our case that means monitoring RSS feeds, i.e. we will need to send many requests and receive many responses.
I will go through the most essential things that need to be added to this code so that it can work stably, fetching several feeds (even several feeds at the same time).
How the code will work, what result is needed
Question number one: where and how will the code run? In this case it will be either an infinite while True loop running as a service on the server, or a scheduled launch. Both approaches require the same thing: a resilient restart, so that a single error does not bring the whole system down. But that is jumping ahead; let's deal with the simplest things first.
It is important to understand where and under what conditions what you write will be launched.
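For either launch mode, the restart requirement boils down to one pattern: catch everything at the top of each iteration and keep going. A minimal sketch (run_periodically, its arguments, and the bounded run count are my own illustration, not part of the project):

```python
import logging
import time

def run_periodically(task, interval_sec, max_runs=None):
    """Call task() every interval_sec seconds; one failed iteration
    must not stop the whole loop. max_runs=None means run forever."""
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            task()
        except Exception as ex:
            # Record the failure and carry on with the next iteration
            logging.error("iteration failed: %s", ex)
        runs += 1
        time.sleep(interval_sec)
    return runs
```

In service mode this is the while True loop itself; in scheduled mode the scheduler plays the role of the outer loop and only the try/except part remains.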
Checking the 200 response
So, requests.get(url): what is wrong with it and what needs to be added.
To begin with, requests.get is a tricky thing, and if you plan to send regular requests to a server, it is a good idea to handle responses whose status code is not 200.
Let's add a check; it is better to wrap the line in a function.
def get_response(url):
    response = requests.get(url)
    if response.status_code == 200:
        return response
    else:
        return False
If the response is 200, we take it and move on; if not, we also move on, but with one small caveat.
Logging, execution control and debugging
If you think about the situation where the server does not return a 200 response, there should be an intuitive urge to record what is happening, in order to:
Track when no response is received
Understand why it is happening
It is better to move this check into a separate function:
def response_res(response):
    status = True
    if response.status_code != 200:
        status = False
    return {'status': status, 'code': response.status_code, 'reason': response.reason}
The function returns a dictionary containing a code (needed to check the next step) and a reason (needed to debug).
Thanks to this check, we can get something like:
HTTPSConnectionPool(host="riafan.ru", port=443): Max retries exceeded with url: /feed (Caused by ProxyError('Cannot connect to proxy.', timeout('_ssl.c:1114: The handshake operation timed out')))
And think about what to do with it.
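One thing the snippets above quietly assume: for the logging.info calls to actually record anything, logging must be configured once at startup, because by default the root logger drops INFO messages. A minimal setup (the file name and format are illustrative):

```python
import logging

# The root logger's default level is WARNING, so INFO messages are
# silently discarded until basicConfig raises the level; force=True
# replaces any configuration set up earlier in the process.
logging.basicConfig(
    filename="rss_monitor.log",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    force=True,
)

logging.info("monitor started")
```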
A little disguise
As can be understood from the error above, automated data collection is not particularly welcome, even in such a seemingly legitimate area as fetching an RSS feed. So for the code to function stably, this has to be taken into account too.
As more experienced colleagues know well, a bare request will most likely catch a captcha by the second or third attempt, or simply get blocked by the server. It would be wise to add some masking, at least a header. Let's improve the function a bit:
import fake_useragent
import logging

def get_response(url):
    s = requests.Session()
    user = fake_useragent.UserAgent().random
    header = {"user-agent": user}
    response = s.get(url, headers=header)
    logging.info(f"{url}, {response.status_code}, {s.cookies}")
    return response
Of course, this is not enough: at the very least it is not great to keep sending requests with empty cookies, no referer, and so on. I will not go into such details in this article; the main thing is that the direction for further bottleneck hunting is clear.
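As a sketch of what a slightly fuller disguise could look like: the header values and user-agent strings below are illustrative stand-ins (in the article fake_useragent supplies the user-agent), not a guaranteed way past blocking:

```python
import random

# A static fallback list of real-looking user-agent strings;
# fake_useragent serves the same purpose with a larger pool.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0",
]

def build_headers(referer=None):
    """Assemble headers that look more like a real browser's."""
    headers = {
        "user-agent": random.choice(USER_AGENTS),
        "accept": "application/rss+xml, application/xml;q=0.9, */*;q=0.8",
        "accept-language": "en-US,en;q=0.9",
    }
    if referer:
        headers["referer"] = referer
    return headers
```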
If it doesn’t work at all
Moving on: the capriciousness of requests is not limited to server responses; quite often it signals trouble by raising an exception. If we are fetching several feeds, an exception in one of them will kill the entire collection run.
We add the try-except so beloved by many and get one more function:
def try_request(url):
    try:
        return get_response(url)
    except Exception:
        return False
Here, on success we get our response; but when an exception occurs, the question is how to handle it properly.
To avoid writing extra functions, in the exception branch we build a Response() object with a non-200 status and carry the error inside it. Like this:
from requests.models import Response

def try_request(url):
    try:
        return get_response(url)
    except Exception as ex:
        response = Response()
        response.reason = str(ex)
        response.status_code = 444
        return response
Let's vary the approach a little and turn try_request() into a decorator.
import sys

def try_request(req):
    def wrap(url):
        status = False
        try:
            response = req(url)
            error = 0
            status = True
        except Exception as ex:
            response = Response()
            response.reason = str(ex)
            response.status_code = 444
            error = sys.exc_info()[1]
        return {"status": status, "response": response, "error": error}
    return wrap
@try_request
def get_response(url):
    s = requests.Session()
    user = fake_useragent.UserAgent().random
    header = {"user-agent": user}
    response = s.get(url, headers=header)
    logging.info(f"{url}, {response.status_code}, {s.cookies}")
    return response
Again we return a dictionary, to capture errors for later debugging. If the request fails, the Response we constructed ourselves is returned, and the caller will recognize it by its bogus status code.
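The decorator's contract is easy to check without touching the network: wrap a function that always raises (always_fails below is a stand-in for illustration, not part of the project):

```python
import sys
from requests.models import Response

def try_request(req):
    # Same decorator as above
    def wrap(url):
        status = False
        try:
            response = req(url)
            error = 0
            status = True
        except Exception as ex:
            response = Response()
            response.reason = str(ex)
            response.status_code = 444
            error = sys.exc_info()[1]
        return {"status": status, "response": response, "error": error}
    return wrap

@try_request
def always_fails(url):
    raise ConnectionError("no route to host")

result = always_fails("https://example.com/rss")
# result["status"] is False, result["response"].status_code is 444,
# and result["error"] holds the original exception
```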
Everything is ready for deployment
The bottlenecks are accounted for, so we can try deploying the script in a working configuration. We might as well use multi-threaded mode, so that problems with an individual feed do not affect overall throughput. For simultaneous requests multithreading is generally a good fit, since it saves a lot of wall-clock time.
from multiprocessing.pool import ThreadPool

def pool_data(rss_list):
    pool = ThreadPool(len(rss_list))
    try:
        feeds = pool.map(harvest_all, rss_list)
        pool.close()
        return feeds
    except Exception as ex:
        logging.info(f"thread pool failed: {ex}")
        return []
Everything is simple here: with ThreadPool we create as many threads as there are feeds and process them all at the same time.
I have never actually caught an error in this function, so the try-except may be superfluous here, but it does not ask for much and does not really get in the way.
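One property of pool.map worth knowing: it returns results in the same order as the input list, regardless of which thread finished first, so results can be matched back to rss_list by index. A quick offline check with a stand-in worker (fetch_stub replaces harvest_all; the URLs are made up):

```python
from multiprocessing.pool import ThreadPool

def fetch_stub(url):
    # Stand-in for harvest_all: pretend every fetch succeeded
    return {"feed": url, "status": True}

rss_list = ["https://a.example/rss", "https://b.example/rss"]
pool = ThreadPool(len(rss_list))
feeds = pool.map(fetch_stub, rss_list)
pool.close()
pool.join()
# feeds[0] belongs to the first URL, feeds[1] to the second
```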
Everything seems to be ready..
Run the program… and take down the server
It will not work stably, because we forgot to pass a timeout to s.get!
If you run the program on a schedule (for example, every 30 seconds), a situation can arise where one request is still waiting for a server response while a new one goes out, then another, and another, until:
out of memory killed process
Let’s add a timeout:
response = s.get(url, headers=header, timeout=3)
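A detail worth knowing here: requests accepts either a single number, which caps the connect and read phases with the same value, or a (connect, read) tuple that sets them separately. A small wrapper as a sketch (the function name and the 3.05/10 values are illustrative, not from the project):

```python
import requests

def get_with_timeout(url, headers=None, timeout=(3.05, 10)):
    """timeout may be one number (applied to connect and read alike)
    or a (connect, read) tuple setting the two phases separately."""
    return requests.get(url, headers=headers, timeout=timeout)
```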
A 200 response does not guarantee you got what you wanted: one more check
You also need to verify that the server response contains what you expect. The response may come with code 200 yet lack the content you are waiting for: for example, a captcha can arrive with a 200 code, or a page blocking your relentless header-less requests.
In our case, we get a dictionary with certain fields, so we can do a universal check.
def check_feed(response):
    status = False
    lenta = feedparser.parse(response.text)
    if lenta['entries']:
        status = True
    return {'status': status, 'lenta': lenta['entries']}
What the requests.get(url) line looks like in the finished project
The final picture is as follows: we have a list of RSS feed addresses which we pass to the program; requests to all addresses are sent simultaneously, and each result is then checked:
That the request completed without raising an error at all
That a server response was received (recording the causes of any problems)
That the response carries the right content
At the same time, if some link fails for whatever reason, we get a code and a reason that let us understand the cause of the problem without stopping the script.
Finally, this is what the response = requests.get(url) line looks like in the working project:
import requests
import feedparser
import sys
from requests.models import Response
import fake_useragent
from multiprocessing.pool import ThreadPool
import logging


def response_res(response):
    status = True
    if response.status_code != 200:
        status = False
    return {"status": status, "code": response.status_code, "reason": response.reason}


def try_request(req):
    def wrap(url):
        status = False
        try:
            response = req(url)
            error = 0
            status = True
        except Exception as ex:
            response = Response()
            response.reason = str(ex)
            response.status_code = 444
            error = sys.exc_info()[1]
        return {"status": status, "response": response, "error": error}
    return wrap


@try_request
def get_response(url):
    s = requests.Session()
    user = fake_useragent.UserAgent().random
    header = {"user-agent": user}
    response = s.get(url, headers=header, timeout=3)
    logging.info(f"{url}, {response.status_code}, {s.cookies}")
    return response


def check_feed(response):
    status = False
    lenta = feedparser.parse(response.text)
    if lenta["entries"]:
        status = True
    return {"status": status, "lenta": lenta["entries"]}


def harvest_all(url):
    response = get_response(url)
    response_stat = response_res(response["response"])
    feed_res = check_feed(response["response"])
    res_dict = {
        "feed": url,
        "response": response,
        "response_status": response_stat,
        "feed_checker": feed_res,
    }
    return res_dict


def pool_data(rss_list):
    pool = ThreadPool(len(rss_list))
    try:
        feeds = pool.map(harvest_all, rss_list)
        pool.close()
        return feeds
    except Exception as ex:
        logging.info(f"thread pool failed: {ex}")
        return []


def main():
    rss_list = [
        "https://feed1.xml",
        "https://feed2.xml",
        "https://feed3.xml",
    ]
    feeds = pool_data(rss_list)
    for item in feeds:
        if item["feed_checker"]["status"]:
            lenta = feedparser.parse(item["response"]["response"].text)
            for entry in lenta["entries"]:
                print(entry["title"])


if __name__ == "__main__":
    main()