Skip to content

Commit

Permalink
- Initialize member cluster ID only on connection to cluster.
Browse files Browse the repository at this point in the history
 - Don't rely on own index from the view because the view may come from
   another member, instead determine own index from own ID.

Refs #13
  • Loading branch information
ayurchen committed Nov 6, 2018
1 parent d4efa59 commit 73f8756
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 50 deletions.
3 changes: 1 addition & 2 deletions dbsim/db_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@

db::server::server(simulator& simulator,
const std::string& name,
const std::string& server_id,
const std::string& address)
: simulator_(simulator)
, storage_engine_(simulator_.params())
, mutex_()
, cond_()
, server_service_(*this)
, server_state_(*this, server_service_,
name, server_id, address, "dbsim_" + name + "_data")
name, address, "dbsim_" + name + "_data")
, last_client_id_(0)
, last_transaction_id_(0)
, appliers_()
Expand Down
1 change: 0 additions & 1 deletion dbsim/db_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ namespace db
public:
server(simulator& simulator,
const std::string& name,
const std::string& id,
const std::string& address);
void applier_thread();
void start_applier();
Expand Down
2 changes: 0 additions & 2 deletions dbsim/db_server_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ namespace db
server_state(db::server& server,
wsrep::server_service& server_service,
const std::string& name,
const std::string& server_id,
const std::string& address,
const std::string& working_dir)
: wsrep::server_state(
mutex_,
cond_,
server_service,
name,
server_id,
"",
address,
working_dir,
Expand Down
1 change: 0 additions & 1 deletion dbsim/db_simulator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ void db::simulator::start()
std::make_unique<db::server>(
*this,
name_os.str(),
id_os.str(),
address_os.str()))));
if (it.second == false)
{
Expand Down
5 changes: 2 additions & 3 deletions include/wsrep/server_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ namespace wsrep
* A method which will be called when the server
* has been joined to the cluster
*/
void on_connect(const wsrep::gtid& gtid);
void on_connect(const wsrep::view& view);

/**
* A method which will be called when a view
Expand Down Expand Up @@ -540,7 +540,6 @@ namespace wsrep
wsrep::condition_variable& cond,
wsrep::server_service& server_service,
const std::string& name,
const std::string& id,
const std::string& incoming_address,
const std::string& address,
const std::string& working_dir,
Expand All @@ -565,7 +564,7 @@ namespace wsrep
, streaming_appliers_()
, provider_()
, name_(name)
, id_(id)
, id_(wsrep::id::undefined())
, incoming_address_(incoming_address)
, address_(address)
, working_dir_(working_dir)
Expand Down
6 changes: 6 additions & 0 deletions include/wsrep/view.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ namespace wsrep
return (members_.empty() && own_index_ == -1);
}

void fix_own_index(ssize_t i)
{
assert(i < ssize_t(members_.size()));
own_index_ = i;
}

private:
wsrep::gtid state_id_;
wsrep::seqno view_seqno_;
Expand Down
43 changes: 23 additions & 20 deletions src/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,29 +565,45 @@ wsrep::server_state::causal_read(int timeout) const
return provider_->causal_read(timeout);
}

void wsrep::server_state::on_connect(const wsrep::gtid& gtid)
void wsrep::server_state::on_connect(const wsrep::view& view)
{
assert(id_.is_undefined());
id_ = view.members()[view.own_index()].id();

wsrep::log_info() << "Server "
<< name_
<< " connected to cluster at position "
<< gtid;
<< view.state_id()
<< " with ID "
<< id_;

wsrep::unique_lock<wsrep::mutex> lock(mutex_);
connected_gtid_ = gtid;
connected_gtid_ = view.state_id();
state(lock, s_connected);
}

void wsrep::server_state::on_view(const wsrep::view& view,
wsrep::high_priority_service* high_priority_service)
{
const std::vector<wsrep::view::member>& members(view.members());
int own_idx(-1);

if (!id_.is_undefined())
{
for (size_t i = 0; i != members.size(); ++i)
{
if (members[i].id() == id_) { own_idx = i; break; }
}
}

wsrep::log_info()
<< "================================================\nView:\n"
<< " id: " << view.state_id() << "\n"
<< " status: " << view.status() << "\n"
<< " prococol_version: " << view.protocol_version() << "\n"
<< " own_index: " << view.own_index() << "\n"
<< " own_index: " << own_idx << "\n"
<< " final: " << view.final() << "\n"
<< " members";
const std::vector<wsrep::view::member>& members(view.members());
for (std::vector<wsrep::view::member>::const_iterator i(members.begin());
i != members.end(); ++i)
{
Expand All @@ -596,25 +612,11 @@ void wsrep::server_state::on_view(const wsrep::view& view,
}
wsrep::log_info() << "=================================================";
current_view_ = view;
current_view_.fix_own_index(own_idx);
if (view.status() == wsrep::view::primary)
{
wsrep::unique_lock<wsrep::mutex> lock(mutex_);
if (view.own_index() >= 0)
{
if (id_.is_undefined())
{
// No identifier was passed during server state initialization
// and the ID was generated by the provider.
id_ = view.members()[view.own_index()].id();
}
else
{
// Own identifier must not change between views.
// assert(id_ == view.members()[view.own_index()].id());
}
}
assert(view.final() == false);

//
// Reached primary from connected state. This may mean the following
//
Expand Down Expand Up @@ -703,6 +705,7 @@ void wsrep::server_state::on_view(const wsrep::view& view,
{
close_transactions_at_disconnect(*high_priority_service);
}
id_ = id::undefined();
state(lock, s_disconnected);
}
else if (state_ != s_disconnecting)
Expand Down
2 changes: 1 addition & 1 deletion src/wsrep_provider_v26.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ namespace
*reinterpret_cast<wsrep::server_state*>(app_ctx));
try
{
server_state.on_connect(view.state_id());
server_state.on_connect(view);
return WSREP_CB_SUCCESS;
}
catch (const wsrep::runtime_error& e)
Expand Down
27 changes: 18 additions & 9 deletions test/client_state_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ namespace
{
replicating_client_fixture_sync_rm()
: server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_sync, server_service)
, sc("s1", wsrep::server_state::rm_sync, server_service)
, cc(sc, wsrep::client_id(1),
wsrep::client_state::m_local)
, tc(cc.transaction())
{
sc.mock_connect();
cc.open(cc.id());
BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
Expand All @@ -54,11 +55,12 @@ namespace
{
replicating_client_fixture_async_rm()
: server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_async, server_service)
, sc("s1", wsrep::server_state::rm_async, server_service)
, cc(sc, wsrep::client_id(1),
wsrep::client_state::m_local)
, tc(cc.transaction())
{
sc.mock_connect();
cc.open(cc.id());
BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
Expand All @@ -76,11 +78,12 @@ namespace
{
replicating_client_fixture_2pc()
: server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_sync, server_service)
, sc("s1", wsrep::server_state::rm_sync, server_service)
, cc(sc, wsrep::client_id(1),
wsrep::client_state::m_local)
, tc(cc.transaction())
{
sc.mock_connect();
cc.open(cc.id());
cc.do_2pc_ = true;
BOOST_REQUIRE(cc.before_command() == 0);
Expand All @@ -99,11 +102,12 @@ namespace
{
replicating_client_fixture_autocommit()
: server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_sync, server_service)
, sc("s1", wsrep::server_state::rm_sync, server_service)
, cc(sc, wsrep::client_id(1),
wsrep::client_state::m_local)
, tc(cc.transaction())
{
sc.mock_connect();
cc.open(cc.id());
cc.is_autocommit_ = true;
BOOST_REQUIRE(cc.before_command() == 0);
Expand All @@ -122,13 +126,14 @@ namespace
{
applying_client_fixture()
: server_service(sc)
, sc("s1", "s1",
, sc("s1",
wsrep::server_state::rm_async, server_service)
, cc(sc,
wsrep::client_id(1),
wsrep::client_state::m_high_priority)
, tc(cc.transaction())
{
sc.mock_connect();
cc.open(cc.id());
BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
Expand All @@ -155,13 +160,14 @@ namespace
{
applying_client_fixture_2pc()
: server_service(sc)
, sc("s1", "s1",
, sc("s1",
wsrep::server_state::rm_async, server_service)
, cc(sc,
wsrep::client_id(1),
wsrep::client_state::m_high_priority)
, tc(cc.transaction())
{
sc.mock_connect();
cc.open(cc.id());
cc.do_2pc_ = true;
BOOST_REQUIRE(cc.before_command() == 0);
Expand Down Expand Up @@ -189,12 +195,13 @@ namespace
{
streaming_client_fixture_row()
: server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_sync, server_service)
, sc("s1", wsrep::server_state::rm_sync, server_service)
, cc(sc,
wsrep::client_id(1),
wsrep::client_state::m_local)
, tc(cc.transaction())
{
sc.mock_connect();
cc.open(cc.id());
BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
Expand All @@ -214,12 +221,13 @@ namespace
{
streaming_client_fixture_byte()
: server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_sync, server_service)
, sc("s1", wsrep::server_state::rm_sync, server_service)
, cc(sc,
wsrep::client_id(1),
wsrep::client_state::m_local)
, tc(cc.transaction())
{
sc.mock_connect();
cc.open(cc.id());
BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
Expand All @@ -238,12 +246,13 @@ namespace
{
streaming_client_fixture_statement()
: server_service(sc)
, sc("s1", "s1", wsrep::server_state::rm_sync, server_service)
, sc("s1", wsrep::server_state::rm_sync, server_service)
, cc(sc,
wsrep::client_id(1),
wsrep::client_state::m_local)
, tc(cc.transaction())
{
sc.mock_connect();
cc.open(cc.id());
BOOST_REQUIRE(cc.before_command() == 0);
BOOST_REQUIRE(cc.before_statement() == 0);
Expand Down
45 changes: 43 additions & 2 deletions test/mock_server_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,10 @@ namespace wsrep
{
public:
mock_server_state(const std::string& name,
const std::string& id,
enum wsrep::server_state::rollback_mode rollback_mode,
wsrep::server_service& server_service)
: wsrep::server_state(mutex_, cond_, server_service,
name, id, "", "", "./",
name, "", "", "./",
wsrep::gtid::undefined(),
1,
rollback_mode)
Expand All @@ -191,6 +190,48 @@ namespace wsrep

wsrep::mock_provider& provider() const WSREP_OVERRIDE
{ return provider_; }

// mock connected state for tests without overriding the connect()
// method.
int mock_connect(const std::string& own_id,
const std::string& cluster_name,
const std::string& cluster_address,
const std::string& state_donor,
bool bootstrap)
{
int const ret(server_state::connect(cluster_name,
cluster_address,
state_donor,
bootstrap));
if (0 == ret)
{
wsrep::id cluster_id("1");
wsrep::gtid state_id(cluster_id, wsrep::seqno(0));
std::vector<wsrep::view::member> members;
members.push_back(wsrep::view::member(wsrep::id(own_id),
"name", ""));
wsrep::view bootstrap_view(state_id,
wsrep::seqno(1),
wsrep::view::primary,
0,
0,
1,
members);
server_state::on_connect(bootstrap_view);
}
else
{
assert(0);
}

return ret;
}

int mock_connect()
{
return mock_connect(name(), "cluster", "local", "0", false);
}

private:
wsrep::default_mutex mutex_;
wsrep::default_condition_variable cond_;
Expand Down
Loading

0 comments on commit 73f8756

Please sign in to comment.