so5extra 1.3 Asio ThreadPool Dispatcher
The purpose of the asio_thread_pool dispatcher is to handle IO-events from Asio and events from SObjectizer on the context of the same Asio io_context. It means that SObjectizer agents can perform IO-operations during the ordinary processing of SObjectizer messages. For example:
class request_sender : public so_5::agent_t
{
    asio::io_context::strand strand_;
public:
    request_sender(context_t ctx, asio::io_context & io_svc)
        : so_5::agent_t(std::move(ctx)), strand_(io_svc)
    {...}
    asio::io_context::strand & strand() noexcept { return strand_; }
    ...
private:
    void on_perform_request(mhood_t<perform_request> req)
    {
        using asio::ip::tcp;
        tcp::socket s(strand_.context());
        tcp::resolver resolver(strand_.context());
        asio::connect(s, resolver.resolve(req->host(), req->port()));
        asio::write(s, asio::buffer(req->payload()));
        std::array<char, 1024> reply_buf;
        const auto n = asio::read(s, asio::buffer(reply_buf));
        so_5::send<request_result>(req->reply_to(), reply_buf.data(), n);
    }
};
An instance of the asio_thread_pool dispatcher spawns N worker threads and calls asio::io_context::run() on each of them. The dispatcher then schedules the event handlers of all agents bound to it via asio::post(). It means that Asio's io_context handles the agents' events just like any other Asio events (such as the results of async IO-operations).
The asio_thread_pool dispatcher stops all worker threads when it is no longer used (and also as a result of the SObjectizer Environment's shutdown).
The header file so_5_extra/disp/asio_thread_pool/pub.hpp must be included. The main SObjectizer header file so_5/pub.hpp may also be necessary:
// Definition of asio_thread_pool dispatcher.
#include <so_5_extra/disp/asio_thread_pool/pub.hpp>
// Main SObjectizer header.
#include <so_5/pub.hpp>
Then a disp_params_t instance must be prepared. For example:
namespace asio_tp = so_5::extra::disp::asio_thread_pool;
asio_tp::disp_params_t params;
// Count of work threads.
params.thread_count(8);
// Dispatcher must use its own io_context object.
params.use_own_io_context();
Then an instance of dispatcher must be created:
auto disp = asio_tp::make_dispatcher(
    // SObjectizer Environment to work in.
    env,
    // Name for data source.
    "my_asio_tp",
    // Parameters for new dispatcher.
    std::move(params) );
Then binders for agents can be obtained via the dispatcher_handle_t::binder() method:
env.introduce_coop([&](so_5::coop_t & coop) {
    // Create agent. The environment is passed as the agent's context.
    auto agent = std::make_unique<request_sender>(env, disp.io_context());
    // Get agent's strand. It is necessary for binder.
    auto & strand = agent->strand();
    // Move agent into the coop with a binder to asio_thread_pool.
    coop.add_agent(std::move(agent), disp.binder(strand));
});
Because asio::io_context::run() is called on several worker threads, Asio can call an event handler on any of those threads. Moreover, if there are several pending events, Asio can schedule their execution on several threads at the same time.
To prevent several events for the same object from being executed at the same time, asio::io_context::strand is used. When an agent is bound to the asio_thread_pool dispatcher, a specific strand instance must be passed to the binder for that agent. This strand prevents the simultaneous execution of several event handlers for that agent.
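The effect of a strand can be modelled with standard C++ only. The sketch below is not Asio's implementation: mini_pool, mini_strand and count_through_strand are hypothetical names that stand in for an io_context served by several worker threads and for asio::io_context::strand. Handlers posted through one mini_strand may run on different threads, but never at the same time, so the plain int counter needs no extra locking.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for an io_context served by several worker threads.
class mini_pool {
    std::mutex lock_;
    std::condition_variable wakeup_;
    std::deque<std::function<void()>> tasks_;
    bool stopping_ = false;
    std::vector<std::thread> workers_;

    void work() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> l(lock_);
                wakeup_.wait(l, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return; // Stopping and nothing left to run.
                task = std::move(tasks_.front());
                tasks_.pop_front();
            }
            task(); // Any worker may pick up any task.
        }
    }

public:
    explicit mini_pool(std::size_t threads) {
        for (std::size_t i = 0; i != threads; ++i)
            workers_.emplace_back([this] { work(); });
    }
    // Runs everything that is still queued, then joins all workers.
    ~mini_pool() {
        { std::lock_guard<std::mutex> l(lock_); stopping_ = true; }
        wakeup_.notify_all();
        for (auto & w : workers_) w.join();
    }
    void post(std::function<void()> task) {
        { std::lock_guard<std::mutex> l(lock_); tasks_.push_back(std::move(task)); }
        wakeup_.notify_one();
    }
};

// Hypothetical stand-in for a strand: at most one of its handlers runs at a time.
class mini_strand {
    mini_pool & pool_;
    std::mutex lock_;
    std::deque<std::function<void()>> pending_;
    bool scheduled_ = false;

    void run_one() {
        std::function<void()> handler;
        {
            std::lock_guard<std::mutex> l(lock_);
            handler = std::move(pending_.front());
            pending_.pop_front();
        }
        handler();
        std::lock_guard<std::mutex> l(lock_);
        if (pending_.empty()) scheduled_ = false;
        else pool_.post([this] { run_one(); }); // Next handler, maybe on another thread.
    }

public:
    explicit mini_strand(mini_pool & pool) : pool_(pool) {}
    void post(std::function<void()> handler) {
        std::lock_guard<std::mutex> l(lock_);
        pending_.push_back(std::move(handler));
        if (!scheduled_) { scheduled_ = true; pool_.post([this] { run_one(); }); }
    }
};

// Posts `handlers` increments of a plain int through one strand.
int count_through_strand(int handlers) {
    int counter = 0; // Deliberately not atomic.
    auto pool = std::make_unique<mini_pool>(4);
    mini_strand strand(*pool);
    for (int i = 0; i != handlers; ++i)
        strand.post([&counter] { ++counter; });
    pool.reset(); // Drains the queue and joins the workers.
    return counter;
}
```

In this model two agents bound to the same mini_strand would share one queue and would therefore never run in parallel, while agents with separate strands could.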
It is the user's task to create and maintain a strand instance for an agent. There are at least three ways to do that.
Since v.1.3.0 there is a version of the dispatcher_handle_t::binder() method that automatically creates a strand for a new agent:
using namespace so_5::extra::disp::asio_thread_pool;
asio::io_context io_ctx;
so_5::environment_t & env = ...;
auto disp = make_dispatcher( env, "my_disp", io_ctx );
env.introduce_coop( [&]( so_5::coop_t & coop ) {
    // This agent will use its own strand object.
    coop.make_agent_with_binder< some_agent_type >(
        disp.binder(),
        ... );
    // This agent will use its own strand object.
    coop.make_agent_with_binder< another_agent_type >(
        disp.binder(),
        ... );
    ...
} );
Note that two separate strands will be created in the example above.
A strand can also be a member of the agent's class:
class my_agent : public so_5::agent_t {
    asio::io_context::strand strand_;
    ...
public:
    my_agent(context_t ctx, asio::io_context & io_svc, ...)
        : so_5::agent_t(std::move(ctx))
        , strand_(io_svc)
        ...
    {}
    ...
    // Public getter for the agent's strand.
    asio::io_context::strand & strand() noexcept { return strand_; }
};
In such a case, adding the agent to the coop consists of three steps:
// 1. Create an agent.
auto agent = std::make_unique<my_agent>(env, io_svc, ...);
// 2. Get the agent's strand.
auto & strand = agent->strand();
// 3. Add the agent to a coop.
coop.add_agent(
    // An agent to be added.
    std::move(agent),
    // Binder for that agent.
    disp.binder(strand));
Please note that the following code is not safe and must be avoided:
auto agent = std::make_unique<my_agent>(env, io_svc, ...);
coop.add_agent(std::move(agent),
    // At this point `agent` can be an empty unique_ptr and
    // call to my_agent::strand() via empty unique_ptr will lead to
    // segmentation fault or something similar.
    disp.binder(agent->strand()));
A strand can also be created as a separate object and placed under the control of the coop. Something like this:
// Create a strand and pass control of it to the coop.
auto strand_raw_ptr = coop.take_under_control(
    std::make_unique<asio::io_context::strand>(io_svc));
// Use the strand for the new agent.
coop.make_agent_with_binder<some_agent>(
    disp.binder(*strand_raw_ptr),
    ... // Args for some_agent's constructor.
);
This approach allows the same strand object to be used for several agents:
auto strand_raw_ptr = coop.take_under_control(
    std::make_unique<asio::io_context::strand>(io_svc));
// Use the strand for the new agent.
coop.make_agent_with_binder<some_agent>(
    disp.binder(*strand_raw_ptr),
    ... // Args for some_agent's constructor.
);
// Use the same strand for another agent.
// It means that some_agent and another_agent can't work
// in parallel on different worker threads.
coop.make_agent_with_binder<another_agent>(
    disp.binder(*strand_raw_ptr),
    ... // Args for another_agent's constructor.
);
asio_thread_pool can use its own instance of Asio's io_context. This must be specified via the disp_params_t::use_own_io_context method:
so_5::extra::disp::asio_thread_pool::disp_params_t params;
params.use_own_io_context(); // New instance of io_context will be created dynamically here.
This io_context instance will be destroyed together with the asio_thread_pool dispatcher instance.
But asio_thread_pool can also use an io_context created by someone else. For example:
int main()
{
    // An io_context which will live to the end of the whole application.
    asio::io_context io_svc;
    ...
    // Start of SObjectizer Environment.
    so_5::launch([&](so_5::environment_t & env) {
        // Create an asio_thread_pool dispatcher.
        so_5::extra::disp::asio_thread_pool::disp_params_t params;
        params.use_external_io_context(io_svc);
        auto disp = so_5::extra::disp::asio_thread_pool::make_dispatcher(
            env, "my_asio_tp", std::move(params));
        ...
    });
    ...
}
In such a case asio_thread_pool won't control the lifetime of the io_context instance. But asio_thread_pool will still call the io_context::run() and io_context::stop() methods.
asio_thread_pool is implemented by class templates whose behaviour is controlled by specific traits. By default, default_traits_t is used as the traits type. It means that the call:
auto disp = so_5::extra::disp::asio_thread_pool::make_dispatcher(...);
is equivalent to this call:
auto disp = so_5::extra::disp::asio_thread_pool::make_dispatcher<
    so_5::extra::disp::asio_thread_pool::default_traits_t>(...);
A user can specify their own traits type:
struct my_traits {
    ... // Some stuff
};
...
auto disp = so_5::extra::disp::asio_thread_pool::make_dispatcher<my_traits>(...);
A custom traits type must define the following nested types:
- thread_type. The thread class to be used by the asio_thread_pool dispatcher for creating worker threads.
For example:
struct my_traits {
    using thread_type = my_custom_type;
};
std::thread is used as the thread class by default. But a user can specify a custom thread type via the name thread_type in a custom traits type.
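How a traits type with a default can select the thread class may be easier to see in isolation. The sketch below uses only the standard library and does not reproduce so5extra's code: default_traits_sketch, dispatcher_sketch, make_dispatcher_sketch, fake_thread and my_traits_sketch are hypothetical names introduced for illustration.

```cpp
#include <cassert>
#include <thread>
#include <type_traits>

// Hypothetical default traits: std::thread is the thread class.
struct default_traits_sketch {
    using thread_type = std::thread;
};

// Hypothetical dispatcher type that takes its thread class from the traits.
template<typename Traits = default_traits_sketch>
struct dispatcher_sketch {
    using thread_type = typename Traits::thread_type;
};

// Hypothetical factory with a defaulted traits parameter.
template<typename Traits = default_traits_sketch>
dispatcher_sketch<Traits> make_dispatcher_sketch() { return {}; }

// A placeholder thread class and a custom traits type that selects it.
struct fake_thread {};
struct my_traits_sketch {
    using thread_type = fake_thread;
};

// Omitting the traits argument is the same as naming the default explicitly.
static_assert(
    std::is_same<
        decltype(make_dispatcher_sketch()),
        decltype(make_dispatcher_sketch<default_traits_sketch>())>::value,
    "default traits are used when none are given");
```

With this shape, the traits type is the single place where the thread class is chosen.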
A custom thread type must be a class which looks like:
class custom_thread_type {
public:
    // Must provide this constructor.
    // F is a type of functional object which can be converted
    // into std::function<void()>.
    template<typename F>
    custom_thread_type(F && f) {...}
    // Destructor must join the thread if it is not joined yet.
    ~custom_thread_type() noexcept {...}
    // The same semantics as std::thread::join.
    void join() noexcept {...}
};
This class doesn't need to be DefaultConstructible, CopyConstructible, MoveConstructible, Copyable or Moveable.
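A minimal sketch of a class that satisfies these requirements, assuming a thin wrapper around std::thread is sufficient. The names joining_thread and run_on_custom_thread are hypothetical. One assumption differs slightly from std::thread::join: a repeated call to join() here does nothing instead of throwing, which keeps the noexcept promise.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <utility>

// Hypothetical custom thread type built on top of std::thread.
class joining_thread {
    std::thread thread_;

public:
    // F is any callable that can be invoked as void().
    template<typename F>
    joining_thread(F && f) : thread_(std::forward<F>(f)) {}

    // Copying is not required, so it is disabled.
    joining_thread(const joining_thread &) = delete;
    joining_thread & operator=(const joining_thread &) = delete;

    // Joins the thread if it has not been joined yet.
    ~joining_thread() noexcept { join(); }

    // Waits for the thread to finish; a repeated call does nothing.
    void join() noexcept {
        if (thread_.joinable()) thread_.join();
    }
};

// Traits type that selects the custom thread type.
struct my_traits {
    using thread_type = joining_thread;
};

// Small self-check that does not involve the dispatcher.
int run_on_custom_thread() {
    std::atomic<int> result{0};
    my_traits::thread_type worker([&result] { result = 42; });
    worker.join();
    return result.load();
}
```

Such a traits type would then be passed as the template argument, as in make_dispatcher<my_traits>(...).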