}
};
+/// Encode an mds_gid_t into `bl` by handing its 64-bit integer value to encode_raw.
+/// `features` is accepted but not used by this overload.
+/// NOTE(review): encode_raw presumably writes the value in host byte order — confirm
+/// this is the intended wire format.
+inline void encode(const mds_gid_t &v, bufferlist& bl, uint64_t features = 0) {
+  uint64_t raw_gid = v;
+  encode_raw(raw_gid, bl);
+}
+
+/// Decode an mds_gid_t from `p`: read a 64-bit integer with decode_raw and assign it to `v`.
+inline void decode(mds_gid_t &v, bufferlist::const_iterator& p) {
+  uint64_t raw_gid;
+  decode_raw(raw_gid, p);
+  v = raw_gid;
+}
+
typedef int32_t fs_cluster_id_t;
constexpr fs_cluster_id_t FS_CLUSTER_ID_NONE = -1;
+// Ack delivery callback: when `me` equals `membership.leader` the ack is handed
+// straight to the local db manager; otherwise it is packed, together with `me`
+// as the origin, into an MMDSQuiesceDbAck message and sent via send_message_mds.
membership.send_ack = [=, this](QuiesceMap&& ack) {
if (me == membership.leader) {
// loopback
- quiesce_db_manager->submit_ack_from(me, std::move(ack));
+ quiesce_db_manager->submit_peer_ack({me, std::move(ack)});
return 0;
} else {
std::lock_guard guard(mds_lock);
auto ack_msg = make_message<MMDSQuiesceDbAck>();
dout(10) << "sending ack " << ack << " to the leader " << membership.leader << dendl;
- ack_msg->encode_payload_from(me, ack);
+ // `ack` has already been logged above, so it can be moved into the peer-ack aggregate here.
+ ack_msg->encode_payload_from({me, std::move(ack)});
return send_message_mds(ack_msg, addrs);
}
};
auto addrs = mdsmap->get_info_gid(to).addrs;
auto listing_msg = make_message<MMDSQuiesceDbListing>();
dout(10) << "sending listing " << db << " to the peer " << to << dendl;
- listing_msg->encode_payload_from(me, db);
+ listing_msg->encode_payload_from({me, std::move(db)});
return send_message_mds(listing_msg, addrs);
};
}
{
const auto& req = ref_cast<MMDSQuiesceDbListing>(m);
- mds_gid_t gid;
- QuiesceDbListing db_listing;
- req->decode_payload_into(gid, db_listing);
+ // The sender id is no longer decoded separately: it arrives as `origin` inside
+ // the QuiesceDbPeerListing, so the standalone `gid` local is removed as unused.
+ QuiesceDbPeerListing peer_listing;
+ req->decode_payload_into(peer_listing);
if (quiesce_db_manager) {
- dout(10) << "got " << db_listing << " from peer " << gid << dendl;
- int result = quiesce_db_manager->submit_listing_from(gid, std::move(db_listing));
+ dout(10) << "got " << peer_listing << dendl;
+ int result = quiesce_db_manager->submit_peer_listing(std::move(peer_listing));
if (result != 0) {
- dout(3) << "error (" << result << ") submitting " << db_listing << " from peer " << gid << dendl;
+ // submit_peer_listing() takes an rvalue reference and returns its error codes
+ // before moving from it, so `peer_listing` is still intact for this log line.
+ dout(3) << "error (" << result << ") submitting " << peer_listing << dendl;
}
} else {
- dout(5) << "no db manager to process " << db_listing << dendl;
+ dout(5) << "no db manager to process " << peer_listing << dendl;
}
return true;
}
{
const auto& req = ref_cast<MMDSQuiesceDbAck>(m);
- mds_gid_t gid;
- QuiesceMap diff_map;
- req->decode_payload_into(gid, diff_map);
+ // The sender id is no longer decoded separately: it arrives as `origin` inside
+ // the QuiesceDbPeerAck, so the standalone `gid` local is removed as unused.
+ QuiesceDbPeerAck peer_ack;
+ req->decode_payload_into(peer_ack);
if (quiesce_db_manager) {
- dout(10) << "got ack " << diff_map << " from peer " << gid << dendl;
- int result = quiesce_db_manager->submit_ack_from(gid, std::move(diff_map));
+ dout(10) << "got " << peer_ack << dendl;
+ int result = quiesce_db_manager->submit_peer_ack(std::move(peer_ack));
if (result != 0) {
- dout(3) << "error (" << result << ") submitting an ack from peer " << gid << dendl;
+ // submit_peer_ack() takes an rvalue reference and returns its error codes
+ // before moving from it, so `peer_ack.origin` is still valid to read here.
+ dout(3) << "error (" << result << ") submitting an ack from " << peer_ack.origin << dendl;
}
} else {
- dout(5) << "no db manager to process an ack: " << diff_map << dendl;
+ dout(5) << "no db manager to process " << peer_ack << dendl;
}
return true;
}
using QuiesceRoot = std::string;
using QuiesceSetVersion = uint64_t;
+/// PeerId is declared here so that the peer listing/ack structs defined further
+/// down in this header can name the id of the peer they originate from.
+namespace QuiesceInterface {
+ using PeerId = mds_gid_t;
+}
+
struct QuiesceDbVersion {
epoch_t epoch;
QuiesceSetVersion set_version;
/// contain all sets that have their version > than the last acked by the peer.
struct QuiesceDbListing {
QuiesceDbVersion db_version = {0, 0};
- /// @brief Crusially, the precise `db_age` must be included in every db listing
+ /// @brief Crucially, the precise `db_age` must be included in every db listing
/// This data is used by all replicas to update their calculated DB TIME ZERO.
/// All events in the database are measured relative to the DB TIME ZERO
QuiesceTimeInterval db_age = QuiesceTimeInterval::zero();
return os << "q-db[v:" << dbl.db_version << " sets:" << active << "/" << inactive << "]";
}
+/// @brief A `QuiesceDbListing` bundled with the id of the peer it comes from.
+/// Kept as a plain aggregate: call sites build it as `{peer_id, std::move(listing)}`.
+struct QuiesceDbPeerListing {
+ QuiesceInterface::PeerId origin;
+ QuiesceDbListing db;
+};
+
+/// Streams a peer listing as the listing itself followed by " from " and the origin id.
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceDbPeerListing& dbl)
+{
+  os << dbl.db;
+  os << " from ";
+  os << dbl.origin;
+  return os;
+}
+
/// @brief `QuiesceMap` is a root-centric representation of the quiesce database
/// It lists roots with their effective states as of particular version.
/// Additionally, the same structure is used by the peers when reporting
return os << "q-map[v:" << map.db_version << " roots:" << active << "/" << inactive << "]";
}
+/// @brief A `QuiesceMap` ack bundled with the id of the peer it comes from.
+/// Kept as a plain aggregate: call sites build it as `{peer_id, std::move(map)}`.
+struct QuiesceDbPeerAck {
+ QuiesceInterface::PeerId origin;
+ QuiesceMap diff_map;
+};
+
+/// Streams a peer ack as "ack ", the diff map, " from " and the origin id.
+template <class CharT, class Traits>
+static std::basic_ostream<CharT, Traits>&
+operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceDbPeerAck& ack)
+{
+  os << "ack ";
+  os << ack.diff_map;
+  os << " from ";
+  os << ack.origin;
+  return os;
+}
+
inline QuiesceTimeInterval interval_saturate_add(QuiesceTimeInterval lhs, QuiesceTimeInterval rhs)
{
// assuming an unsigned time interval.
};
namespace QuiesceInterface {
- using PeerId = mds_gid_t;
/// @brief A callback from the manager to the agent with an up-to-date root list
/// The map is mutable and will be used as synchronous agent ack if the return value is true
using AgentNotify = std::function<bool(QuiesceMap&)>;
#include "include/encoding.h"
#include <stdint.h>
+/// Shared version/compat numbers that the quiesce db messages pass to
+/// ENCODE_START / DECODE_START when framing their payloads.
+struct QuiesceDbEncoding {
+ static constexpr int version = 1;
+ static constexpr int compat = 1;
+};
+
void encode(QuiesceDbVersion const& v, bufferlist& bl, uint64_t features = 0)
{
encode(v.epoch, bl, features);
decode(listing.sets, p);
}
+/// Encode a peer listing: the origin id first, then the db listing.
+/// The matching decode() reads the fields back in the same order.
+void encode(QuiesceDbPeerListing const& listing, bufferlist& bl, uint64_t features = 0)
+{
+ encode(listing.origin, bl, features);
+ encode(listing.db, bl, features);
+}
+
+/// Decode a peer listing from `p`: the origin id first, then the db listing,
+/// mirroring the order used by the matching encode().
+void decode(QuiesceDbPeerListing& listing, bufferlist::const_iterator& p)
+{
+ decode(listing.origin, p);
+ decode(listing.db, p);
+}
+
void encode(QuiesceMap::RootInfo const& root, bufferlist& bl, uint64_t features = 0)
{
encode(root.state, bl, features);
decode(map.roots, p);
}
+/// Encode a peer ack: the origin id first, then the diff map.
+/// The matching decode() reads the fields back in the same order.
+void encode(QuiesceDbPeerAck const& ack, bufferlist& bl, uint64_t features = 0)
+{
+ encode(ack.origin, bl, features);
+ encode(ack.diff_map, bl, features);
+}
+
+/// Decode a peer ack from `p`: the origin id first, then the diff map,
+/// mirroring the order used by the matching encode().
+void decode(QuiesceDbPeerAck& ack, bufferlist::const_iterator& p)
+{
+ decode(ack.origin, p);
+ decode(ack.diff_map, p);
+}
{
// as a replica, we only care about the latest update
while (db_updates.size() > 1) {
- dout(10) << "skipping an older update from " << db_updates.front().first << " version " << db_updates.front().second.db_version << dendl;
+ dout(10) << "skipping an older update from " << db_updates.front().origin << " version " << db_updates.front().db.db_version << dendl;
db_updates.pop();
}
return QuiesceTimeInterval::max();
}
- QuiesceDbListing &update = db_updates.back().second;
+ QuiesceDbListing &update = db_updates.back().db;
if (update.db_version.epoch != membership.epoch) {
dout(10) << "ignoring db update from another epoch: " << update.db_version << " != " << db_version() << dendl;
// only consider db submissions from unknown peers
while (!unknown_peers.empty() && !db_updates.empty()) {
- auto &[from, update] = db_updates.front();
+ auto &from = db_updates.front().origin;
+ auto &update = db_updates.front().db;
if (update.db_version.epoch == membership.epoch && unknown_peers.erase(from) > 0) {
// see if this peer's version is newer than mine
if (db.set_version < update.db_version.set_version) {
submit_condition.notify_all();
return 0;
}
+
// acks the messaging system
- int submit_ack_from(QuiesceInterface::PeerId sender, const QuiesceMap& diff_map) {
+ /// Queue a peer ack under submit_mutex and notify submit_condition.
+ /// @return 0 on success; -EPERM when there is no cluster membership or this
+ ///         manager is not the leader; -ESTALE when `ack.origin` is not a member.
+ /// `ack` is moved from only on the success path.
+ int submit_peer_ack(QuiesceDbPeerAck&& ack) {
std::lock_guard l(submit_mutex);
if (!cluster_membership || !cluster_membership->is_leader()) {
return -EPERM;
}
- if (!cluster_membership->members.contains(sender)) {
+ if (!cluster_membership->members.contains(ack.origin)) {
return -ESTALE;
}
- pending_acks.push({ sender, diff_map });
+ pending_acks.push(std::move(ack));
submit_condition.notify_all();
return 0;
}
// -> EPERM if this is the leader
// process an incoming listing from a leader
- int submit_listing_from(QuiesceInterface::PeerId sender, QuiesceDbListing&& listing) {
+ /// Queue a peer db listing under submit_mutex and notify submit_condition.
+ /// @return 0 on success; -EPERM when there is no cluster membership; -ESTALE
+ ///         when the listing's epoch differs from the current membership epoch.
+ /// `listing` is moved from only on the success path.
+ int submit_peer_listing(QuiesceDbPeerListing&& listing) {
std::lock_guard l(submit_mutex);
if (!cluster_membership) {
return -EPERM;
}
- if (cluster_membership->epoch != listing.db_version.epoch) {
+ if (cluster_membership->epoch != listing.db.db_version.epoch) {
return -ESTALE;
}
- pending_db_updates.push({sender, std::move(listing)});
+ pending_db_updates.push(std::move(listing));
submit_condition.notify_all();
return 0;
}
std::optional<AgentCallback> agent_callback;
std::optional<QuiesceClusterMembership> cluster_membership;
- std::queue<std::pair<QuiesceInterface::PeerId, QuiesceDbListing>> pending_db_updates;
- std::queue<std::pair<QuiesceInterface::PeerId, QuiesceMap>> pending_acks;
+ std::queue<QuiesceDbPeerListing> pending_db_updates;
+ std::queue<QuiesceDbPeerAck> pending_acks;
std::deque<RequestContext*> pending_requests;
class QuiesceDbThread : public Thread {
// noop to prevent unnecessary overheads
}
- void encode_payload_from(mds_gid_t const&gid, QuiesceMap const&diff_map)
+ /// Encode `ack` into `payload`, framed by ENCODE_START/ENCODE_FINISH with the
+ /// version and compat numbers taken from QuiesceDbEncoding.
+ /// NOTE(review): the earlier `ceph_assert(gid != MDS_GID_NONE)` check is dropped
+ /// by this change — confirm an unset origin is rejected elsewhere.
+ void encode_payload_from(QuiesceDbPeerAck const& ack)
{
- using ceph::encode;
-
- ceph_assert(gid != MDS_GID_NONE);
-
- ENCODE_START(1, 1, payload);
- encode(gid, payload);
- encode(diff_map, payload);
+ ENCODE_START(QuiesceDbEncoding::version, QuiesceDbEncoding::compat, payload);
+ encode(ack, payload);
ENCODE_FINISH(payload);
}
// noop to prevent unnecessary overheads
}
- void decode_payload_into(mds_gid_t &gid, QuiesceMap &diff_map) const
+ /// Decode `payload` from its beginning into `ack`, framed by
+ /// DECODE_START/DECODE_FINISH with the version from QuiesceDbEncoding.
+ void decode_payload_into(QuiesceDbPeerAck &ack) const
{
- using ceph::decode;
auto p = payload.cbegin();
- DECODE_START(1, p);
- decode(gid, p);
- decode(diff_map, p);
+ DECODE_START(QuiesceDbEncoding::version, p);
+ decode(ack, p);
DECODE_FINISH(p);
}
// noop to prevent unnecessary overheads
}
- void encode_payload_from(mds_gid_t const& gid, QuiesceDbListing const& db_listing)
+ /// Encode `peer_listing` into `payload`, framed by ENCODE_START/ENCODE_FINISH
+ /// with the version and compat numbers taken from QuiesceDbEncoding.
+ /// NOTE(review): the earlier `ceph_assert(gid != MDS_GID_NONE)` check is dropped
+ /// by this change — confirm an unset origin is rejected elsewhere.
+ void encode_payload_from(QuiesceDbPeerListing const& peer_listing)
{
- using ceph::encode;
-
- ceph_assert(gid != MDS_GID_NONE);
-
- ENCODE_START(1, 1, payload);
- encode(gid, payload);
- encode(db_listing, payload);
+ ENCODE_START(QuiesceDbEncoding::version, QuiesceDbEncoding::compat, payload);
+ encode(peer_listing, payload);
ENCODE_FINISH(payload);
}
// noop to prevent unnecessary overheads
}
- void decode_payload_into(mds_gid_t &gid, QuiesceDbListing &db_listing) const
+ /// Decode `payload` from its beginning into `peer_listing`, framed by
+ /// DECODE_START/DECODE_FINISH with the version from QuiesceDbEncoding.
+ void decode_payload_into(QuiesceDbPeerListing &peer_listing) const
{
- using ceph::decode;
auto p = payload.cbegin();
- DECODE_START(1, p);
- decode(gid, p);
- decode(db_listing, p);
+ DECODE_START(QuiesceDbEncoding::version, p);
+ decode(peer_listing, p);
DECODE_FINISH(p);
}
if (epoch == this->epoch) {
if (this->managers.contains(recipient)) {
dout(10) << "listing from " << me << " (leader=" << leader << ") to " << recipient << " for version " << listing.db_version << " with " << listing.sets.size() << " sets" << dendl;
- this->managers[recipient]->submit_listing_from(me, std::move(listing));
+ this->managers[recipient]->submit_peer_listing({me, std::move(listing)});
comms_cond.notify_all();
return 0;
}
it++;
}
}
- this->managers[leader]->submit_ack_from(me, std::move(diff_map));
+ this->managers[leader]->submit_peer_ack({me, std::move(diff_map)});
comms_cond.notify_all();
l.unlock();
while(!done_hooks.empty()) {