
Multi-threading in the Hub

Introduction

Any software that does a significant amount of processing has to adapt to modern CPU design, which has for some decades included multiple cores or multiple CPUs in a system.

There are several ways to approach, and conquer, the multi-core battle in a software family like Flowee, and I’ll address the various ways and their benefits on this page.

Terminology

Event-queue
When a single thread is provided with an ‘event-queue’, this thread can be used for any type of job. The jobs are processed one by one.
Chunking
The practice of taking a single long process and splitting it into individual ‘chunks’ in order to make it possible to run it in an event-queue. The goal is to spend only a limited amount of time on each chunk before returning to the event-queue.
Locking
When data can’t be accessed from multiple threads, we can use locking (mutex) to make a second thread wait until it’s safe.
Lock-Free
Lock-free programming moves the protection of data-members from the programmer to the CPU and in many cases is vastly preferred for its speed and avoidance of waiting.
Worker-pool
An array of threads, typically the same number as hardware cores, each of them running an event-queue ready to process any tasks posted to them.
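
To make the first two terms concrete, here is a minimal sketch of a single-threaded event-queue that processes a long summation in chunks. It uses only the standard library; EventQueue, sumChunk and demoChunkedSum are hypothetical names made up for this illustration and are not Flowee classes.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <numeric>
#include <utility>
#include <vector>

// Hypothetical single-threaded event-queue: jobs run one by one, in posting order.
class EventQueue {
public:
    void post(std::function<void()> job) { m_jobs.push_back(std::move(job)); }

    // Run until no jobs are left; returns how many jobs were executed.
    int run() {
        int executed = 0;
        while (!m_jobs.empty()) {
            std::function<void()> job = std::move(m_jobs.front());
            m_jobs.pop_front();
            job(); // a job may post follow-up jobs
            ++executed;
        }
        return executed;
    }
private:
    std::deque<std::function<void()>> m_jobs;
};

// Chunking: handle at most chunkSize items, then post the remainder as a new job
// so that other jobs in the queue get a turn in between.
void sumChunk(EventQueue &queue, const std::vector<int> &data, std::size_t start,
              std::size_t chunkSize, long &total)
{
    assert(chunkSize > 0); // a zero chunk size would never make progress
    const std::size_t end = std::min(start + chunkSize, data.size());
    for (std::size_t i = start; i < end; ++i)
        total += data[i];
    if (end < data.size())
        queue.post([&queue, &data, end, chunkSize, &total] {
            sumChunk(queue, data, end, chunkSize, total);
        });
}

// Sums 1..10 in chunks of 4 items; returns the number of jobs the queue executed.
int demoChunkedSum(long &total) {
    EventQueue queue;
    std::vector<int> data(10);
    std::iota(data.begin(), data.end(), 1);
    total = 0;
    queue.post([&] { sumChunk(queue, data, 0, 4, total); });
    return queue.run();
}
```

With ten items and a chunk size of four, the work is spread over three jobs, and any job posted by someone else in the meantime would run between the chunks.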

Multi-threading in Flowee the Hub

The starting point for multi-threading is knowing that we use a worker-pool owned by the WorkerThreads class (implemented in boost asio). The worker pool allows any component anywhere in the application to schedule a job to be executed. Posting a job can be done from any thread and the job can be executed in any thread (owned by the worker-pool).

It is thus important to realize that practically all parts of the application should be made thread-safe.
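
As an illustration of what posting to a worker-pool looks like, the following is a minimal sketch built on the standard library only. It is not the WorkerThreads class and does not use boost asio; SimpleWorkerPool and demoPool are hypothetical names, and for brevity the threads share a single job queue.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical worker-pool: a fixed set of threads that execute posted jobs.
class SimpleWorkerPool {
public:
    explicit SimpleWorkerPool(unsigned threadCount) {
        if (threadCount == 0) // hardware_concurrency() may report 0
            threadCount = 1;
        for (unsigned i = 0; i < threadCount; ++i)
            m_threads.emplace_back([this] { workerLoop(); });
    }
    ~SimpleWorkerPool() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stopping = true;
        }
        m_wakeup.notify_all();
        for (std::thread &thread : m_threads)
            thread.join(); // workers drain the queue before they exit
    }
    SimpleWorkerPool(const SimpleWorkerPool &) = delete;
    SimpleWorkerPool &operator=(const SimpleWorkerPool &) = delete;

    // Safe to call from any thread, including from inside a running job.
    void post(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_jobs.push(std::move(job));
        }
        m_wakeup.notify_one();
    }
private:
    void workerLoop() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_wakeup.wait(lock, [this] { return m_stopping || !m_jobs.empty(); });
                if (m_jobs.empty())
                    return; // stopping and nothing left to do
                job = std::move(m_jobs.front());
                m_jobs.pop();
            }
            job(); // executed outside the lock, on whichever thread got here first
        }
    }
    std::mutex m_mutex;
    std::condition_variable m_wakeup;
    std::queue<std::function<void()>> m_jobs;
    bool m_stopping = false;
    std::vector<std::thread> m_threads;
};

// Posts 100 small jobs and returns how many of them ran.
int demoPool() {
    std::atomic<int> counter{0};
    {
        SimpleWorkerPool pool(std::thread::hardware_concurrency());
        for (int i = 0; i < 100; ++i)
            pool.post([&counter] { counter.fetch_add(1); });
    } // the destructor waits for all posted jobs
    return counter.load();
}
```

Note that the jobs touch shared state (the counter) from arbitrary threads, which is exactly why that state has to be thread-safe.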

In new classes we do this by encapsulating the data and using either locking or making the data members lock-free (or a combination of the two). Specifically, those data members should be private to the class.
This is in stark contrast to old code that still uses global or public locks (implemented as mutexes) like cs_main. New code introducing such constructs will not be accepted.
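
A minimal sketch of that encapsulation rule could look as follows. PeerStats and demoPeerStats are hypothetical names invented for this illustration; the point is only that the mutex and the atomic are private members and callers never see them.

```cpp
#include <atomic>
#include <cstdint>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical class: all synchronisation is private, the public API is thread-safe.
class PeerStats {
public:
    // Lock-free: a plain counter fits in an atomic.
    void addBytes(std::uint64_t bytes) {
        m_bytesReceived.fetch_add(bytes, std::memory_order_relaxed);
    }
    std::uint64_t bytesReceived() const {
        return m_bytesReceived.load(std::memory_order_relaxed);
    }

    // Locking: a std::string can't be updated atomically, so a private mutex guards it.
    void setUserAgent(const std::string &agent) {
        std::lock_guard<std::mutex> guard(m_lock);
        m_userAgent = agent;
    }
    // Returns a copy, so no reference to guarded data escapes the lock.
    std::string userAgent() const {
        std::lock_guard<std::mutex> guard(m_lock);
        return m_userAgent;
    }
private:
    std::atomic<std::uint64_t> m_bytesReceived{0};
    mutable std::mutex m_lock;
    std::string m_userAgent;
};

// Four threads hammer the same object; returns the total number of bytes counted.
std::uint64_t demoPeerStats() {
    PeerStats stats;
    std::vector<std::thread> threads;
    for (int t = 0; t < 4; ++t)
        threads.emplace_back([&stats] {
            for (int i = 0; i < 1000; ++i) {
                stats.addBytes(1);
                stats.setUserAgent("example-agent");
            }
        });
    for (std::thread &thread : threads)
        thread.join();
    return stats.bytesReceived();
}
```

Because the lock is a private member, no other class can take it, and the scope of what it protects is visible by reading this one class.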

Example 1

Let’s take an example from the Validation::Engine:

namespace Validation {
class Engine {
public:
  Settings addBlock(const FastBlock &block, std::uint32_t onResultFlags, CNode *pFrom = nullptr);
};
}

class Settings {
public:
    Settings();
    Settings(const Validation::Settings &other);
    Validation::Settings start();
    const std::string &error() const;
    ...
    void setCheckPoW(bool on);
    void waitUntilFinished() const;
    void waitHeaderFinished() const;
private:
    std::shared_ptr<ValidationSettingsPrivate> d;
};

void test() {
    // just add and let it complete in the background.
    validator.addBlock(block, Validation::PunishBadNode, pFrom);

    // wait until done, we can ask for errors afterwards.
    auto future = validator.addBlock(block2, 0).start();
    future.waitUntilFinished();

    // set settings on the job before we start it.
    auto settings = validator.addBlock(block3, 0);
    settings.setCheckPoW(false);
    settings.start();
}

In this example we show how the new validation engine allows you to add a block to be validated. The ‘addBlock()’ method returns immediately because the base assumption in Flowee is that APIs are non-blocking. We need to assume this because the validation engine and the API network handling share the same threads. Blocking means we waste CPU time that could have been spent handling another user request.

In our first example we pass in the PunishBadNode flag. We delegate the punishing of a node to the validation engine so that the thread that handled the incoming data can forget about this block the moment it has passed the block to the validation engine.

In specific cases the programmer can decide to be blocking, for instance when mining, and then you can call waitUntilFinished(). For this reason the ‘addBlock()’ method returns a Settings instance which can be used as a ‘future’ to wait for the block validation to finish in another thread.

The third example uses the settings object to pass in more options. In Flowee we love maintainable code, and a method with a dozen arguments didn’t sound very wise. The solution is shown at the bottom of the code above. We return a ‘Settings’ object on which more validation options can be set, and when we are happy we call start() to actually commence the validation. We could call waitUntilFinished() on that settings object as well if we wished to do so.

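Please note that in the above example we use std::shared_ptr and various similar classes. These are effective examples of lock-free programming: the reference count of a std::shared_ptr is updated atomically, so copies of it can be handed to and released by different threads without a mutex. The object being pointed to still needs its own protection if several threads modify it.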
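
How a small handle object can double as both a settings holder and a ‘future’ can be sketched as below. This is not the actual Validation::Settings implementation; JobHandle, JobPrivate, runJob, checksRun and demoHandle are hypothetical names, the "validation" is a placeholder, and a dedicated thread stands in for the worker-pool.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical private state shared by all copies of a handle.
struct JobPrivate {
    ~JobPrivate() {
        if (worker.joinable())
            worker.join(); // the last handle going away waits for the job
    }
    std::mutex lock;
    std::condition_variable finishedSignal;
    bool checkPoW = true;
    bool started = false;
    bool finished = false;
    int checksRun = 0;
    std::thread worker;
};

// Placeholder for the real work: only records how many checks were enabled.
void runJob(JobPrivate &state) {
    std::lock_guard<std::mutex> guard(state.lock);
    state.checksRun = state.checkPoW ? 2 : 1;
    state.finished = true;
    state.finishedSignal.notify_all();
}

class JobHandle {
public:
    JobHandle() : d(std::make_shared<JobPrivate>()) {}

    // Options are set on the handle before the job is started.
    void setCheckPoW(bool on) {
        std::lock_guard<std::mutex> guard(d->lock);
        d->checkPoW = on;
    }
    // Starts the job (once) and returns a copy that can be used as a 'future'.
    JobHandle start() {
        std::lock_guard<std::mutex> guard(d->lock);
        if (!d->started) {
            d->started = true;
            JobPrivate *state = d.get(); // stays valid until ~JobPrivate has joined
            d->worker = std::thread([state] { runJob(*state); });
        }
        return *this;
    }
    // Blocks until the job is done; only call this after start().
    void waitUntilFinished() const {
        std::unique_lock<std::mutex> guard(d->lock);
        d->finishedSignal.wait(guard, [this] { return d->finished; });
    }
    int checksRun() const {
        std::lock_guard<std::mutex> guard(d->lock);
        return d->checksRun;
    }
private:
    std::shared_ptr<JobPrivate> d; // copying a handle only bumps an atomic refcount
};

// Mirrors the third usage above: set an option, start, then wait for the result.
int demoHandle() {
    JobHandle job;
    job.setCheckPoW(false);
    JobHandle future = job.start();
    future.waitUntilFinished();
    return future.checksRun();
}
```

Copying the handle is cheap and lock-free, while the members of the shared state are guarded by a mutex that lives inside that private state.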

Example 2

An example from the transaction validation to show chunking.


struct BlockValidationState {
    FastBlock m_block;
    mutable std::atomic<int> m_txChunkLeftToStart;
    void calculateTxCheckChunks(int &chunks, int &itemsPerChunk) const;

    void checkSignaturesChunk() {
        const int totalTxCount = (int) m_block.transactions().size();
        // each caller atomically claims a different chunk number
        int chunkToStart = m_txChunkLeftToStart.fetch_sub(1) - 1;
        int chunks, itemsPerChunk;
        calculateTxCheckChunks(chunks, itemsPerChunk);
        const int txStart = itemsPerChunk * chunkToStart;
        const int txMax = std::min(txStart + itemsPerChunk, totalTxCount);
        for (int txIndex = txStart; txIndex < txMax; ++txIndex) {
            // validate a tx
        }
    }
};

The scenario is this: we have a block with a large number of transactions. The hardware has a specific number of cores, and we want to divide the transactions to check evenly over those cores, so that each core does its own chunk. We could prepare all of that up front, but here is a simpler way.

We call the checkSignaturesChunk() a total of 8 times, should the machine have 8 cores. They can be run in parallel. The fun detail is the usage of the m_txChunkLeftToStart atomic int which allows for lock-free operation. We set it to 8 before we start and as each core starts to execute the checkSignaturesChunk() method, it reaches this line:

int chunkToStart = m_txChunkLeftToStart.fetch_sub(1) - 1;

The fetch_sub(1) on the atomic ensures that each core gets a different number out without any locking going on. It is practically free as long as two threads don’t happen to access that specific variable at exactly the same time. If they do, the CPU simply tries again, which is not expensive either.
The end result is that each core will validate the scripts of a different sub-set of transactions, in parallel without any locking.
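
The same pattern can be tried out in isolation with the small, self-contained sketch below. ChunkedJob, processChunk and demoChunks are hypothetical names for this illustration, and "validating" an item is replaced by counting how often it was visited.

```cpp
#include <algorithm>
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical stand-in for a block with many transactions to check.
struct ChunkedJob {
    ChunkedJob(int itemCount, int chunkCount)
        : m_timesChecked(itemCount, 0),
          m_itemsPerChunk((itemCount + chunkCount - 1) / chunkCount), // round up
          m_chunksLeftToStart(chunkCount) {}

    // Called once per chunk, from any thread. No locks are taken.
    void processChunk() {
        // fetch_sub returns the old value, so every caller gets a unique chunk number.
        const int chunkToStart = m_chunksLeftToStart.fetch_sub(1) - 1;
        const int itemCount = static_cast<int>(m_timesChecked.size());
        const int begin = m_itemsPerChunk * chunkToStart;
        const int end = std::min(begin + m_itemsPerChunk, itemCount);
        for (int i = begin; i < end; ++i)
            ++m_timesChecked[i]; // each index belongs to exactly one chunk
    }

    int itemsCheckedExactlyOnce() const {
        return static_cast<int>(std::count(m_timesChecked.begin(), m_timesChecked.end(), 1));
    }

    std::vector<int> m_timesChecked;
    const int m_itemsPerChunk;
    std::atomic<int> m_chunksLeftToStart;
};

// Runs one thread per chunk and reports how many items were visited exactly once.
int demoChunks(int itemCount, int chunkCount) {
    ChunkedJob job(itemCount, chunkCount);
    std::vector<std::thread> threads;
    for (int i = 0; i < chunkCount; ++i)
        threads.emplace_back([&job] { job.processChunk(); });
    for (std::thread &thread : threads)
        thread.join();
    return job.itemsCheckedExactlyOnce();
}
```

Calling demoChunks(1000, 8) visits all 1000 items exactly once, regardless of the order in which the eight threads happen to claim their chunk.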

For the curious among you: in the final code there is a second atomic called m_txChunkLeftToFinish, which also starts out at 8. Can you guess what the point of this atomic is?