Running utPLSQL Tests in Parallel (Oracle)

Almost every developer, sooner or later, has to deal with emerging errors in his seemingly ideal code. It seemed that all the moves were calculated and there could be no mistakes, but that was not the case. There can be a lot of reasons, from a logical error in the related functionality, to an update of third-party software or hardware independent of the developer. To protect your software product, for example, from potential errors arising from new improvements, the code is covered with control tests.

Over time, in a developed/maintained system, the number of tests increases => the automatic quality check stage requires more and more precious time, thereby slowing down the process of delivering value to the end consumer.

When there are no more thoughts on optimizing the performance of tests and the code under test, the option of parallel testing remains.

Inspired by the interest of my colleague – @alisichkin with the idea of ​​speeding up the CI / CD pipeline, I sketched out a variant of running unit tests based on utPLSQL in parallel.

Let’s take a look at what and how we did it.

First of all, we need to prepare the foundation for parallel launch – a stored procedure of the form:

-- Процедура параллельного запуска скрипта p_sql_task
procedure parallel_execute
(
    p_sql_task          in varchar2,                -- Скрипт задачи
    p_task_name         in varchar2,                -- Название задания
    p_parallel_level    in number default 16        -- Количество сессий выполнения задачи
)
is
    v_try               number := 0;
    v_cnt_chunk         number;
    v_sql_chunks        varchar2(4000 char);
begin
    -- Завершаем задания прежнего запуска (если они были)
    for r in
    (
        select
            t.task_name
        from
            user_parallel_execute_tasks t
        where
            t.task_name = p_task_name
    )
    loop
        dbms_parallel_execute.drop_task(task_name => p_task_name);
    end loop;
    -- Запрос определения количества частей задачи - отдельных сессий на сервере.
    v_sql_chunks := '
        select
            level               start_id,
            count(*) over ()    end_id
        from
            dual
        connect by
            level <= ' || p_parallel_level;
    -- создаем задачу
    dbms_parallel_execute.create_task
    (
        task_name => p_task_name
    );
    -- Выделяем части
    dbms_parallel_execute.create_chunks_by_sql
    (
        task_name   => p_task_name,
        sql_stmt    => v_sql_chunks,
        by_rowid    => false
    );
    -- Проверяем наличие выделенных порций
    select
        count(*)
    into
        v_cnt_chunk
    from
        user_parallel_execute_chunks ec
    where
        ec.status = upper('unassigned');
    -- Если не удалось выделить части, логируем/вызываем ошибку/просто выходим, т.е. выполняем требуемое по ситуации действие
    if dbms_parallel_execute.task_status(p_task_name) != dbms_parallel_execute.chunking and v_cnt_chunk = 0 then
        -- логируем (процедура для примера)
        log_message
        (
            p_proc_name     => 'parallel_execute',
            p_log_message   => 'could not get chunks ' || chr(10) || 'SQL TASK:' || chr(10) || p_sql_task
        );
    else
        -- запускаем задачу
        dbms_parallel_execute.run_task
        (
            task_name       => p_task_name,
            sql_stmt        => p_sql_task,
            language_flag   => dbms_sql.native,
            parallel_level  => p_parallel_level
        );
        -- ждем завершения задачи или превышения кол-ва попыток запуска
        loop
            exit when
                dbms_parallel_execute.task_status(p_task_name) = dbms_parallel_execute.finished
                or v_try > 3;      -- Три попытки
            dbms_parallel_execute.resume_task(p_task_name);
            v_try := v_try + 1;
        end loop;
    end if;
    -- Удаляем задачу
    dbms_parallel_execute.drop_task(task_name => p_task_name);
exception
    when others then
        log_message
        (
            p_proc_name     => 'parallel_execute',
            p_log_message   => sqlerrm || chr(10) || dbms_utility.format_error_backtrace || chr(10) || 'SQL TASK:' || chr(10) || p_sql_task
        );
end parallel_execute;

It should be noted that according to the Oracle documentation, the number of task chunks should be formed by the result of the query passed in the parameter sql_stmt, the request itself must contain the fields start_id, end_id, each query result record identifies a specific part of the task being split. In this option, the number of parts will always be allocated equal to the value p_parallel_levelif there is no other restriction in the DBMS schema settings (job_queue_processes).

Next, we should think about where and how we will aggregate the result of the tests (you can implement the exchange between sessions through DBMS_PIPE, or save it in a table specially created for this). I chose the option with a table, because there will be an opportunity for additional retrospective analysis and subsequent rebalancing of tests.

create table utp_test_results
(
    task_name           varchar2(120 char),
    chunk_num           number,
    start_date          date default sysdate,
    end_date            date,
    success             number(1),
    test_result         clob,
    --
    constraint utp_test_results_pk primary key (task_name, chunk_num) using index tablespace tblspc_index
);
comment on table  utp_test_results                is 'Таблица результатов запуска UTP-тестов';
comment on column utp_test_results.task_name ....

The next step is to create a procedure for launching a specific part of the task

/***************************************************/
/* Процедура запуска задачи UTP тестирования */
procedure run_utp_task
(
    p_chunk_num         in number,        -- Номер части   <=> :start_id
    p_chunk_count       in number,        -- Кол-во частей <=> : end_id
    p_task_name         in varchar2       -- Название заздачи
)
is
    v_pack_list         utp.ut_varchar2_list;
    v_error             varchar2(4000 char);
    -- Процедура чтения output-а
    procedure save_output
    is
        l_buffer    dbms_output.chararr;
        l_num_lines pls_integer;
        l_clob      clob;
    begin
        l_num_lines := 4000;
        dbms_output.get_lines(l_buffer, l_num_lines);
        for i in 1..l_buffer.count loop
            if l_buffer(i) like '%test%' then
                l_clob := l_clob || l_buffer(i) || chr(10);
            end if;
        end loop;
        -- Фиксируем результат работы chunk-а
        update
            utp_test_results t
        set
            t.test_result = l_clob,
            t.end_date = sysdate,
            t.success = 1
        where
            t.task_name = p_task_name
            and t.chunk_num = p_chunk_num;
        commit;
    end save_output;
begin
    -- Регистрация в таблице результатов
    update
        utp_test_results
    set
        start_date = sysdate,
        success = null,
        test_result = null
    where
        task_name = p_task_name
        and chunk_num = p_chunk_num;
    --
    if sql%rowcount = 0 then
        insert
        into
            utp_test_results
            (task_name, chunk_num)
        values
        (
            p_task_name,
            p_chunk_num
        );
    end if;
    commit;
    -- читаем список всех имеющихся тестовых пакетов
    select
        pack_name
    bulk collect into
        v_pack_list
    from
        (
            select
                name as pack_name,
                row_number() over (order by name) pack_num,   -- Сортировка обязательна, порядок должен быть согласован для всех частей задачи
                count(*) over () count_total
            from
                user_source
            where
                text like '%\%suite%' escape '\'              -- В тестируемом объекте обязательно должен быть указан таг %suite - без него фреймворк UTP тестировать объект не будет
                and name like 'TST%'                          -- Тестовый объект начинается с TST_
                and name <> 'TST_UTILS'                       -- Исключаем пакет запуска тестов
            group by
                name
        ) s
    where
        p_chunk_num = mod(s.pack_num, p_chunk_count) + 1;     -- Фильтр для выбора части тестовых пакетов
    -- Старт тестов
    utp.ut.run
    (
        a_paths => v_pack_list,
        a_coverage_schemes => utp.ut_varchar2_list('<schema_name>'), -- <schema_name> - название тестируемой схемы в СУБД
        a_reporter => utp.ut_junit_reporter
    );
    -- UTP пишет в output, считываем его и сохраняем в свою таблицу
    save_output;
exception
    when others then
        log_message
        (
            p_log_message   => sqlerrm,
            p_proc_name     => 'run_utp_task'
        );
        v_error := sqlerrm;
        update
            utp_test_results
        set
            success = 0,
            test_result = v_error
        where
            task_name = p_task_name
            and chunk_num = p_chunk_num;
        commit;
    raise;
end run_utp_task;

It remains to add the final storage for running tests in parallel:

/********************************/
/* Процедура запуска UTP тестов */
procedure run_utp
(
    p_parallel_level    in number default 16
)
is
    v_task_name         varchar2(120 char) := 'task_utp_parallel_' || userenv('sessionid');
    v_clob              clob;
    -- Выдод clob в output
    procedure output_clob
    (
        p_length        in number,
        p_start_index   in number default 1
    )
    is
        v_buffer        varchar2(4000 char);   -- размер буфера максимально для нашей версии Oracle = 32000, берем излюбленное значение в 4000
        v_end_index     number;
    begin
        if p_start_index < p_length then
            v_buffer := dbms_lob.substr(v_clob, 4000, p_start_index);
            v_end_index := length(v_buffer);
            if v_end_index < 4000 then
                dbms_output.put_line(v_buffer);
            else
                v_end_index := instr(v_buffer, '>', -1);                -- Бьем по закрывающему символу
                dbms_output.put_line(substr(v_buffer, 1, v_end_index));
                output_clob(p_length, p_start_index + v_end_index);
            end if;
        end if;
    end;
begin
    parallel_execute
    (
        p_sql_task   => '
        begin
            run_utp_task
            (
                p_chunk_num     => :start_id,
                p_chunk_count   => :end_id,
                p_task_name     => ''' || v_task_name || '''
            );
        end;',
        p_task_name  => v_task_name,
        p_parallel_level => p_parallel_level
    );
    -- Формируем итоговый XML
    select
        xmlelement
        (
            "testsuites",
            xmlattributes
            (
                sum(tests) "tests",
                sum(disabled) "disabled",
                sum(errors) "errors",
                sum(failures) "failures",
                sum(time) "time"
            ),
            xmlagg(suite)
        ).getclobval()
    into
        v_clob
    from
        (
            select
                case
                    when test_result is not null then
                        xmltype(test_result) 
                end xml_result
            from 
                amt_utp_test_results t 
            where 
                t.task_name = v_task_name
        ) s,
        xmltable
        (
            'testsuites'
            passing s.xml_result
            columns
                suite       xmltype             path './testsuite',
                tests       number              path '@tests',
                disabled    number              path '@disabled',
                errors      number              path '@errors',
                failures    number              path '@failures',
                name        varchar2(2000 char) path '@name',
                time        number              path '@time'
        ) t;
    -- Выводим в output
    output_clob(dbms_lob.getlength(v_clob));
end run_utp;

That’s all, the goal is achieved.

Procedure run_utp the task of running tests in parallel starts, waits for its completion, and then collects the results into a single XML, which is further parsed by the JUnit parser.

An example of running in 16 sessions:

call tst_utils.run_utp(16);

PS results:

Run time in 16 sessions on cold data was 362 seconds ~6 minutes

Using the table of results, we can analyze what slows down our process to a greater extent and group “heavy” tests with “light” ones, or increase the level of parallelism:

Results table
Results table

From the results, we see that the final testing time was determined by the longest running test (set of tests).

The total time for checking 84 tests within one session would be 1569.5 seconds, i.e. approximately 26 minutes:

Results from output
Results from output

In fact, we get a time gain of 20 minutes from one test. If we take into account the need to run tests for each task several times (we have 3 of them), then this is already 60 minutes, in addition we multiply the time received by the number of developers and rejoice at another victory) …

That’s all. Healthy criticism and feedback is welcome :).

Thank you for your attention.

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *