Skip to content

Commit

Permalink
Support for delivery filters in mboxes::unique_subscribers removed.
Browse files Browse the repository at this point in the history
It's because delivery filters are not supported for mutable messages in
the current version of SObjectizer.
  • Loading branch information
eao197 committed Jan 6, 2022
1 parent a4de8f9 commit a633b09
Showing 1 changed file with 99 additions and 105 deletions.
204 changes: 99 additions & 105 deletions dev/so_5_extra/mboxes/unique_subscribers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

#include <so_5/details/invoke_noexcept_code.hpp>

#include <so_5/impl/local_mbox.hpp>

#include <memory>
#include <tuple>
#include <utility>
Expand All @@ -48,16 +46,62 @@ namespace errors {
const int rc_subscription_exists =
so_5::extra::errors::mboxes_unique_subscribers_errors;

/*!
* \brief An attempt to set a delivery filter.
*
* Delivery filters are not supported by unique_subscribers mbox at the moment.
*
* \since
* v.1.5.0
*/
const int rc_delivery_filters_not_supported =
so_5::extra::errors::mboxes_unique_subscribers_errors + 1;

} /* namespace errors */

namespace details {

//
// subscriber_info_t
//
/*!
* \brief Description of a subscriber.
*
* \note
* It's assumed that the limit will be set only once and will not be
* changed after that.
*
* \since v.1.5.0
*/
struct subscriber_info_t
{
//! Subscriber.
/*!
* \attention
* It's assumed that this pointer can't be null.
*/
agent_t * m_agent;

//! Information about a subscriber.
using subscriber_info_t = so_5::impl::local_mbox_details::subscriber_info_t;
//! Message limit for that subscriber.
/*!
* \note
* This pointer can be null. It means that there is no limit.
*/
const so_5::message_limit::control_block_t * m_limit;

//! Initializing constructor.
/*!
* \attention
* The constructor doesn't check the validity of \a agent parameter,
* it's assumed that it can't be null.
*/
subscriber_info_t(
agent_t * agent,
const so_5::message_limit::control_block_t * limit )
: m_agent{ agent }
, m_limit{ limit }
{}
};

//
// data_t
Expand All @@ -84,9 +128,7 @@ struct data_t
/*!
* \brief Map from message type to subscribers.
*/
using messages_table_t = std::map<
std::type_index,
subscriber_info_t >;
using messages_table_t = std::map< std::type_index, subscriber_info_t >;

//! Map of subscribers to messages.
messages_table_t m_subscribers;
Expand Down Expand Up @@ -138,28 +180,19 @@ class actual_mbox_t final
const so_5::message_limit::control_block_t * limit,
agent_t & subscriber ) override
{
try_insert_or_modify_subscriber(
try_insert_subscriber(
type_wrapper,
&subscriber,
[&] {
return subscriber_info_t{ &subscriber, limit };
},
[&]( subscriber_info_t & info ) {
info.set_limit( limit );
} );
subscriber_info_t{ &subscriber, limit } );
}

void
unsubscribe_event_handlers(
const std::type_index & type_wrapper,
agent_t & subscriber ) override
{
modify_and_remove_subscriber_if_needed(
remove_subscriber_if_needed(
type_wrapper,
&subscriber,
[]( subscriber_info_t & info ) {
info.drop_limit();
} );
&subscriber );
}

std::string
Expand Down Expand Up @@ -198,32 +231,21 @@ class actual_mbox_t final

void
set_delivery_filter(
const std::type_index & msg_type,
const delivery_filter_t & filter,
agent_t & subscriber ) override
const std::type_index & /*msg_type*/,
const delivery_filter_t & /*filter*/,
agent_t & /*subscriber*/ ) override
{
try_insert_or_modify_subscriber(
msg_type,
&subscriber,
[&] {
return subscriber_info_t{ &subscriber, &filter };
},
[&]( subscriber_info_t & info ) {
info.set_filter( filter );
} );
SO_5_THROW_EXCEPTION( errors::rc_delivery_filters_not_supported,
"delivery filters can't be used with "
"unique_subscribers mboxes" );
}

void
drop_delivery_filter(
const std::type_index & msg_type,
agent_t & subscriber ) noexcept override
const std::type_index & /*msg_type*/,
agent_t & /*subscriber*/ ) noexcept override
{
modify_and_remove_subscriber_if_needed(
msg_type,
&subscriber,
[]( subscriber_info_t & info ) {
info.drop_filter();
} );
// No nothing.
}

environment_t &
Expand All @@ -233,47 +255,37 @@ class actual_mbox_t final
}

private :
template< typename Info_Maker, typename Info_Changer >
void
try_insert_or_modify_subscriber(
try_insert_subscriber(
const std::type_index & type_wrapper,
agent_t * subscriber,
Info_Maker maker,
Info_Changer changer )
subscriber_info_t new_subscriber_info )
{
this->lock_and_perform( [&] {
auto it = this->m_subscribers.find( type_wrapper );
if( it == this->m_subscribers.end() )
{
// There isn't such message type yet.
m_subscribers.emplace( type_wrapper, maker() );
m_subscribers.emplace( type_wrapper, new_subscriber_info );
}
else
{
auto & subscriber_info = it->second;

// Existing info can be modified only if the info was
// created for the same agent.
if( subscriber != subscriber_info.subscriber_pointer() )
SO_5_THROW_EXCEPTION(
errors::rc_subscription_exists,
std::string{ "subscription is already exists "
"for message type '" }
+ type_wrapper.name()
+ "'" );

// State of the single subscriber has to be changed.
changer( subscriber_info );
// We assume that if a subscription exists it is made by
// different agent (because the current subscriber won't
// make the same subscription again).
SO_5_THROW_EXCEPTION(
errors::rc_subscription_exists,
std::string{ "subscription is already exists "
"for message type '" }
+ type_wrapper.name()
+ "'" );
}
} );
}

template< typename Info_Changer >
void
modify_and_remove_subscriber_if_needed(
remove_subscriber_if_needed(
const std::type_index & type_wrapper,
agent_t * subscriber,
Info_Changer changer )
agent_t * subscriber )
{
this->lock_and_perform( [&] {
auto it = this->m_subscribers.find( type_wrapper );
Expand All @@ -283,15 +295,10 @@ class actual_mbox_t final

// Skip all other actions if the subscription is
// made for a different agent.
if( subscriber == subscriber_info.subscriber_pointer() )
if( subscriber == subscriber_info.m_agent )
{
// Subscriber must be modified.
changer( subscriber_info );

// If info about subscriber becomes empty after modification
// then subscriber info must be removed.
if( subscriber_info.empty() )
this->m_subscribers.erase( it );
// Subscriber must be removed.
this->m_subscribers.erase( it );
}
}
} );
Expand Down Expand Up @@ -328,39 +335,26 @@ class actual_mbox_t final
const message_ref_t & message,
unsigned int overlimit_reaction_deep ) const
{
const auto delivery_status =
agent_info.must_be_delivered(
message,
[]( const message_ref_t & m ) -> message_t & {
return *m;
} );

if( delivery_possibility_t::must_be_delivered == delivery_status )
{
using namespace so_5::message_limit::impl;

try_to_deliver_to_agent(
this->m_id,
agent_info.subscriber_reference(),
agent_info.limit(),
msg_type,
message,
overlimit_reaction_deep,
tracer.overlimit_tracer(),
[&] {
tracer.push_to_queue( agent_info.subscriber_pointer() );

agent_t::call_push_event(
agent_info.subscriber_reference(),
agent_info.limit(),
this->m_id,
msg_type,
message );
} );
}
else
tracer.message_rejected(
agent_info.subscriber_pointer(), delivery_status );
using namespace so_5::message_limit::impl;

try_to_deliver_to_agent(
this->m_id,
*(agent_info.m_agent),
agent_info.m_limit,
msg_type,
message,
overlimit_reaction_deep,
tracer.overlimit_tracer(),
[&] {
tracer.push_to_queue( agent_info.m_agent );

agent_t::call_push_event(
*(agent_info.m_agent),
agent_info.m_limit,
this->m_id,
msg_type,
message );
} );
}
};

Expand Down

0 comments on commit a633b09

Please sign in to comment.