Searching for and processing information on file resources


Let’s start with the entry point to the application. To make the tool convenient to use, we will give it a command-line interface. Before starting work, it is also worth creating a virtual environment and activating it.

Python has a convenient click module for handling command-line parameters (pip install click). Command-line arguments are handled by adding decorators to a function. Let’s define the required parameter search_path (the path to search in, or a file with a list of paths) and the optional ones: the execution mode (multithreaded or single-threaded), the name of the results file, the format for recording the result (excel, csv, sqlite) and any other parameters you like.

import click


@click.command()
@click.argument('search_path',
                type=click.Path(resolve_path=True,
                                dir_okay=True,
                                file_okay=True),
                required=True)
@click.option('--concurrent_mode', '-cm',
              type=click.STRING,
              required=False,
              default="None",
              help="Concurrent search execution(multi or single)."
                   " Multi allows multiprocessing and threading")
@click.option('--output_path', '-o',
              type=click.STRING,
              required=False,
              default="results",
              help="Output filename(without extension)")
@click.option('--output_type', '-ot', '-out',
              type=click.STRING,
              required=False,
              default="sqlite",
              help="Format for output data((excel, csv, sqlite, mssql)")
...
def main(search_path, output_path, concurrent_mode, output_type):
    """
    Starts the search for info (card numbers) in files under the given path

    search_path: path to search or json file with paths
    """
    ...

if __name__ == "__main__":
    main()

We will call the content search function from main(). In this case, the call looks like this:

multi_mode = concurrent_mode.lower() != "single"
search_in_files(search_path, file_types, output_path, maxsize,
                buffer_size, clear_results, clear_log, continue_option,
                multi_mode, output_type, debug, include)

Let’s configure logging to save information about the errors that occur while the script runs. The built-in logging module is perfect for this; information will be written to a log file.

import logging

LOGGING_FORMAT = ('%(asctime)s %(levelname)s %(filename)s - '
                  '%(funcName)s %(message)s')
logging.basicConfig(filename="program_log.log", filemode="a",
                    level=logging.INFO, format=LOGGING_FORMAT)

The command-line argument can be a single path or a file with paths. The Path class from the pathlib module is perfect for telling a file from a directory. The choice between the two options is implemented as follows.

searching_path_obj = Path(search_path)
if searching_path_obj.is_dir():
    search_type = "one"
else:
    # Use the file extension without the leading dot as the search type
    search_type = searching_path_obj.suffix[1:]

We will describe how the list of paths to check is filled using a dictionary whose keys are the type of input path (a single path, a csv file or a json file) and whose values are the functions that process it.

PATH_FILLER = {
    'one': add_search_dict,
    'json': add_search_from_json,
    'csv': add_searches_from_csv
}

The processing functions read data from the files and append dictionaries with the identifier, resource name and path keys to the list. Dictionaries are great for writing clean code and getting rid of long chains of conditional checks.
Each function takes two parameters: a list for storing the results and a path to the data file.
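
For illustration, the JSON variant of such a filler function might look like the sketch below; the structure of the input JSON file is an assumption, only the id, name and path keys are taken from the code that later consumes the list.

import json


def add_search_from_json(search_paths, json_path):
    # Read a JSON file with a list of entries and append each one
    # as a dictionary with id, name and path keys
    with open(json_path, encoding="utf-8") as f:
        entries = json.load(f)
    for entry in entries:
        search_paths.append({
            'id': entry['id'],
            'name': entry['name'],
            'path': entry['path'],
        })
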
Also, do not forget to validate the command-line parameters: the user may enter them incorrectly. Let’s look at the handling of the search_type value as an example:

SEARCH_TYPES = ["one", "json", "csv"]

if search_type not in SEARCH_TYPES:
    types = ",".join(SEARCH_TYPES)
    message = (f"Неправильный тип поиска!" 
               f" Должен быть один из вариантов: {types}")
    print(message)
    logging.error(message)
    return

Now that we have a dictionary of functions that fill the input list from files of different formats, filling the list looks quite simple:

# For storing the list of files
search_paths = []

# Filling the list
PATH_FILLER[search_type](search_paths, search_path)

Multithreading and chunk processing

To let our application use as many computational resources as possible, let’s add multithreading using ThreadPoolExecutor from the standard library. To display the program’s progress, we use tqdm (pip install tqdm).

futures = []
with ThreadPoolExecutor(max_workers=max_threads) as executor:
    for proc_id, search_dict in enumerate(paths_list):
        source = SearchSource(search_dict['name'],
                              search_dict['id'],
                              search_dict['path'])
        futures.append(executor.submit(find_in_path,
                                       source,
                                       allowed_types,
                                       max_filesize=max_filesize,
                                       max_buffer_size=max_buffer_size,
                                       continue_state=continue_state,
                                       debug_mode=debug_mode))

    progress_bar = tqdm(as_completed(futures),
                        total=len(paths_list),
                        desc="Проверка ФИРов",
                        bar_format="{desc:<15} "
                                   '{percentage:3.0f}%|{bar:20}{r_bar}')
    for _ in progress_bar:
        pass
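
The max_threads value is not set anywhere in the snippet; one simple choice (an assumption, not taken from the article) is one worker per CPU core:

import os

# Use one worker thread per CPU core, falling back to 4 if the count is unknown
max_threads = os.cpu_count() or 4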

We will write the search results into a pandas DataFrame (pip install pandas). During the search, we will save intermediate results in case something goes wrong and the script exits early.

interim_result = []
common_uid = generate_uid()
for result in files_gen:

    current_num += 1
    result.save_to_dataframe(interim_result, source_search.path)
    total_fs_size += result.size
    if current_num % batch_size == 0:
        batch_num += 1
        progress_state = batch_num * batch_size
        info_message = (f"Поток {process_thread_id}.{subprocess_postfix} "
                        f"Обработано: {progress_state} файлов.")
        print(info_message)

    if current_num > 0 and current_num % FILES_IN_POOL == 0:
        stat = SearchStat(current_num, total_fs_size)
        search_result = pd.DataFrame(interim_result,
                                     columns=REPORT_COLUMNS)

        save_interim(search_result, source_search, stat,
                     part_num, common_uid)
        interim_result = []
        part_num += 1

if interim_result:
    stat = SearchStat(current_num, total_fs_size)
    search_result = pd.DataFrame(interim_result,
                                 columns=REPORT_COLUMNS)
    save_interim(search_result, source_search, stat,
                 part_num, common_uid)
else:
    uid = generate_uid()
    save_dir_progress(source_search, SearchStat(0, 0),
                      uid, "Нет доступа")

The number of files after which intermediate results are written is defined by FILES_IN_POOL, and the columns for the report are described in the REPORT_COLUMNS variable (a list of strings).
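
For illustration, the constants and small helpers used in this snippet might be defined as follows; the concrete values, column names and implementations here are assumptions, not taken from the article.

import uuid
from collections import namedtuple

# Write an interim result file after every 10 000 processed files (assumed value)
FILES_IN_POOL = 10_000

# Columns of the report dataframe (assumed names)
REPORT_COLUMNS = ['id', 'name', 'path', 'result', 'info', 'size', 'extension']

# Per-batch statistics: number of files and their total size
SearchStat = namedtuple('SearchStat', ['files_count', 'total_size'])


def generate_uid():
    # Random identifier that groups the interim files of one search run
    return uuid.uuid4().hex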

Processing files and archives

To process the files, we will write a generator function that yields processing results and uses os.walk for recursive file traversal:

for root, dirs, files in os.walk(path):
    for file in files:
        file_path = os.path.join(root, file)
        try:
            file_obj = File(file_path,
                            thread_id,
                            None,
                            max_buffer_size)
            file_suffix = file_obj.extension
        except Exception as ex:
            error_message = (f"{ex}. Failed to get "
                             f"access to file {file_path}")
            logging.error(error_message)
            # The file could not be opened: record a failed result and move on
            yield SearchResult(file_path, False, error_message, 0, '!NONE!')
            continue

        proc_res, res_info = file_obj.process(max_filesize)
        result = SearchResult(file_obj.path, proc_res, res_info,
                              file_obj.size, file_suffix)
        yield result

A separate class is used to work with files and search through them:

class File:
    """
    File class with file info.

    The file processing method calls a processing function
    for each file type (document, pdf, xlsx, image)

    """
    # Default maximum file size to process, in MB
    PROCESSING_MAX_SIZE = 100


    def __init__(self, file_path, process_thread_id=1,
                 subprocess_id=None, buffer_size=314_572_800):
        path_obj = Path(file_path)
        self.name = path_obj.name
        self.extension = path_obj.suffix
        self.flat_name = path_obj.stem
        self.parent_directory = str(path_obj.parent)
        self.path = file_path
        self.process_thread_id = process_thread_id
        self.buffer_size = buffer_size
        self.subprocess_id = subprocess_id
        self.size = os.path.getsize(self.path)

File processing is implemented in the process() method of the File class.

def process(self, limit_size: int = 100):
    """
    Process file and starts search inside

    :param limit_size: max file size in MB
    :return:
    """
    if limit_size is None:
        limit_size = File.PROCESSING_MAX_SIZE
    file_size_in_mb = self.size / (1024 * 1024)  # size in megabytes

    if self.extension in CRITICAL_FILE_EXT:
        return self.find_cards(critical=True)

    if file_size_in_mb > limit_size:
        return self.get_max_size_exceed()

    if self.extension in USUAL_SUFFIXES:
        return self.find_in_file()
    elif self.extension in ARCHIVES_SUFFIXES:
        result = self.find_in_zip()
        remove_dir(self.form_tempdir_name(TEMP_ARCHIVES_PATH))
        return result
    elif self.extension in OLD_SUFFIXES:
        temp_dir = self.form_tempdir_name(TEMP_PATH)
        recreate_tempdir(temp_dir)
        result = self.find_in_olddoc()
        remove_dir(temp_dir)
        return result

    return self.get_unsupported_result()
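
The small helpers get_max_size_exceed() and get_unsupported_result() are not shown in the article. Judging by how the return value of process() is unpacked into a (result, info) pair, they might be as simple as this (an assumption):

def get_max_size_exceed(self):
    # The file exceeds the processing size limit, so nothing is searched
    return False, "File size exceeds the processing limit"


def get_unsupported_result(self):
    # No processing function exists for this file type
    return False, "Unsupported file type"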

It is worth mentioning separately that archives of different formats have to be extracted into temporary folders and then processed. You can, of course, use separate modules for each archive format (zipfile and tarfile are in the standard library, the rest have to be installed), but the tasks this tool was built for required handling the rar format as well, so we use the patoolib module (pip install patool).

Also, when processing archives, keep in mind that they may contain other archives inside, so extraction has to be done recursively. The class for processing archives is shown below:

class Archive(File):
    def __init__(self, path, temp_dir=None):
        super().__init__(path, 1, subprocess_id=None,
                         buffer_size=314_572_800)
        self.temp_dir = TEMP_ARCHIVES_PATH if temp_dir is None else temp_dir

    def extract(self, out_path: str = TEMP_PATH, recursive=True,
                changed_path=None, extract_level=1):
        """
        Extracts all data from archive
        :param extract_level: current extraction depth
        :param recursive: True for recursive extraction (default)
        :param out_path: Path to extract archive
        :param changed_path: Path for second and more extract
        :return:
        """
        source_path = self.path if changed_path is None else changed_path
        try:
            patoolib.extract_archive(source_path, verbosity=-1,
                                     outdir=out_path,
                                     interactive=False)
            if changed_path is not None:
                os.remove(source_path)
        except Exception as ex:
            logging.error(f"Ошибка при извлечении архива {self.path}: {ex}")
            return
        if recursive:
            for root, dirs, files in os.walk(out_path):
                for file in files:
                    pth = Path(root) / file
                    suffix = pth.suffix
                    if suffix in ARCHIVES_SUFFIXES:
                        try:
                            extract_dir = str(pth.parent)
                            if extract_level > 999:
                                # Stop the process in case of a
                                # deeply nested archive
                                return
                            self.extract(extract_dir, recursive=True,
                                         changed_path=f"{pth}",
                                         extract_level=extract_level + 1)
                        except Exception as ex:
                            logging.error(f" Проход по архиву."
                                          f"Файл {pth} Ошибка - {ex}")

It is convenient to organize a separate class for storing search results.

class SearchResult:
    """
    Represents search results

    Attributes:
        result(bool) : True if success, else False
        info(str):  additional string description
        size(int): file size
        ext(str): file extension
    """
    def __init__(self, file_path: str, result: bool = None,
                 info: str = None, size=0, ext=None):
        """

        :param result: result : True if success, else False
        :param info:  additional string description
        :param size: file size
        :param ext: file extension
        """
        self.__result = result
        self.__info = info
        self.__size = size
        self.__ext = ext
        self.__path = file_path

To process files with different extensions, we will write separate functions. We also use a dictionary to call them.

process_functions = {
    '.docx': docx2txt.process,
    '.txt': process_txt,
    '.xls': extract_text_xls,
    '.xlsx': extract_text_xlsx,
    '.xlsm': extract_text_xlsx,
    '.xlsb': extract_text_xlsb,
    '.rtf': extract_text_rtf,
    '.csv': extract_csv,
    '.pdf': extract_text_from_pdf,
}

try:
    text = process_functions[self.extension](self.path)
except Exception as ex:
    logging.error(f"Ошибка при обработке файла {self.path}: {ex}")

Plain text files can be handled with the Python standard library; tabular files with the pandas, pyexcel and xlrd modules; Word files with docx2txt and win32com (or the LibreOffice package); rtf files with the striprtf module; and pdf files with pdfminer.
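
The per-format extraction helpers from the process_functions dictionary are not shown in the article. Minimal sketches of three of them, assuming the striprtf and pdfminer.six packages are installed, might look like this:

from pdfminer.high_level import extract_text as pdf_extract_text
from striprtf.striprtf import rtf_to_text


def process_txt(path):
    # Read a plain text file, ignoring bytes that cannot be decoded
    with open(path, encoding="utf-8", errors="ignore") as f:
        return f.read()


def extract_text_rtf(path):
    # Convert RTF markup to plain text with striprtf
    with open(path, encoding="utf-8", errors="ignore") as f:
        return rtf_to_text(f.read())


def extract_text_from_pdf(path):
    # pdfminer.six extracts the text of all pages of the PDF
    return pdf_extract_text(path)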

Content search

To search for the information itself, we will use the re module. If you need to look for data matching specific templates, as in the task this tool was built for, define them separately in the SEARCH_REGEXPS variable.

import re


def search_info(text):
    """
    Search info
    :param text: text in str format
    :return: results count and info
    """
    results = []
    for pattern in SEARCH_REGEXPS:
        finds = re.findall(pattern, text)
        results.extend(finds)
    return len(results), ",".join(results)
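
The actual search templates are not shown in the article. As an illustration only, SEARCH_REGEXPS could contain a pattern for 16-digit card numbers written with optional separators:

# Illustrative pattern, not the one used in the real tool
SEARCH_REGEXPS = [
    r"\b\d{4}[ -]?\d{4}[ -]?\d{4}[ -]?\d{4}\b",
]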

Processing of results

Intermediate results are saved to files on the local computer. Processing the obtained results and recording them in the chosen format looks like this.

RESULT_SAVERS = {
    'sqlite': sqlite_save,
    'mssql': mssql_save,
    'csv': save_csv,
    'excel': save_excel,
    'xlsx': save_excel,
}


def process_results(result_type, output_tablename="results"):
    """
    Unite all results from RESULTS_PATH directory

    :param result_type: result format for save (sqlite, mssql, csv)
    :param output_tablename: out table postfix for name
    """
    result_dirs = load_setting("RESULT_DIRS")
    for result_dir in result_dirs:
        print(f'Reading results from {result_dir}..')
        # Table/file name for saving
        outname = f"{output_tablename}-{result_dir}"
        path_to_process = Path(result_dir)
        dataframes = []
        result_paths = []
        if path_to_process.exists():
            result_paths = [p for p in path_to_process.iterdir()]
        for path in tqdm(result_paths, desc="Reading files",
                         bar_format="{desc:<15} {percentage:3.0f}%|"
                                    '{bar:20}{r_bar}'):
            try:
                if path.suffix == '.xlsx':
                    dataframes.append(pd.read_excel(str(path),
                                                    sheet_name=0))
                elif path.suffix == '.csv':
                    dataframes.append(pd.read_csv(str(path),
                                                  encoding='cp1251',
                                                  sep=';'))
            except Exception as ex:
                print(F"Ошибка при обработке файла {path}. Ошибка: {ex}")

        print('Merging data...')
        if dataframes:
            result_frame = pd.concat(dataframes)
            # Drop the extra column with internal ids
            remove_first = load_setting("REMOVE_FIRST")
            if remove_first:
                result_frame.drop(result_frame.columns[[0]], axis=1,
                                  inplace=True)
            print('Writing results..')
            if result_type not in RESULT_SAVERS:
                print(f"Unsupported output format {result_type}!")
                continue
            try:
                RESULT_SAVERS[result_type](outname, result_frame)
            except Exception as ex:
                print(f"Ошибка сохранения в {result_type}: {ex}")
            print('Saving results finished!')
        else:
            print('No data found!')

To write to a database, we will use the sqlalchemy, pymssql and pandas modules. We will store the connection settings in a separate file or in environment variables. The code for writing to the database is as follows.

def get_connection_settings():
    global DATABASE_NAME, DOMAIN, DB_SERVER
    DATABASE_NAME = load_setting('DATABASE')
    DOMAIN = load_setting('DOMAIN')
    DB_SERVER = load_setting('DB_SERVER')


get_connection_settings()
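
The load_setting helper used above is not shown in the article. A minimal version, assuming the settings live in a settings.json file next to the script (the file name is an assumption), might look like this:

import json
import os

SETTINGS_FILE = "settings.json"  # assumed name of the settings file


def load_setting(key):
    # Prefer the JSON settings file; fall back to an environment variable
    if os.path.exists(SETTINGS_FILE):
        with open(SETTINGS_FILE, encoding="utf-8") as f:
            settings = json.load(f)
        if key in settings:
            return settings[key]
    return os.environ.get(key)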


def sqlite_save(table_name: str, data: pd.DataFrame):
    """
    Saves pd dataframe in sqlite db

    :param table_name:  table name in database
    :param data: dataFrame to save

    """
    engine = create_engine(f'sqlite:///{DATABASE_NAME}.db')
    data.to_sql(f'{table_name}', con=engine, if_exists="append",
                method='multi')


def mssql_save(table_name: str, data: pd.DataFrame):
    """
    Saves pd dataframe in mssql db

    :param table_name:  table name in database
    :param data: dataFrame to save

    """
    with pymssql.connect(server=DB_SERVER, database=DATABASE_NAME) as con:
        engine = create_engine('mssql+pymssql://', creator=lambda: con)
        data.to_sql(f'SearchDpk_{table_name}', con=engine,
                    if_exists="append", method='multi', chunksize=500)


Thus, we get a tool that lets you search for and process information in files in the specified directories.
