Solving the problem of “crashing” processes in a multiprocessing application that runs 24/7

Problem statement

There is an application that performs several functions: for example, it collects data from various sources, processes it and stores the results in a database. The application is meant to run 24/7, so that you can connect to the database at any time and get the latest information.

But here’s the problem. All the code seems to be debugged and the application looks stable, yet at some point you notice that, bang, a process is gone. No errors in the logs, no signals, nothing. It is not at all clear how to catch this. Meanwhile the work has to go on, so the process has to be restarted somehow, and there is not much time for debugging.

Simulation

Imagine two simple functions that do something (what exactly does not matter). In multiprocessing mode, each function is launched as a separate task in the operating system, which you can see in the Task Manager. So each process has its own PID.
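Here is a minimal sketch of this setup using the standard `multiprocessing` module, as the full listings below do. Each function runs in its own `mp.Process`. After `start()`, every process has its own PID, distinct from the parent's. The `worker` function is a placeholder and is not part of the original code.

```python
import multiprocessing as mp
import os
import time


def worker(name, delay):
    # Placeholder task: report which PID this function is running under
    print(f"{name} running in PID {os.getpid()}")
    time.sleep(delay)


if __name__ == "__main__":
    procs = [mp.Process(target=worker, args=(f"proc_{n}", 0.2)) for n in (1, 2)]
    for p in procs:
        p.start()
    # Each started process has its own PID, different from the parent's
    pids = [p.pid for p in procs]
    print("parent:", os.getpid(), "children:", pids)
    for p in procs:
        p.join()
```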

Now imagine a third process, the “tracker”. Its job is to check whether a process with a given PID still exists in the system. If a function finishes, or its process dies unexpectedly, the tracker must notice this (that is, fail to find that PID) and restart the process by launching the function in it again.
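The tracker idea can also be sketched with the standard library alone. This is a hypothetical variant, not the author's code. Instead of polling PIDs with psutil, it keeps the `mp.Process` objects and calls `is_alive()`, which on POSIX also reaps a finished child. The names `supervise`, `factories` and `max_restarts` are illustrative. The restart cap exists only so that the sketch terminates.

```python
import multiprocessing as mp
import time


def short_task(n):
    # Stand-in for a real worker: does a little "work" and exits
    time.sleep(0.1 * n)


def supervise(factories, max_restarts, poll=0.05):
    """Restart any worker that is no longer alive, up to max_restarts in total."""
    workers = {name: make() for name, make in factories.items()}
    for p in workers.values():
        p.start()
    restarts = 0
    while restarts < max_restarts:
        for name, p in list(workers.items()):
            if restarts >= max_restarts:
                break
            if not p.is_alive():             # finished or crashed
                p.join()                     # already dead, returns at once
                new_p = factories[name]()    # a Process can only be started once
                new_p.start()
                workers[name] = new_p
                restarts += 1
                print(f"{name}: restarted, old PID {p.pid}, new PID {new_p.pid}")
        time.sleep(poll)
    for p in workers.values():
        p.join()
    return restarts


if __name__ == "__main__":
    factories = {
        "proc_1": lambda: mp.Process(target=short_task, args=(1,)),
        "proc_2": lambda: mp.Process(target=short_task, args=(2,)),
    }
    restarts = supervise(factories, max_restarts=3)
```

Keeping factories rather than Process objects matters here: a finished `Process` cannot be started again, so a fresh object has to be created for every restart. The listings in this article follow the same pattern.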

We also need a way to simulate a crash, i.e. a function that kills a process artificially.
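The listings below use psutil for the killing, but a crash can also be simulated with the standard library alone. `Process.kill()` (Python 3.7+) sends SIGKILL on POSIX, so the child gets no chance to clean up or log anything. This is close to the silent disappearance described above. `long_task` is a placeholder.

```python
import multiprocessing as mp
import signal
import time


def long_task():
    # Would run for a minute if nothing interfered
    time.sleep(60)


if __name__ == "__main__":
    p = mp.Process(target=long_task)
    p.start()
    time.sleep(0.2)      # let the child get going
    p.kill()             # simulated crash: no exception, no cleanup in the child
    p.join()
    exitcode = p.exitcode
    print("exit code:", exitcode)
    if hasattr(signal, "SIGKILL"):
        # POSIX: a process killed by signal N reports exit code -N
        print("killed by SIGKILL:", exitcode == -signal.SIGKILL)
```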

I also wanted to stay as close to reality as possible and not pass variables between processes, so that each process works autonomously.

Implementation

Here are two simple functions that run and do something. You can skip them: the same functions appear again in the full listings of the solution.

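import random
import time


def proc_1(timepass=1, repeat=20):
    # Simulated worker: prints a counter, sleeps a random 0..timepass seconds,
    # and finishes after repeat - 1 iterations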
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_1 COMPLETED!")
            break
        time.sleep(x)


def proc_2(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_2 COMPLETED!")
            break
        time.sleep(x)

Two versions were implemented. They differ in where the tracking-and-restoring function lives: in the first it runs in the main application process, in the second it runs in a separate process. The second case is more complicated because the processes are restarted from inside the recovery function. If that function calls join() on a restarted process, it blocks waiting for that process and stops tracking the others. The solution was to call join() in a separate thread.
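The join-in-a-thread trick can be isolated into a small sketch. The helper name `start_and_watch` is hypothetical. It starts a process and hands the blocking `join()` to a daemon thread, so the caller's tracking loop keeps running. The thread ends as soon as the child does.

```python
import multiprocessing as mp
import threading
import time


def task(seconds):
    time.sleep(seconds)


def start_and_watch(target, *args):
    """Start a process and join it in a background thread so the caller does not block."""
    p = mp.Process(target=target, args=args)
    p.start()
    watcher = threading.Thread(target=p.join, daemon=True)
    watcher.start()
    return p, watcher


if __name__ == "__main__":
    t0 = time.monotonic()
    p, watcher = start_and_watch(task, 1.0)
    returned_after = time.monotonic() - t0   # returns long before the child finishes
    watcher.join()                           # demo only: wait for the child to end
    print(f"start_and_watch returned after {returned_after:.3f}s, exit code {p.exitcode}")
```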

The second option, by the way, looks nicer and more logical.

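A separate point is which operating system the application runs on. Linux needed special handling. There, a child process that has died stays in the process table as a zombie until its parent reaps it, and psutil.pid_exists() keeps reporting its PID. That is why on Linux the code calls os.wait() before checking.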
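The zombie effect is easy to reproduce on Linux. This sketch uses `subprocess` rather than `multiprocessing` to keep it short. It checks for existence with `os.kill(pid, 0)`, the same low-level test that psutil's `pid_exists()` builds on for POSIX systems. A killed but unreaped child still "exists". Once the parent reaps it, the PID disappears. The reaping is done here with `Popen.wait()`; the listings use `os.wait()`.

```python
import os
import signal
import subprocess
import sys
import time


def pid_in_table(pid):
    """True if the kernel still has an entry for pid (signal 0 only checks, sends nothing)."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    return True


child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
os.kill(child.pid, signal.SIGKILL)     # simulated crash
time.sleep(0.5)                        # let the kernel finish tearing the process down

before_reap = pid_in_table(child.pid)  # True: the dead child is a zombie
child.wait()                           # reap it, like os.wait() in the listings
after_reap = pid_in_table(child.pid)   # False: the PID is gone

print("before reaping:", before_reap, "after reaping:", after_reap)
```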

Here is the first version (full source of the solution):

# Working with processes: working version

import multiprocessing as mp
import time
import random
import psutil
import os
import platform
import fnmatch
import signal
import subprocess


class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    
def proc_1(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_1 COMPLETED!")
            break
        time.sleep(x)


def proc_2(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_2 COMPLETED!")
            break
        time.sleep(x)


def process_killer(PID_list, timepass, exclusion_list=None):
    # Crash simulator: at random intervals, kill one random sibling Python process.
    # PID_list is not used; siblings are found through the parent with psutil.
    if exclusion_list is None:
        exclusion_list = []
    system = platform.system()
    this_process_pid = os.getpid()
    while True:
        time.sleep(random.uniform(0, timepass))
        # Siblings: all descendants of our parent, minus ourselves and excluded PIDs
        parent_process = psutil.Process(os.getppid())
        child_pid_list = [child.pid for child in parent_process.children(recursive=True)]
        child_pid_list = list(set(child_pid_list) - set(exclusion_list) - {this_process_pid})
        print("\n".join(f'Process_{i + 1} PID: {pid}' for i, pid in enumerate(child_pid_list)))

        if not child_pid_list:
            continue
        kill_pid = random.choice(child_pid_list)
        try:
            kill_proc = psutil.Process(kill_pid)
            # Only touch Python processes, i.e. our own workers
            if not fnmatch.fnmatch(kill_proc.name(), "python*"):
                continue
            print("We kill the process with PID", kill_pid)
            if system == "Windows":
                kill_proc.kill()
            elif system == "Linux":
                os.kill(kill_pid, signal.SIGTERM)
                # os.kill(kill_pid, signal.SIGKILL)
                # subprocess.call(["kill", str(kill_pid)])
        except (psutil.NoSuchProcess, ProcessLookupError):
            # The target exited between listing and killing
            print(f"Process with PID {kill_pid} not found.")
            continue
        print(f"{bcolors.FAIL}Process with PID {kill_pid} killed.{bcolors.ENDC}")




if __name__ == "__main__":
    print(f'{bcolors.OKGREEN}Program started!{bcolors.ENDC}')

    process_name_list = ["process_1", "process_2"]

    process_1 = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
    process_2 = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})

    process_1.start()
    process_2.start()

    PID_list = [process_1.pid, process_2.pid]

    # process_recov = mp.Process(target=process_recovery, kwargs={'process_pid_list': PID_list})
    # process_recov.start()

    process_kill = mp.Process(target=process_killer,
                              kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': []})
    process_kill.start()

    PID_list.append(process_kill.pid)

    system = platform.system()

    print("Main PID:", os.getpid())
    print("Process_1 PID:", process_1.pid)
    print("Process_2 PID:", process_2.pid)
    print("Process_killer PID:", process_kill.pid)

    # Tracking loop: runs until interrupted with Ctrl+C
    try:
        while True:
            if system == "Linux":
                try:
                    # Block until some child exits and reap it; an unreaped child stays
                    # a zombie, and psutil.pid_exists() keeps reporting its PID
                    os.wait()
                except ChildProcessError:
                    pass  # no children left
            for i in range(len(PID_list)):
                try:
                    if psutil.pid_exists(PID_list[i]):
                        continue  # still alive
                    print(f'Process with PID {PID_list[i]} is dead')
                    print(f"{bcolors.WARNING}Restoring the process{bcolors.ENDC}")
                    # The slot index tells which function to restart
                    if i == 0:
                        process = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
                    elif i == 1:
                        process = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})
                    else:
                        process = mp.Process(target=process_killer,
                                             kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': []})
                    process.start()
                    old_pid = PID_list[i]
                    PID_list[i] = process.pid

                    lines = []
                    for j, pid in enumerate(PID_list):
                        if pid == process.pid:
                            lines.append(f'{bcolors.OKGREEN}Process_{j + 1} PID: {pid}{bcolors.ENDC} (old: {old_pid})')
                        else:
                            lines.append(f'Process_{j + 1} PID: {pid}')
                    print('Recovery result:\n' + "\n".join(lines))
                except Exception as e:
                    # One failed restart must not stop the tracker
                    print(f"{bcolors.FAIL}Recovery failed: {e}{bcolors.ENDC}")
            time.sleep(0.2)
    except KeyboardInterrupt:
        # Stop the supervised children before exiting
        for pid in PID_list:
            try:
                psutil.Process(pid).terminate()
            except psutil.Error:
                pass

    print("Program completed")

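The first version polls `psutil.pid_exists()` every 0.2 s and blocks in `os.wait()` only on Linux. A possible cross-platform alternative is `multiprocessing.connection.wait()` on each process's `sentinel`. This is a suggestion and not part of the original solution. The call blocks until at least one child has exited, on Windows as well as Linux, and `join()` then reaps the child. The `factories` dict and the `max_restarts` cap are illustrative; the cap only makes the sketch terminate.

```python
import multiprocessing as mp
import time
from multiprocessing.connection import wait


def job(seconds):
    # Placeholder worker that simply exits after a while
    time.sleep(seconds)


def run_supervisor(factories, max_restarts):
    """Block until a child exits, then restart it from its factory."""
    procs = {}
    for name, make in factories.items():
        procs[name] = make()
        procs[name].start()
    restarts = 0
    while restarts < max_restarts:
        by_sentinel = {p.sentinel: name for name, p in procs.items()}
        for sentinel in wait(list(by_sentinel)):   # blocks until a child ends
            if restarts >= max_restarts:
                break
            name = by_sentinel[sentinel]
            procs[name].join()                      # reap the finished process
            procs[name] = factories[name]()         # fresh Process object
            procs[name].start()
            restarts += 1
    for p in procs.values():
        p.join()
    return restarts


if __name__ == "__main__":
    factories = {
        "proc_1": lambda: mp.Process(target=job, args=(0.1,)),
        "proc_2": lambda: mp.Process(target=job, args=(0.2,)),
    }
    restarts = run_supervisor(factories, max_restarts=4)
    print("restarts:", restarts)
```

Because the wait is event-driven, there is no 0.2 s polling delay and no need for an OS-specific branch. The trade-off is that the supervisor must hold the `Process` objects, whereas the article's approach only needs PIDs.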
And here is the second one:

# Working with processes: a working but questionable version
import multiprocessing as mp
import threading
import time
import random
import psutil
import os
import platform
import fnmatch
import signal
import subprocess

class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

def proc_1(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_1: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_1 COMPLETED!")
            break
        time.sleep(x)


def proc_2(timepass=1, repeat=20):
    repeated = True
    i = 0
    while repeated:
        x = random.uniform(0, timepass)
        i += 1
        if i < repeat:
            print("Proc_2: " + str(i) + ", will fall asleep for: " + str(x))
        else:
            repeated = False
            print("Proc_2 COMPLETED!")
            break
        time.sleep(x)


def process_killer(PID_list, timepass, exclusion_list=None):
    # Crash simulator: at random intervals, kill one random sibling Python process.
    # PID_list is not used; siblings are found through the parent with psutil.
    if exclusion_list is None:
        exclusion_list = []
    system = platform.system()
    this_process_pid = os.getpid()
    while True:
        time.sleep(random.uniform(0, timepass))
        # Siblings: all descendants of our parent, minus ourselves and excluded PIDs
        parent_process = psutil.Process(os.getppid())
        child_pid_list = [child.pid for child in parent_process.children(recursive=True)]
        child_pid_list = list(set(child_pid_list) - set(exclusion_list) - {this_process_pid})
        print("\n".join(f'Process_{i + 1} PID: {pid}' for i, pid in enumerate(child_pid_list)))

        if not child_pid_list:
            continue
        kill_pid = random.choice(child_pid_list)
        try:
            kill_proc = psutil.Process(kill_pid)
            # Only touch Python processes, i.e. our own workers
            if not fnmatch.fnmatch(kill_proc.name(), "python*"):
                continue
            print("We kill the process with PID", kill_pid)
            if system == "Windows":
                kill_proc.kill()
            elif system == "Linux":
                os.kill(kill_pid, signal.SIGTERM)
                # os.kill(kill_pid, signal.SIGKILL)
                # subprocess.call(["kill", str(kill_pid)])
        except (psutil.NoSuchProcess, ProcessLookupError):
            # The target exited between listing and killing
            print(f"Process with PID {kill_pid} not found.")
            continue
        print(f"{bcolors.FAIL}Process with PID {kill_pid} killed.{bcolors.ENDC}")


def process_recovery(process_pid_list):
    # Runs in its own process: watches its siblings and restarts any that die
    PID_list = process_pid_list
    this_process_pid = os.getpid()
    parent_process = psutil.Process(os.getppid())
    # Snapshot of the siblings to watch: everything the parent started except us
    child_pid_list = [child.pid for child in parent_process.children(recursive=True)
                      if child.pid != this_process_pid]

    system = platform.system()

    while True:
        if system == "Linux":
            try:
                # Reap our own finished children so they do not linger as zombies
                os.wait()
            except ChildProcessError:
                pass  # no children of our own yet
        for i in range(len(child_pid_list)):
            try:
                if psutil.pid_exists(child_pid_list[i]):
                    continue  # still alive
                print(f'Process with PID {child_pid_list[i]} is dead')
                print(f"{bcolors.WARNING}Restoring the process{bcolors.ENDC}")
                if child_pid_list[i] == PID_list[0]:
                    process = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
                elif child_pid_list[i] == PID_list[1]:
                    process = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})
                else:
                    # Any other watched PID is the killer; it must not target this process.
                    # os.getpid() is used because the global process_recov is not
                    # reliably available (or initialised) inside this child process.
                    process = mp.Process(target=process_killer,
                                         kwargs={'PID_list': PID_list, 'timepass': 10,
                                                 'exclusion_list': [this_process_pid]})
                process.start()
                old_pid = child_pid_list[i]
                child_pid_list[i] = process.pid
                if old_pid == PID_list[0]:
                    PID_list[0] = process.pid
                if old_pid == PID_list[1]:
                    PID_list[1] = process.pid

                lines = []
                for j, pid in enumerate(child_pid_list):
                    if pid == process.pid:
                        lines.append(f'{bcolors.OKGREEN}Process_{j + 1} PID: {pid}{bcolors.ENDC} (old: {old_pid})')
                    else:
                        lines.append(f'Process_{j + 1} PID: {pid}')
                print('Recovery result:\n' + "\n".join(lines))
                # join() in a thread, so this loop is not blocked by the new child
                threading.Thread(target=process.join, daemon=True).start()
            except Exception as e:
                # One failed restart must not stop the tracker
                print(f"{bcolors.FAIL}Recovery failed: {e}{bcolors.ENDC}")
        time.sleep(0.2)


if __name__ == "__main__":
    process_name_list = ["process_1", "process_2"]

    process_1 = mp.Process(target=proc_1, kwargs={'timepass': 2, 'repeat': 30})
    process_2 = mp.Process(target=proc_2, kwargs={'timepass': 3, 'repeat': 30})

    process_1.start()
    process_2.start()

    PID_list = [process_1.pid, process_2.pid]

    process_recov = mp.Process(target=process_recovery, kwargs={'process_pid_list': PID_list})
    process_recov.start()

    process_kill = mp.Process(target=process_killer,
                              kwargs={'PID_list': PID_list, 'timepass': 10, 'exclusion_list': [process_recov.pid, ]})
    # Crash simulator; comment out this start() and the matching join() to run without it
    process_kill.start()

    print("Main PID:", os.getpid())
    print("Process_1 PID:", process_1.pid)
    print("Process_2 PID:", process_2.pid)
    print("Process_killer PID:", process_kill.pid)

    process_1.join()
    process_2.join()
    process_kill.join()
    process_recov.join()

    time.sleep(5)

    print("Program completed")

If you run into a similar problem and don't want to waste time hunting for a phantom bug, I hope this solution helps.
