Monitoring multithreaded Node.js applications

In this article, we will look at the specifics of monitoring a multithreaded Node.js application, using as an example the collector from our service for monitoring and analyzing PostgreSQL server logs.

To scale the collector, we use a multi-process architecture: one manager process and several worker processes, with inter-process communication happening only between each worker and the manager.
Worker processes all perform the same tasks: collecting, processing, and writing logs from PostgreSQL servers. Collecting and writing are essentially I/O tasks, which Node.js handles very well. But processing and parsing query plans is a CPU-intensive task that blocks the event loop. Such tasks are therefore better moved to a separate worker, or a pool of workers, handing the data over for processing via IPC messages.

Previously, we used this approach for processing and parsing query plans. But it has a drawback: with large volumes of data transferred over IPC, the cost of serializing to JSON and back becomes significant.

For example, when a buffer containing the string 'test' is sent over IPC, what is actually transmitted is the following string:


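A Buffer is JSON-serialized by the IPC channel, so for Buffer.from('test') the transmitted payload looks like this:

```javascript
const buf = Buffer.from('test');

// This is the string the IPC channel actually sends for this buffer:
console.log(JSON.stringify(buf));
// {"type":"Buffer","data":[116,101,115,116]}
```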
With large volumes of data transferred, the overhead can look like this:

Our solution was to use worker_threads, which appeared in Node.js 10.5.0: the threads run within a single process and allow new methods of communication between them.
The architecture changed:

And along with it, so did our approaches to monitoring. For example, CPU usage inside a worker_thread cannot be measured in the traditional way.
Previously, for each worker process we measured CPU usage with process.cpuUsage() and process.hrtime(), roughly like this:

let startCpuUsage = process.cpuUsage();
let startTime = process.hrtime();
let n = 1000;
while (n--) Math.sin(n);
let {user, system} = process.cpuUsage(startCpuUsage); // time in microseconds
let time = process.hrtime(startTime); // [seconds, nanoseconds]
let cpuUsage = 100 * 1000 * (user + system) / (time[0] * 1e9 + time[1]); // percent

But for a process using worker_threads, calling process.cpuUsage() returns processor time for the whole process, summed across all of its threads. And we get the same result if we call process.cpuUsage() from within a worker_thread.
Why does this happen?
The point is that process.cpuUsage() relies on the libuv call uv_getrusage, which on Linux makes the getrusage system call with the RUSAGE_SELF parameter, i.e. it returns statistics for the calling process as the sum of all its threads; no matter which thread makes the call, it returns the same values.

So how do we get CPU usage for worker_threads, and why doesn't Node.js have built-in methods for profiling worker_threads CPU usage?
There is an answer from one of the worker_threads developers here.
Two options are offered: either use the gettid() system call to get the tid of the worker_thread and read the data from /proc/${tid}, or use getrusage() with the RUSAGE_THREAD parameter to get statistics for the calling thread only.
Incidentally, the same approach can be used to get CPU usage metrics for the main thread of a process, without counting its additional threads and worker_threads.

Having sorted this out, we started looking for ready-made modules for monitoring worker_threads, and found none. So we wrote our own, adding to it the output of all the other metrics for monitoring a Node.js application. Server-level metrics we already collect through our metrics collection system.

CPU monitoring

To analyze CPU utilization, we take metrics from workers and worker_threads, as well as total CPU utilization on the server and a per-core breakdown:

  • For workers in general:

  • For the main worker threads:

  • For worker_threads (for the first thread in the pool; a total per worker would also be useful):

  • Total CPU load on the server:

With CPU metrics sorted out, what about profiling worker_threads?
Let's check by running this small test with node --prof:

Test code

const { Worker, isMainThread } = require('worker_threads');
const crypto = require('crypto');

function mainTest() {
  let cnt = 10000;
  while (cnt--) crypto.randomBytes(cnt);
}

function threadTest() {
  let cnt = 10000;
  while (cnt--) crypto.randomBytes(cnt);
}

if (isMainThread) {
  let worker = new Worker(__filename);
  setInterval(mainTest, 1000);
} else {
  setInterval(threadTest, 1000);
}
As a result, we got two isolate-* files: one for the main thread and one for the worker_thread.
Then, using node --prof-process, we can view the desired profile.
Incidentally, with the --no-logfile-per-isolate option, instead of several isolate-* files there will be a single v8.log with the combined result for all threads, including the main one.
One more thing: if we use node --inspect, or send a SIGUSR1 signal to a running process, to capture a CPU profile in Chrome DevTools, we will only see data for the main thread.

Memory usage

As with the CPU profile, taking a heap snapshot in Chrome DevTools gives us the heap of the main thread only.
Fortunately, since Node 12.17.0 it has been possible to get a heap snapshot directly from code for a worker_thread via worker.getHeapSnapshot(), and since 11.13.0 for the main thread via v8.getHeapSnapshot().

Let's try it:

const { Worker, isMainThread } = require('worker_threads');
const v8 = require('v8');
const fs = require('fs');

if (isMainThread) {
  let worker = new Worker(__filename);
  let mainArray = [];
  function mainTest() {
    let cnt = 100;
    while (cnt--) mainArray.push(`main-msg-${cnt}`);
  }
  process.on('SIGUSR2', () => {
    // Snapshot of the main thread's heap
    v8.getHeapSnapshot().pipe(fs.createWriteStream('main.heapsnapshot'));
    // Snapshot of the worker_thread's heap
    worker.getHeapSnapshot().then((heapsnapshot) => {
      heapsnapshot.pipe(fs.createWriteStream('worker.heapsnapshot'));
    });
  });
  setInterval(mainTest, 1000);
} else {
  let threadArray = [];
  function threadTest() {
    let cnt = 100;
    while (cnt--) threadArray.push(`thread-msg-${cnt}`);
  }
  setInterval(threadTest, 1000);
}

By sending the process a SIGUSR2 signal, we get two heap snapshots, which can then be analyzed in Chrome DevTools:

  • Main thread:

  • worker_thread:

Which memory metrics are interesting for analysis?
We use the ones provided by process.memoryUsage(): rss, heapTotal, heapUsed, external.
We also use v8.getHeapSpaceStatistics(), which gives data on the heap segments: new_space, old_space, code_space, large_object_space.
rss is always reported for the whole process, while the remaining metrics are reported for the calling context.

  • Total for workers:

  • By worker:

  • By worker_threads:

Garbage collection

Since each worker_thread runs its own instance of V8 and libuv, each one also has its own GC and must be monitored separately.
To monitor the GC, we need the total duration of garbage collection, as well as the number of runs and the duration of each cycle type.
Node.js has long had, since version 8.5.0, the PerformanceObserver mechanism, which among other things can provide all the necessary information on GC cycles.
For example, like this:

const { PerformanceObserver, constants } = require('perf_hooks');

let stats = {};

let gcObserver = new PerformanceObserver((list) => {
  list.getEntries()
    .forEach(({kind, duration}) => {
      stats['gc.time'] += duration;
      switch (kind) {
        case constants.NODE_PERFORMANCE_GC_MINOR:
          stats['gc.Scavenge.count']++;
          stats['gc.Scavenge.time'] += duration;
          break;
        case constants.NODE_PERFORMANCE_GC_MAJOR:
          stats['gc.MarkSweepCompact.count']++;
          stats['gc.MarkSweepCompact.time'] += duration;
          break;
        case constants.NODE_PERFORMANCE_GC_INCREMENTAL:
          stats['gc.IncrementalMarking.count']++;
          stats['gc.IncrementalMarking.time'] += duration;
          break;
        case constants.NODE_PERFORMANCE_GC_WEAKCB:
          stats['gc.ProcessWeakCallbacks.count']++;
          stats['gc.ProcessWeakCallbacks.time'] += duration;
          break;
      }
    });
});

function resetStats() {
  Object.assign(stats, {
    'gc.time': 0,
    'gc.Scavenge.count': 0,
    'gc.Scavenge.time': 0,
    'gc.MarkSweepCompact.count': 0,
    'gc.MarkSweepCompact.time': 0,
    'gc.IncrementalMarking.count': 0,
    'gc.IncrementalMarking.time': 0,
    'gc.ProcessWeakCallbacks.count': 0,
    'gc.ProcessWeakCallbacks.time': 0,
  });
}

resetStats();
gcObserver.observe({entryTypes: ['gc'], buffered: true});

function triggerScavenge() {
  let arr = [];
  for (let i = 0; i < 5000; i++) {
    arr.push({i}); // short-lived objects to trigger Scavenge
  }
  setTimeout(triggerScavenge, 50);
}

let ds = [];

function triggerMarkCompact() {
  for (let i = 0; i < 10000; i++) {
    ds.push(new ArrayBuffer(1024)); // long-lived allocations to trigger Mark-Sweep-Compact
  }
  if (ds.length > 40000) {
    ds = [];
  }
  setTimeout(triggerMarkCompact, 200);
}

triggerScavenge();
triggerMarkCompact();

setInterval(() => {
  console.log(stats);
  resetStats();
}, 5000);


{
  'gc.time': 158.716144,
  'gc.Scavenge.count': 11,
  'gc.Scavenge.time': 135.690545,
  'gc.MarkSweepCompact.count': 2,
  'gc.MarkSweepCompact.time': 22.96941,
  'gc.IncrementalMarking.count': 2,
  'gc.IncrementalMarking.time': 0.056189,
  'gc.ProcessWeakCallbacks.count': 0,
  'gc.ProcessWeakCallbacks.time': 0
}

This method works both in the main thread and in worker_threads; for our collector we get per-second metric graphs:

  • By workers

  • By worker_threads

  • Total GC time by worker

  • Total GC time by worker_threads

Event-loop latency

To monitor event loop delays, it is convenient to use monitorEventLoopDelay, which appeared in version 11.10.0: it provides not only average and extreme values but also arbitrary percentiles.
We use max, min, mean, and percentile(99):

  • Cumulative across all workers

  • Total for worker_threads

  • By worker

  • By worker_thread

Monitoring the worker_threads pool

The pool's system-level metrics were shown above; here we will look at the performance metrics of the multithreaded application itself.
At startup, each collector worker starts a pool with one worker_thread, which processes the queue of incoming query plans.
Additional worker_threads are spawned when the queue grows and when tasks have been waiting in it longer than a certain time; they automatically terminate after a period of inactivity.

Task queue processing code (abridged):

  const SPAWN_LAG = 2000;
  this._queue = [];

  assignTask(msg) {
    if (this.mainExplainer.ready === true) {
      this.mainExplainer.ready = false;
      // ... hand the task to the main explainer thread
    } else if (this._idleExplainers.length > 0) {
      let explainer = this._idleExplainers.pop();
      // ... hand the task to an idle explainer thread
    } else {
      this._checkAndStartNew(msg);
    }
  }

  _checkAndStartNew(msg) {
    let ts = Date.now();
    let q = this._queue;
    if (msg && process[hasFreeRAM]) q.push({msg, ts});
    if (this._canCreateExplainer && q.length > this._workersCnt() && q[0].ts + SPAWN_LAG < ts) {
      // ... the queue has lagged longer than SPAWN_LAG: start a new worker_thread
    }
  }

  explainer.on('explainDone', (msg) => {
    // ... deliver the result and give the thread its next task
  });

  explainer.pulse = () => {
    if (this._queue.length > 0) {
      // ... take the next task from the queue
    } else if (this._isMain(explainer)) {
      explainer.ready = true;
    } else {
      // ... no work left: return the thread to the idle pool
    }
  };
The important metrics for the worker_thread pool are the queue size and the number of running threads:

In addition, we monitor the throughput of worker_threads and of the workers overall:

  • Query plan processing speed:

  • Worker performance by number of tasks:

  • Worker performance in terms of data volume:

  • Worker_thread performance by number of tasks:

  • Worker_thread performance in terms of data volume:


We have covered the specifics of monitoring a multithreaded Node.js application.
For a comprehensive analysis of an application's behavior, many indicators need to be tracked: metrics for the server as a whole, the application's use of system resources, runtime metrics, and the application's own domain metrics; in short, everything that falls under APM.
