Real Realtime Windows: Writing a Hard Realtime System Under Windows

Neurosurgical operating room. The previous generation system is in the left third of the image.

Preamble

The firm I work for manufactures equipment for neurosurgeons and neurophysiologists based on Deep Brain Stimulation technology. In short, an electrode is inserted into a living brain, and a neurosurgeon can read a signal from the brain or stimulate brain cells with a current discharge. The technology has a great future both in the treatment of diseases (for example, Parkinson’s disease, dystonia, epilepsy) and in the creation of neurointerfaces: neuroprosthetics, in particular the restoration of vision, and brain augmentation, extending the brain with additional devices. Intelligence agencies, say, are very interested in a way to read and write information from the visual and auditory nerves, as well as to control the movement of animals, which would create a new class of spies.

For clinical use in treating tremor in Parkinson’s disease, just a few implanted contacts are sufficient (and some neurosurgeons get by with a single one). But for brain researchers the number of contacts matters, and they need to get data from as many contacts as possible, simultaneously and synchronously. Say, a thousand or two thousand contact pads implanted in the brain. Naturally, we would also like the rate to be decent, say forty thousand measurements from each contact per second. And the resolution should be high too, each measurement at least 32 bits, in float or fixed-point Q format. This works out to about 320 megabytes of data per second produced by the system, and all of it has to be processed.

In addition to the “clean” data read directly from the brain, there is also filtered data: the result of applying high-pass and low-pass filters to the measurements. The filters are of high order, at least the fourth, implemented as polynomials. They are applied to every incoming measurement, quadrupling the amount of data the system has to take care of and raising the volume of generated data to 1.3 gigabytes per second. But that part is no longer my concern, because the filtered data is generated from the data I hand over after I have done my part of the job.
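Here is that arithmetic written out as a compile-time sanity check, a small illustration of mine using only the numbers quoted above:

    // The data rates quoted above, derived from the text's own figures.
    constexpr unsigned long long kContacts    = 2000;   // implanted contact pads
    constexpr unsigned long long kSampleRate  = 40000;  // measurements per contact per second
    constexpr unsigned long long kSampleBytes = 4;      // 32 bits: float or Q format

    constexpr auto kRawBytesPerSec = kContacts * kSampleRate * kSampleBytes;
    static_assert(kRawBytesPerSec == 320'000'000ULL, "320 MB/s of raw data");

    // High- and low-pass filtering quadruples the volume the system handles.
    constexpr auto kTotalBytesPerSec = kRawBytesPerSec * 4;
    static_assert(kTotalBytesPerSec == 1'280'000'000ULL, "about 1.3 GB/s in total");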

The result of all this happiness is needed in real time and is extremely important. Not a single measurement may be missed, because researchers do the main work of data analysis after the experiment ends. Therefore, all this wealth of data, in addition to being displayed on the screen, has to be written to the hard disk, all 1.3 gigabytes of it per second, and then read back in Matlab, NeuroExplorer, or another program. A system that retained 99.99999% of the data failed quality control and was rejected, because it was losing up to 13,000 samples every second.

And now let’s try to get all of this off the ground.

Task Formulation

There is a board with an FPGA controller developed by our company: the wires coming from the brain (well, actually, from converters such as these) are plugged into one side, and on the other there is a PCIe connector. With this connector the board is plugged into a PCIe slot of a perfectly ordinary, just very powerful, computer.

I had to create a driver that receives the potential data from this custom board of ours, processes it on a video card plugged into the same PCIe bus (the result of computing the filters for one contact does not depend on the results for any other, and it would be foolish not to use a processor purpose-built for running a large number of identical computations in parallel), and passes it on to the user interface. And all this has to be done very, very quickly, because new packets with eight data samples from each contact arrive every 200 microseconds. And, most importantly, it must be done under Windows 10, because neurosurgeons do not know, and do not want to know, anything other than Windows and Mac. Moreover, judging by the appearance of some clients and the adequacy of their requirements for the program, the last word of the previous sentence could be written with a lowercase letter.
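As a cross-check, the packet period follows directly from the sampling rate; the figures are from the text, this is just the division written out:

    // Eight samples per contact per packet, at 40,000 samples per second:
    static_assert(8 * 1'000'000 / 40'000 == 200,
                  "a packet of 8 samples spans exactly 200 microseconds");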

Those in the know have already realized that we are talking about hard realtime: a guaranteed response to the received data within a fixed time, no matter what the hell is going on around, with no possibility of delaying or missing even a single packet. The same people have already shaken their heads: the subject here is Windows, and hard realtime under Windows is impossible, because Windows is not a real-time operating system. Moreover, Windows is not designed to work with time slices of less than a millisecond, so running a full data-processing cycle every 200 microseconds under it is doubly impossible.

Soft realtime differs from hard realtime in that occasional small delays are still allowed, provided the system recovers from the delay and manages to catch up, working through the data accumulated during it without loss of performance.

There are all sorts of extensions for Windows that allow you to partially implement realtime, for example On Time and RTX64 from IntervalZero. They all come down to the same idea: take one or more cores and a piece of memory away from Windows, pretend they are no longer in the computer, and run a separate operating system on them. After this Frankenstein’s monster fires up and enters operating mode, two operating systems run on the computer simultaneously, the realtime one and Windows, and communication can be set up between them. This solution works, with two caveats. First, from within Windows there is practically no way to influence what happens inside the parallel realtime OS: for example, programs for it must be compiled with a proprietary SDK, and you cannot hand it a program of your own at run time to process the received data. Second, the cost of this solution is, to put it mildly, inadequate. An RTX64 developer license costs about 10 thousand dollars, and for each copy of the finished product shipped to a client (that same neurosurgeon) you have to pay another 500 dollars, on top of the $600 Windows license that the client also gets. This takes the overall cost of the product out of the competitive zone and makes it financially unattractive for potential buyers.

“For ten thousand dollars plus $500 royalties, I’ll write my own RTOS inside Windows,” I thought. And I did.

Applied technical tricks

  1. First, we want the board, with its FPGA, to do as much of the work as possible. Say, it is better to let it transfer the data itself: its DMA controller will definitely not be busy with anything else, and there is no chance that, the moment we need a DMA channel, Windows will answer “get in line, you sons of bitches, get in line!”

    How to connect an FPGA to PCIe so that DMA writes the data where it is needed is a completely, completely separate topic beyond the scope of this article. I will only say that the FPGA must be configured as a PCIe Endpoint, because the computer remains the Root Complex: it still has to manage the video adapter. And since the DMA transfers are initiated by the board, address translation must also be performed on the board. And then the question arises: where should the board write? From within Windows I can only work with virtual addresses. Even if I allocate physically contiguous memory with MmAllocateContiguousMemory, I only get a virtual address, which the board cannot reach.

    So it was impossible to get by without Frankenstein-style solutions. I reserve a piece of physical memory on the computer for the exclusive use of our device by running the following command in a command prompt as Administrator:

    bcdedit /set removememory X

    (X is how many megabytes to reserve.)

    Thus, the last X megabytes of physical memory are hidden from Windows, and Windows cannot access them. Besides guaranteeing the absence of collisions on the memory bus, this solves several other problems as well; in particular, access does not need to be synchronized, which spares me long and slow semaphores and mutexes. (Synchronization between writing the data to memory and reading it can be done by time: let the board write to five buffers in turn, 200 microseconds apart; knowing that it wrote to buffer zero at an integer number of milliseconds, I read the buffers with a lag of one: at an integer millisecond, the fourth; at a millisecond plus 200 microseconds, the zeroth; at a millisecond plus 400 microseconds, the first; and so on. How to synchronize time at the microsecond level between two devices is a solvable problem, provided there is a communication channel between them.)
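    The article does not show how the driver itself reaches this hidden region, so here is a minimal sketch of my own, under the assumption that the reserved range sits at the top of physical RAM and should be mapped non-cached so that CPU reads always see what the board’s DMA engine wrote (MmMapIoSpace is a standard kernel routine; the base address and size are placeholders):

    // Hypothetical illustration: map the physical range hidden by
    // "bcdedit /set removememory X" into kernel virtual address space.
    PVOID mapReservedRegion(ULONGLONG physicalBase, SIZE_T reservedBytes)
    {
        PHYSICAL_ADDRESS pa;
        pa.QuadPart = static_cast<LONGLONG>(physicalBase);
        // Non-cached, so we never read stale cache lines behind the DMA writes.
        return MmMapIoSpace(pa, reservedBytes, MmNonCached);
    }

    And the time-based synchronization from the parenthesis above boils down to a few lines of index arithmetic (again a sketch of mine, assuming the five buffers and the 200-microsecond step described in the text):

    // The board fills buffers 0..4 round-robin, one every 200 microseconds,
    // starting with buffer 0 at an integer number of milliseconds.
    // The reader always trails the writer by exactly one buffer.
    constexpr unsigned NUM_BUFFERS = 5;
    constexpr unsigned PERIOD_US = 200;

    unsigned bufferToRead(ULONGLONG microsecondsSinceOrigin)
    {
        const ULONGLONG slot = microsecondsSinceOrigin / PERIOD_US;
        // The board is currently filling buffer (slot % NUM_BUFFERS);
        // the buffer before it is guaranteed to be complete.
        return static_cast<unsigned>((slot + NUM_BUFFERS - 1) % NUM_BUFFERS);
    }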

  2. The driver that reads the data from the reserved memory runs strictly on one core. For this I change its processor affinity:

    /*
     * The ID of the PCI driver CPU core. Starting from 0.
     */
    static constexpr USHORT DRIVER_CPU_ID = 3;
    . . . .
    // Set the thread to run on specific processor
    KAFFINITY affinity = 1ULL << (DRIVER_CPU_ID);
    KeSetSystemAffinityThread(affinity);

    – and raise its priority: not to the highest one, but one step below it. At the highest priority some system functions do not work, and critical system tasks that run at that same priority would not get a chance to execute:

    // Set the thread priority to the highest available - 1.
    // The "-1" is because running for a long time at HIGH_PRIORITY
    // "starves" important system tasks which run at HIGH_PRIORITY.
    KeSetPriorityThread(PsGetCurrentThread(), HIGH_PRIORITY - 1);
  3. But this is not enough. It is necessary not only that this thread runs on one core, but also that nothing else runs on that core. To do this, I raise the interrupt request level (KIRQL) of the core to DISPATCH_LEVEL; at this level the thread dispatcher is masked, so the scheduler cannot take the core away from my code:

    KIRQL oldIrql;
    KeRaiseIrql(DISPATCH_LEVEL, &oldIrql);

    However, one cannot run at this level all the time; Windows strictly monitors this and can punish the insolent. So periodically I lower the interrupt level and allow my code to be, ahem, interrupted. Purely as a formality, but still:

    // It's important that we don't stay at DISPATCH_LEVEL for too long,
    // so we record the last tick at which we were at PASSIVE_LEVEL and
    // every once in a while lower the KIRQL.
    static constexpr ULONG64 MS_ALLOWED = 50;
    LARGE_INTEGER freq{};
    LONGLONG lastPassiveTick = 0;
    ULONG64 timePassed = 0;
    . . . . . .
    KeQueryPerformanceCounter(&freq);  // we only need the frequency here
    timePassed = ((KeQueryPerformanceCounter(nullptr).QuadPart -
                                lastPassiveTick) * 1000ULL) / freq.QuadPart;
    if (timePassed >= MS_ALLOWED) {
        yieldProcessor();
        lastPassiveTick = KeQueryPerformanceCounter(nullptr).QuadPart;
    }
    
    /* Yield Processor means lowering to PASSIVE_LEVEL and then raising back
     * to DISPATCH_LEVEL. It allows other important tasks to run in between,
     * if they are fast enough.
     */
    void yieldProcessor() {
        KIRQL oldIrql;
        KeLowerIrql(PASSIVE_LEVEL);
        KeRaiseIrql(DISPATCH_LEVEL, &oldIrql);
    }
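    Put together, the skeleton of the driver’s polling thread looks roughly like this. This is my own reconstruction from the fragments above, not the author’s code; shouldStop and processDueBuffer are hypothetical placeholders for the driver’s stop flag and for handling whichever buffer’s 200-microsecond slot has elapsed:

    void pollingThread()
    {
        // Pin ourselves to the dedicated core, just below the highest priority.
        KeSetSystemAffinityThread(1ULL << DRIVER_CPU_ID);
        KeSetPriorityThread(PsGetCurrentThread(), HIGH_PRIORITY - 1);

        // Keep the scheduler off this core...
        KIRQL oldIrql;
        KeRaiseIrql(DISPATCH_LEVEL, &oldIrql);

        LARGE_INTEGER freq{};
        KeQueryPerformanceCounter(&freq);
        LONGLONG lastPassiveTick = KeQueryPerformanceCounter(nullptr).QuadPart;

        while (!shouldStop()) {
            processDueBuffer();  // read the buffer whose time slot has passed

            // ...but give Windows a breather every MS_ALLOWED milliseconds.
            const ULONG64 timePassed =
                ((KeQueryPerformanceCounter(nullptr).QuadPart - lastPassiveTick)
                    * 1000ULL) / freq.QuadPart;
            if (timePassed >= MS_ALLOWED) {
                yieldProcessor();
                lastPassiveTick = KeQueryPerformanceCounter(nullptr).QuadPart;
            }
        }
        KeLowerIrql(oldIrql);
    }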
  4. And now for the most fun part.

    When initializing the driver, I walk over all the processes present in the operating system and change their processor affinity:

namespace accelerator {
	class IAccelerator {
	public:
		explicit IAccelerator() = default;
		virtual void revert() = 0;
		virtual void accelerate() = 0;
		virtual ~IAccelerator() = default;
	};
}


namespace accelerator {

const std::vector<std::wstring> DEFAULT_BLACKLIST_PROCESSES = {
	L"system",
	L"system.exe",
	L"winlogon.exe"
};

class AffinitySetter : public IAccelerator {
public:
	/**
	 * Sets the processor affinity of all processes.
	 *
	 * Affinity is reset upon resetting the computer.
	 *
	 * @param activeCpuIdentifiers The cpu identifiers which should NOT be used by any process.
	 * @param blacklistProcesses A list of processes that should not be altered.
	 *
	 */
	explicit AffinitySetter(std::vector<uint8_t> activeCpuIdentifiers,
							std::vector<std::wstring> blacklistProcesses = DEFAULT_BLACKLIST_PROCESSES);
	virtual void revert();
	virtual void accelerate();
	virtual ~AffinitySetter() = default;
private:
	ULONG_PTR getAffinityMaskWithoutBlackList(ULONG_PTR maskLimit);
	std::vector<uint8_t> m_activeCpuIdentifiers;
	std::vector<std::wstring> m_blacklistProcesses;
};
}

. . . . . . .
std::vector<std::unique_ptr<accelerator::IAccelerator>> accelerators;
auto affinitySetter = std::make_unique<accelerator::AffinitySetter>(
    std::vector<uint8_t>({ DRIVER_CPU_ID }));
accelerators.push_back(std::move(affinitySetter));
for (auto& accelerator : accelerators) {
	  accelerator->accelerate();
}
  5. But that’s not all. It is not enough to take care of the processes that already exist; we must also take care of those that the user will create in the future. To do this, I register two system callbacks, one for process creation and one for thread creation; they are called for every new process and thread, and in them I change the affinity:

    /*
     * We want to keep this core to ourself, so register a callback for each
     * process and thread created. At this callback we change their affinity
     * (the core they can run on) to be different from our core
     */
    if (!NT_SUCCESS(PsSetCreateProcessNotifyRoutine(newProcessCreated, FALSE))) {
        DEBUG_TRACE("PsCreateProcessNotifyRoutine failed");
        COMPLETE_IRP(Irp, STATUS_UNSUCCESSFUL);
    }
    FINALLY([&guardActivator]() {
        if (guardActivator) {
            PsSetCreateProcessNotifyRoutine(newProcessCreated, TRUE);
        }
    });
    
    if (!NT_SUCCESS(PsSetCreateThreadNotifyRoutine(newThreadCreated))) {
        DEBUG_TRACE("PsCreateProcessNotifyRoutine failed");
        COMPLETE_IRP(Irp, STATUS_UNSUCCESSFUL);
    }
    FINALLY([&guardActivator]() {
        if (guardActivator) {
            PsRemoveCreateThreadNotifyRoutine(newThreadCreated);
        }
    });
    . . . . . .
    
    void newProcessCreated(
        HANDLE ParentId,
        HANDLE ProcessId,
        BOOLEAN Create
    )
    {
        UNREFERENCED_PARAMETER(ParentId);
        if (Create) {
            KAFFINITY affinity = ~((1ULL << (DRIVER_CPU_ID)));
            KAFFINITY maximumAffinity = KeQueryActiveProcessors();
            affinity &= maximumAffinity;
    
            // Get process handle by id
            HANDLE processHandle;
            OBJECT_ATTRIBUTES objectAttributes{ 0 };
            InitializeObjectAttributes(&objectAttributes, NULL, OBJ_KERNEL_HANDLE, NULL, NULL);
            CLIENT_ID clientid{ 0 };
            clientid.UniqueProcess = ProcessId;
            auto status = ZwOpenProcess(&processHandle, GENERIC_ALL, &objectAttributes, &clientid);
            if (!NT_SUCCESS(status)) {
                DEBUG_TRACE("ZwOpenProcess failed getting process for pid %d with status %d", ProcessId, status);
                return;
            }
            FINALLY([&processHandle]() {
                ZwClose(processHandle);
            });
    
            // Set the process affinity by handle
            DEBUG_TRACE("Will set process affinity: %d for process: %d", affinity, ProcessId);
    
            if (affinity) {
                status = ZwSetInformationProcess(processHandle, ProcessAffinityMask, &affinity, sizeof(affinity));
                if (!NT_SUCCESS(status)) {
                    DEBUG_TRACE("ZwSetInformationProcess failed getting process affinity for pid %d with status %d", ProcessId, status);
                    return;
                }
            }
    
        }
    }
    
    void newThreadCreated(
        HANDLE ProcessId,
        HANDLE ThreadId,
        BOOLEAN Create
    )
    {
        if (Create) {
            // Thread affinity should eventually be all cpus except our own.
            KAFFINITY affinity = ~((1ULL << (DRIVER_CPU_ID)));
            KAFFINITY maximumAffinity = KeQueryActiveProcessors();
            affinity &= maximumAffinity;
    
            // Get process handle by id
            HANDLE processHandle;
            OBJECT_ATTRIBUTES objectAttributes{ 0 };
            InitializeObjectAttributes(&objectAttributes, NULL, OBJ_KERNEL_HANDLE, NULL, NULL);
            CLIENT_ID clientid{ 0 };
            clientid.UniqueProcess = ProcessId;
            auto status = ZwOpenProcess(&processHandle, GENERIC_READ, &objectAttributes, &clientid);
            if (!NT_SUCCESS(status)) {
                DEBUG_TRACE("ZwOpenProcess failed getting process for pid %d with status %d", ProcessId, status);
                return;
            }
            FINALLY([&processHandle]() {
                ZwClose(processHandle);
            });
    
            // Get the process affinity by handle
            PROCESS_BASIC_INFORMATION processInformation;
            ULONG returnLength;
            status = ZwQueryInformationProcess(processHandle, ProcessBasicInformation, &processInformation, sizeof(processInformation), &returnLength);
            if (!NT_SUCCESS(status)) {
                DEBUG_TRACE("ZwQueryInformationProcess failed getting process for pid %d with status %d", ProcessId, status);
                return;
            }
    
            // Reduce affinity to a subset of the process affinity mask
            affinity &= processInformation.AffinityMask;
    
            // Get thread handle by id
            HANDLE threadHandle;
            objectAttributes = { 0 };
            InitializeObjectAttributes(&objectAttributes, NULL, OBJ_KERNEL_HANDLE, NULL, NULL);
            clientid = { 0 };
            clientid.UniqueThread = ThreadId;
            status = ZwOpenThread(&threadHandle, GENERIC_ALL, &objectAttributes, &clientid);
            if (!NT_SUCCESS(status)) {
                DEBUG_TRACE("ZwOpenThread failed getting thread for tid %d with status %d", ProcessId, status);
                return;
            }
            FINALLY([&threadHandle]() {
                ZwClose(threadHandle);
            });
    
            // Set the thread affinity by handle
            DEBUG_TRACE("Will set thread affinity: %d for thread: %d", affinity, ThreadId);
    
            if (affinity) {
                status = ZwSetInformationThread(threadHandle, ThreadAffinityMask, &affinity, sizeof(affinity));
                if (!NT_SUCCESS(status)) {
                    DEBUG_TRACE("ZwSetInformationThread failed getting thread affinity for tid %d with status %d", ProcessId, status);
                    return;
                }
            }
        }
    }

    You just have to remember to remove these callbacks when the work is done.

Conclusion

In fact, I implemented a real-time system inside Windows. The technique is, in general, the same as in commercial solutions such as the above-mentioned On Time: I take a core and a chunk of memory for my own purposes and do not let Windows get to them or interfere with me. But there is a difference: my solution works inside Windows, in kernel space, and lets me take full advantage of the operating system. I am not restricted in communicating with the rest of the programs and can use the entire toolbox of interprocess communication. Moreover, I can hand the core occupied by the driver back to Windows at any moment: it is enough to remove my callbacks and walk over the processes, restoring their affinities.
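In terms of the interfaces shown above, that hand-back is just a few calls (a sketch assuming the same accelerators vector and callback registrations as in the listings):

    // Give the core back to Windows: stop steering new processes and threads
    // away from it, then restore the affinities we changed earlier.
    PsSetCreateProcessNotifyRoutine(newProcessCreated, TRUE);  // TRUE = remove
    PsRemoveCreateThreadNotifyRoutine(newThreadCreated);
    for (auto& accelerator : accelerators) {
        accelerator->revert();
    }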

The processing time of one data packet under these conditions does not exceed 155 microseconds, including adding headers to each packet. The data then goes from the reserved memory to the processing program, which takes care of transferring it to the GPU, displaying all this richness on the screen, and saving it to the hard disk. The time to transfer the data from the board to the computer’s memory is not counted here, because my work starts only after the data is already in memory.
