Everything is fine, but you need to rewrite it, or why your code does not make it into production

I will walk through a simple task, fetching an RSS feed, and show how the code that simply retrieves the feed differs from the code that is actually used in production.

I hope the material will be useful to novice programmers and will give a rough idea of how development should be carried out with an eye to producing a result that is usable in real projects.

It works as it is, why not?


Let’s start with the conflict: the solution to the problem of fetching an RSS feed looks very simple, like this:

import requests
import feedparser

response = requests.get('https://lenta.ru/rss')
lenta = feedparser.parse(response.text)

for item in lenta['entries']:
    print(item["title"])

For a one-off satisfaction of curiosity this is enough, but what if we need to fetch several RSS feeds on a tight schedule (for example, every minute)?

Perhaps the main difference in production is that the code is not run by hand (out of curiosity: “I wonder what we’ll get?”), but to obtain a stable, predictable result at scale. In our case that is monitoring RSS feeds, i.e. we will need to send many requests and receive many responses.

I will go through the most essential elements that need to be added to this code so that it can work stably, fetching several feeds (even several at the same time).

How the code will work, what result is needed


Question number one is where and how the code will run. In this case it will either be an infinite while True loop running as a service on the server, or a scheduled launch. Both approaches require the same thing: a stable restart, so that a single error does not bring down the whole system. But that is jumping ahead; let’s deal with the simplest things first.

It is important to understand where and under what conditions the code you write will be launched.
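For illustration, here is a minimal sketch of the service-style launch (the poll_feeds function and the 60-second interval are assumptions, not part of the project): one failed iteration is logged and the loop keeps running, which is exactly the “stable restart” requirement.

import time
import logging

def poll_feeds():
    # hypothetical placeholder: fetch and process all feeds once
    pass

def run_forever(interval=60):
    # infinite service loop: an error in one iteration must not kill the process
    while True:
        try:
            poll_feeds()
        except Exception as ex:
            logging.error(f"iteration failed: {ex}")
        time.sleep(interval)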

Checking the 200 response


So, requests.get(url): what is wrong with it and what needs to be added?

To begin with, requests.get is a tricky story: if you plan to send requests to the server regularly, it is a good idea to handle responses whose status code is not 200.

Let’s add a check; it is better to wrap the call in a function.

def get_response(url):
    response = requests.get(url)
    if response.status_code == 200:
        # a normal answer: pass the response on
        return response
    return False

If the response code is 200, we take the answer and move on; if not, we also move on, but with a small caveat.

Logging, execution control and debugging


If you think about the situation where the server does not return a 200 response, there should be an intuitive desire to record what is happening, in order to:

  1. Track when no response is received

  2. Understand why this is happening.

It is better to move this check into a separate function:

def response_res(response):
    status = True
    if response.status_code != 200:
        status = False
    return {'status': status, 'code': response.status_code, 'reason': response.reason}

The function returns a dictionary containing the status, the code (needed for the next check) and the reason (needed for debugging).

Thanks to this check, we can get something like:

HTTPSConnectionPool(host="riafan.ru", port=443): Max retries exceeded with url: /feed (Caused by ProxyError('Cannot connect to proxy.', timeout('_ssl.c:1114: The handshake operation timed out')))

And think about what to do with it.
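For example, a minimal usage sketch (the URL is arbitrary): run the check and record failures for later analysis.

import requests
import logging

response = requests.get('https://lenta.ru/rss')
res = response_res(response)
if not res['status']:
    # keep the code and reason for later debugging
    logging.warning(f"bad response: {res['code']}, {res['reason']}")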

A little disguise


As the error above shows, automated data collection is not particularly welcome, even in such a seemingly legitimate area as fetching an RSS feed. Therefore, for the code to function stably, this situation must also be taken into account.

As more experienced colleagues know well, a naked request will most likely either hit a captcha by the second or third attempt or simply be blocked by the server, so it is worth adding some masking and at least a header. Let’s improve the function a bit:

import fake_useragent
import logging

def get_response(url):
    s = requests.Session()
    user = fake_useragent.UserAgent().random
    header = {"user-agent": user}
    response = s.get(url, headers=header)
    logging.info(f"{url}, {response.status_code}, {s.cookies}")
    return response

Of course, this is not enough; at the very least it is bad form to send requests with empty cookies, referer, and so on, but I will not go into such details in this article. The main thing is that the direction for further bottleneck hunting is clear.
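As a sketch of what “more masking” could look like (the header values below are illustrative assumptions, not a recipe; real values depend on the target site):

header = {
    "user-agent": fake_useragent.UserAgent().random,
    # illustrative extras: roughly what a real browser would send
    "accept": "application/rss+xml, application/xml;q=0.9, */*;q=0.8",
    "accept-language": "ru-RU,ru;q=0.9,en;q=0.5",
    "referer": "https://lenta.ru/",
}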

If it doesn’t work at all


Moving on: the capriciousness of requests is not limited to server responses; quite often it returns trouble in the form of an exception. If we are working with requests to several feeds, an error in one of them will kill the entire collection run.

We add the try-except so beloved by many and get one more function:

def try_request(url):
    try:
        return get_response(url)
    except Exception:
        return False

Here you can see that on success we get our response, but with an exception the question arises of how to handle it correctly.

In order not to write additional functions, in the exception branch we construct a Response() object with a status code other than 200 and attach the error to it. Like this:

from requests.models import Response

def try_request(url):
    try:
        return get_response(url)
    except Exception as ex:
        # build a stand-in response that later checks will reject
        response = Response()
        response.reason = str(ex)
        response.status_code = 444
        return response

Let’s add some variety to the process and turn the try_request() function into a decorator.

import sys

def try_request(req):
    def wrap(url):
        status = False
        try:
            response = req(url)
            error = 0
            status = True
        except Exception as ex:
            # manufacture a response with a deliberately bad status code
            response = Response()
            response.reason = str(ex)
            response.status_code = 444
            error = sys.exc_info()[1]
        return {"status": status, "response": response, "error": error}

    return wrap


@try_request
def get_response(url):
    s = requests.Session()
    user = fake_useragent.UserAgent().random    
    header = {"user-agent": user}
    response = s.get(url, headers=header)
    logging.info(f"{url}, {response.status_code}, {s.cookies}")
    return response

Again we use a dictionary, for the sake of capturing errors and debugging them. If the wrapped function fails, the response we manufactured is returned, and the checking function will catch it by its bad status code.
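A quick usage sketch (the URL is arbitrary): after decoration, get_response returns the dictionary, so the caller looks at the status field first.

result = get_response('https://lenta.ru/rss')
if result['status']:
    print(result['response'].status_code)  # a real server response
else:
    # the manufactured 444 response plus the captured error
    print(result['response'].status_code, result['error'])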

Everything is ready for deployment


The bottlenecks are accounted for, so we can try to deploy the script as a working version. At the same time we can use multithreaded mode, so that isolated problems will not affect overall operation. For simultaneous requests multithreading is a good fit in general, since it saves a lot of execution time.

from multiprocessing.pool import ThreadPool

def pool_data(rss_list):
    pool = ThreadPool(len(rss_list))
    try:
        feeds = pool.map(harvest_all, rss_list)
        pool.close()
        return feeds
    except Exception as ex:
        logging.info(f"thread pool failed: {ex}")
        return []

Everything is simple here: with ThreadPool we create as many threads as there are feeds and process them all at the same time.

I have never actually caught an error in this function; perhaps the try-except is superfluous here, but it costs nothing and does not get in the way.
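As a side note on the design: ThreadPool also works as a context manager, so a sketch without the explicit close() could look like this (for this blocking map() call the behavior is equivalent):

def pool_data(rss_list):
    # the with-block shuts the pool down even if map() raises
    with ThreadPool(len(rss_list)) as pool:
        return pool.map(harvest_all, rss_list)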

Everything seems to be ready…

Run the program… and take down the server


It will not run stably. We forgot to pass a timeout to s.get!

If you run the program on a schedule (for example, every 30 seconds), a situation can arise where one request is still waiting for a server response while a new one goes out, then another and another, until:

out of memory killed process

Let’s add a timeout:

response = s.get(url, headers=header, timeout=3)
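Now a hung server raises requests.exceptions.Timeout after three seconds, and our decorator converts it into the 444 stand-in response. A sketch to check this (httpbin.org/delay/10 is just an example of a deliberately slow endpoint):

result = get_response('https://httpbin.org/delay/10')  # answers after 10 s
print(result['status'], result['response'].status_code)  # expect: False 444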

A 200 response does not guarantee that you got what you wanted: one more check


You also need to check that the server response contains what you are after. The response may come with code 200 and yet not contain the content you expect: for example, a captcha can arrive with a 200 code, as can a page blocking your endless header-less requests.

In our case, feedparser gives us a dictionary with known fields, so we can do a universal check.

def check_feed(response):
    status = False
    lenta = feedparser.parse(response.text)
    if lenta['entries']:
        status = True
    return {'status': status, 'lenta': lenta['entries']}
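Chained with the previous steps, a usage sketch (URL arbitrary) looks like this:

result = get_response('https://lenta.ru/rss')
feed = check_feed(result['response'])
if feed['status']:
    for entry in feed['lenta']:
        print(entry['title'])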

What the requests.get(url) line looks like in the finished project


The final picture is as follows: we have a list of RSS feed addresses that we pass to the program; requests to all addresses are sent simultaneously, after which we check:

  1. That the request completed without any error at all

  2. That a server response was received (recording the causes of problems)

  3. That the response has the right content.

At the same time, if some link does not work for whatever reason, we get a code and a value that let us understand the cause of the problem without stopping the script.

Finally, this is how the response = requests.get(url) line looks in the working project:

import requests
import feedparser
import sys
from requests.models import Response
import fake_useragent
from multiprocessing.pool import ThreadPool
import logging


def response_res(response):
    status = True
    if response.status_code != 200:
        status = False
    return {"status": status, "code": response.status_code, "reason": response.reason}


def try_request(req):
    def wrap(url):
        status = False
        try:
            response = req(url)
            error = 0
            status = True
        except Exception as ex:
            response = Response()
            response.reason = str(ex)
            response.status_code = 444
            error = sys.exc_info()[1]
        return {"status": status, "response": response, "error": error}

    return wrap


@try_request
def get_response(url):
    s = requests.Session()
    user = fake_useragent.UserAgent().random   
    header = {"user-agent": user}
    response = s.get(url, headers=header, timeout=3)
    logging.info(f"{url}, {response.status_code}, {s.cookies}")
    return response


def check_feed(response):
    status = False
    lenta = feedparser.parse(response.text)
    if lenta["entries"]:
        status = True
    return {"status": status, "lenta": lenta["entries"]}


def harvest_all(url):
    response = get_response(url)
    response_stat = response_res(response["response"])
    feed_res = check_feed(response["response"])
    res_dict = {
        "feed": url,
        "response": response,
        "response_status": response_stat,
        "feed_cheker": feed_res,
    }
    return res_dict


def pool_data(rss_list):
    pool = ThreadPool(len(rss_list))
    try:
        feeds = pool.map(harvest_all, rss_list)
        pool.close()
        return feeds
    except Exception as ex:
        logging.info(f"многопоточность сломалась")
        return []


def main():
    rss_list = [
        "https://feed1.xml",
        "https://feed2.xml",
        "https://feed3.xml",
    ]
    feeds = pool_data(rss_list)
    for item in feeds:
        if item["feed_cheker"]["status"]:
            lenta = feedparser.parse(item["response"]["response"].text)
            for titles in lenta["entries"]:
                print(titles["title"])


if __name__ == "__main__":
    main()
