
so5extra 1.6 Asio ThreadPool Dispatcher

Yauheni Akhotnikau edited this page Apr 21, 2023 · 1 revision

Purpose

The purpose of the asio_thread_pool dispatcher is to make it possible to handle IO events from Asio and SO events from SObjectizer in the context of Asio's io_context. This means that SObjectizer agents can perform IO operations during ordinary processing of SObjectizer messages. For example:

class request_sender : public so_5::agent_t
{
    asio::io_context::strand strand_;
public:
    request_sender(context_t ctx, asio::io_context & io_svc)
        : so_5::agent_t(std::move(ctx)), strand_(io_svc)
    {...}
    
    asio::io_context::strand & strand() noexcept { return strand_; }
    ...
private:
    void on_perform_request(mhood_t<perform_request> req)
    {
        using asio::ip::tcp;
        tcp::socket s(strand_.context());
        tcp::resolver resolver(strand_.context());
        asio::connect(s, resolver.resolve(req->host(), req->port()));
        
        asio::write(s, asio::buffer(req->payload()));
        
        std::array<char, 1024> reply_buf;
        const auto n = asio::read(s, asio::buffer(reply_buf));
        so_5::send<request_result>(req->reply_to(), reply_buf.data(), n);
    }
};

Working Principle

An instance of the asio_thread_pool dispatcher spawns N worker threads and calls asio::io_context::run() on each of them. The dispatcher then schedules the event handlers of all agents bound to it via asio::post(). As a result, Asio's io_context handles an agent's events just like any other Asio events (such as the results of async IO operations).

The asio_thread_pool dispatcher stops all of its worker threads when it is no longer used, or when the SObjectizer Environment shuts down.
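The working principle can be illustrated with a toy model that uses only the C++ standard library. This is a sketch under stated assumptions, not the actual implementation of so5extra or Asio: toy_io_context, post(), run(), finish() and run_toy_pool() are hypothetical names that only mimic the roles of asio::io_context, asio::post() and asio::io_context::run().

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for asio::io_context: a shared queue of handlers.
class toy_io_context {
    std::mutex lock_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> queue_;
    bool finished_ = false;
public:
    // Plays the role of asio::post(): any worker thread may pick the handler up.
    void post(std::function<void()> handler) {
        {
            std::lock_guard<std::mutex> guard(lock_);
            queue_.push_back(std::move(handler));
        }
        cv_.notify_one();
    }
    // Plays the role of io_context::run(): called on every worker thread.
    void run() {
        for (;;) {
            std::function<void()> handler;
            {
                std::unique_lock<std::mutex> guard(lock_);
                cv_.wait(guard, [this] { return finished_ || !queue_.empty(); });
                if (queue_.empty()) return; // finished and nothing left to do
                handler = std::move(queue_.front());
                queue_.pop_front();
            }
            handler(); // executed outside the lock
        }
    }
    // Toy shutdown: workers drain the queue and then leave run().
    void finish() {
        {
            std::lock_guard<std::mutex> guard(lock_);
            finished_ = true;
        }
        cv_.notify_all();
    }
};

// Spawns `thread_count` workers, posts `event_count` handlers,
// and returns how many handlers were actually executed.
int run_toy_pool(int thread_count, int event_count) {
    assert(thread_count > 0);
    toy_io_context ctx;
    std::atomic<int> handled{0};
    std::vector<std::thread> workers;
    for (int i = 0; i < thread_count; ++i)
        workers.emplace_back([&ctx] { ctx.run(); });
    for (int i = 0; i < event_count; ++i)
        ctx.post([&handled] { ++handled; });
    ctx.finish();
    for (auto & worker : workers) worker.join();
    return handled.load();
}
```

Calling run_toy_pool(4, 100) starts four workers and returns 100 once every posted handler has run. Note that the toy finish() lets the workers drain the queue first, whereas asio::io_context::stop() asks run() to return as soon as possible.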

How To Use

The header file so_5_extra/disp/asio_thread_pool/pub.hpp must be included. The main SObjectizer header file so_5/pub.hpp may also be needed:

// Definition of asio_thread_pool dispatcher.
#include <so_5_extra/disp/asio_thread_pool/pub.hpp>

// Main SObjectizer header.
#include <so_5/pub.hpp>

Then disp_params_t instance must be prepared. For example:

namespace asio_tp = so_5::extra::disp::asio_thread_pool;
asio_tp::disp_params_t params;
// Count of work threads.
params.thread_count(8);
// Dispatcher must use its own io_context object.
params.use_own_io_context();

Then an instance of dispatcher must be created:

auto disp = asio_tp::make_dispatcher(
    // SObjectizer Environment to work in.
    env,
    // Name for data source.
    "my_asio_tp",
    // Parameters for new dispatcher.
    std::move(params) );

Then binders for agents can be obtained via dispatcher_handle_t::binder() method:

env.introduce_coop([&](so_5::coop_t & coop) {
    // Create agent.
    auto agent = std::make_unique<request_sender>(env, disp.io_context());
    // Get agent's strand. It is necessary for binder.
    auto & strand = agent->strand();
    // Move agent into the coop with a binder to asio_thread_pool.
    coop.add_agent(std::move(agent), disp.binder(strand));
});

Strand Must Be Known At The Moment Of Agent Binding

Because asio::io_context::run() is called on several worker threads, Asio can call an event handler on any of those threads. Moreover, if there are several pending events, Asio can schedule their execution on several threads at the same time.

To prevent several events for the same object from being executed at the same time, asio::io_context::strand is used. When an agent is bound to the asio_thread_pool dispatcher, a specific strand instance must be passed to the binder for that agent. This strand prevents several event handlers of that agent from running simultaneously.
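The serialization that a strand provides can be sketched with the standard library alone. The code below is a toy model under stated assumptions and not how asio::io_context::strand is implemented: toy_strand, dispatch() and run_through_strand() are hypothetical names. Several threads submit handlers at once, yet the handlers never overlap, which is why plain non-atomic counters are safe inside them.

```cpp
#include <algorithm>
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical toy strand: handlers submitted from any thread run one at a time.
class toy_strand {
    std::mutex lock_;
    std::deque<std::function<void()>> pending_;
    bool running_ = false; // true while some thread is draining the queue
public:
    void dispatch(std::function<void()> handler) {
        {
            std::lock_guard<std::mutex> guard(lock_);
            pending_.push_back(std::move(handler));
            if (running_) return; // another thread will execute this handler
            running_ = true;      // this thread becomes the only executor
        }
        for (;;) {
            std::function<void()> next;
            {
                std::lock_guard<std::mutex> guard(lock_);
                if (pending_.empty()) { running_ = false; return; }
                next = std::move(pending_.front());
                pending_.pop_front();
            }
            next(); // outside the lock, but never concurrently with another handler
        }
    }
};

// Returns {maximum number of handlers seen running at once, handlers executed}.
std::pair<int, int> run_through_strand(int thread_count, int events_per_thread) {
    assert(thread_count > 0 && events_per_thread > 0);
    toy_strand strand;
    int active = 0, max_active = 0, total = 0; // plain ints, guarded only by the strand
    std::vector<std::thread> threads;
    for (int t = 0; t < thread_count; ++t)
        threads.emplace_back([&] {
            for (int i = 0; i < events_per_thread; ++i)
                strand.dispatch([&] {
                    ++active;
                    max_active = std::max(max_active, active);
                    ++total;
                    --active;
                });
        });
    for (auto & th : threads) th.join();
    return {max_active, total};
}
```

With run_through_strand(4, 250) all 1000 handlers are executed and the observed overlap never exceeds one. Two agents bound to the same strand would be serialized against each other in the same way.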

It is the user's responsibility to create and maintain a strand instance for an agent. There are at least three ways to do that.

A Strand Can Be Created By dispatcher_handle_t

Since v.1.3.0 there is a version of dispatcher_handle_t::binder() method that automatically creates a strand for a new agent:

using namespace so_5::extra::disp::asio_thread_pool;
asio::io_context io_ctx;
so_5::environment_t & env = ...;
auto disp = make_dispatcher( env, "my_disp", io_ctx );
env.introduce_coop( [&]( so_5::coop_t & coop ) {
    // This agent will use its own strand object.
    coop.make_agent_with_binder< some_agent_type >(
        disp.binder(),
        ... );
    // This agent will use its own strand object.
    coop.make_agent_with_binder< another_agent_type >(
        disp.binder(),
        ... );
    ...
} );

Note that two separate strands will be created in the example above.

A Strand Can Be A Part Of Agent

class my_agent : public so_5::agent_t {
    asio::io_context::strand strand_;
    ...
public:
    my_agent(context_t ctx, asio::io_context & io_svc, ...)
        : so_5::agent_t(std::move(ctx))
        , strand_(io_svc)
        ...
    {}
    ...
    // Public getter for the agent's strand.
    asio::io_context::strand & strand() noexcept { return strand_; }
};

In this case, adding the agent to the coop consists of three steps:

// 1. Create an agent.
auto agent = std::make_unique<my_agent>(env, io_svc, ...);
// 2. Get the agent's strand.
auto & strand = agent->strand();
// 3. Add the agent to a coop.
coop.add_agent(
    // An agent to be added.
    std::move(agent),
    // Binder for that agent.
    disp.binder(strand));

Please note that the following code is not safe and must be avoided:

auto agent = std::make_unique<my_agent>(env, io_svc, ...);
coop.add_agent(std::move(agent),
    // The order in which function arguments are evaluated is unspecified,
    // so at this point `agent` can already be an empty unique_ptr and
    // the call to my_agent::strand() via an empty unique_ptr will lead to
    // a segmentation fault or something similar.
    disp.binder(agent->strand()));
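The safe three-step pattern can be reproduced without SObjectizer or Asio. In the minimal sketch below every name is a hypothetical stand-in: toy_agent replaces my_agent, a std::string replaces the strand, and add_agent() replaces coop.add_agent() combined with disp.binder(). The reference to the strand is taken before ownership of the agent is transferred, so nothing is ever read through a moved-from unique_ptr.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for my_agent; the string plays the role of the strand.
struct toy_agent {
    std::string strand_name_;
    std::string & strand() noexcept { return strand_name_; }
};

// Stand-in for coop.add_agent(agent, disp.binder(strand)):
// takes ownership of the agent and uses the strand that lives inside it.
std::string add_agent(std::unique_ptr<toy_agent> agent, const std::string & strand) {
    assert(agent); // ownership really has been transferred to the callee
    return strand; // valid: `agent` keeps the referenced member alive
}

std::string bind_safely() {
    // 1. Create an agent.
    auto agent = std::make_unique<toy_agent>(toy_agent{"strand-A"});
    // 2. Get the agent's strand while `agent` is still non-empty.
    auto & strand = agent->strand();
    // 3. Only now give the agent away; `strand` refers to the heap object,
    //    not to the unique_ptr, so it stays valid after the move.
    return add_agent(std::move(agent), strand);
}
```

The reference obtained in step 2 points into the heap-allocated agent, which does not move when the unique_ptr does. That is why the three-step form is safe regardless of the order in which the compiler evaluates the arguments.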

A Strand Can Be Created Outside of An Agent

For example:

// Create a strand and hand ownership of it over to the coop.
auto strand_raw_ptr = coop.take_under_control(
    std::make_unique<asio::io_context::strand>(io_svc));
// Use the strand for the new agent.
coop.make_agent_with_binder<some_agent>(
    disp.binder(*strand_raw_ptr),
    ... // Args for some_agent's constructor.
    );

This approach allows the same strand object to be used for several agents:

auto strand_raw_ptr = coop.take_under_control(
    std::make_unique<asio::io_context::strand>(io_svc));
// Use the strand for the new agent.
coop.make_agent_with_binder<some_agent>(
    disp.binder(*strand_raw_ptr),
    ... // Args for some_agent's constructor.
    );
// Use the same strand for another agent.
// It means that some_agent and another_agent can't work
// in parallel at different work threads.
coop.make_agent_with_binder<another_agent>(
    disp.binder(*strand_raw_ptr),
    ... // Args for another_agent's constructor.
    );

Own Or External Asio's io_context

asio_thread_pool can use its own instance of Asio's io_context. This must be requested via the disp_params_t::use_own_io_context method:

so_5::extra::disp::asio_thread_pool::disp_params_t params;
params.use_own_io_context(); // New instance of io_context will be created dynamically here.

This io_context instance will be destroyed with the instance of asio_thread_pool dispatcher.

But asio_thread_pool can also use an io_context created elsewhere. For example:

int main()
{
    // An io_context which will live to the end of the whole application.
    asio::io_context io_svc;
    ...
    // Start of SObjectizer Environment.
    so_5::launch([&](so_5::environment_t & env) {
        // Create an asio_thread_pool dispatcher.
        so_5::extra::disp::asio_thread_pool::disp_params_t params;
        params.use_external_io_context(io_svc);
        auto disp = so_5::extra::disp::asio_thread_pool::make_dispatcher(
            env, "my_asio_tp", std::move(params));
        ...
    });
    ...
}

In this case asio_thread_pool does not control the lifetime of the io_context instance. However, asio_thread_pool will still call the io_context::run() and io_context::stop() methods.
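The difference between the two ownership modes can be modelled with a few lines of standard C++. This is only an illustration under stated assumptions, not the dispatcher's real internals: toy_io_context, toy_dispatcher, owns_context() and external_context_survives() are hypothetical names.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for asio::io_context.
struct toy_io_context {
    bool stopped = false;
};

class toy_dispatcher {
    std::unique_ptr<toy_io_context> owned_; // non-null only in "own" mode
    toy_io_context * ctx_;                  // the context actually in use
public:
    // "Own" mode: the context is created here and destroyed with the dispatcher.
    toy_dispatcher()
        : owned_(std::make_unique<toy_io_context>()), ctx_(owned_.get()) {}
    // "External" mode: the caller keeps ownership and must outlive the dispatcher.
    explicit toy_dispatcher(toy_io_context & external) : ctx_(&external) {}
    // The context is stopped on shutdown, but only an owned one is freed afterwards.
    ~toy_dispatcher() {
        assert(ctx_ != nullptr);
        ctx_->stopped = true;
    }
    bool owns_context() const noexcept { return owned_ != nullptr; }
};

// Shows that an external context outlives the dispatcher but ends up stopped.
bool external_context_survives() {
    toy_io_context external;
    {
        toy_dispatcher disp(external);
    } // dispatcher destroyed here; `external` is still a valid object
    return external.stopped;
}
```

In the external mode the caller has to guarantee that the context lives longer than the dispatcher, which is why the example above declares io_svc in main() before calling so_5::launch().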

Custom Traits

asio_thread_pool is implemented by template classes whose behaviour is controlled by specific traits. By default, default_traits_t is used as the traits type. It means that a call:

auto disp = so_5::extra::disp::asio_thread_pool::make_dispatcher(...);

is equivalent to that call:

auto disp = so_5::extra::disp::asio_thread_pool::make_dispatcher<
        so_5::extra::disp::asio_thread_pool::default_traits_t>(...);

The type so_5::extra::disp::asio_thread_pool::default_traits_t is empty at the moment. It wasn't empty in so5extra-1.4, but its content was removed during the development of v.1.5.0. Because default_traits_t is empty and the implementation of the asio_thread_pool dispatcher doesn't take anything from it, there is currently no point in specifying a custom traits type.

Although the type so_5::extra::disp::asio_thread_pool::default_traits_t is empty, it is kept so that it can be extended in future versions.

Custom Thread Type

If a user wants to use a custom thread type with the asio_thread_pool dispatcher, they have to use the standard SObjectizer mechanism based on the abstract_work_thread_t/abstract_work_thread_factory_t interfaces. That mechanism was introduced in SObjectizer-5.7.3, and so5extra-1.6 utilizes it.
