const NvmeGwId &gw_id, const NvmeGroupKey& group_key)
{
std::set<NvmeAnaGrpId> allocated;
+ if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOFHAMAP)) {
+ auto gw_epoch_it = gw_epoch.find(group_key);
+ if (gw_epoch_it == gw_epoch.end()) {
+ gw_epoch[group_key] = epoch;
+ dout(10) << "Allocated first gw_epoch : group_key "
+ << group_key << " epoch " << gw_epoch[group_key] << dendl;
+ }
+ }
for (auto& itr: created_gws[group_key]) {
allocated.insert(itr.second.ana_grp_id);
if (itr.first == gw_id) {
fsm_timers.erase(group_key);
created_gws[group_key].erase(gw_id);
- if (created_gws[group_key].size() == 0)
+ if (created_gws[group_key].size() == 0) {
created_gws.erase(group_key);
+ gw_epoch.erase(group_key);
+ }
return 0;
}
return -EINVAL;
}
+void NVMeofGwMap::gw_performed_startup(const NvmeGwId &gw_id,
+ const NvmeGroupKey& group_key, bool &propose_pending)
+{
+ dout(4) << "GW performed the full startup " << gw_id << dendl;
+ propose_pending = true;
+ increment_gw_epoch( group_key);
+}
+
+void NVMeofGwMap::increment_gw_epoch(const NvmeGroupKey& group_key)
+{
+ if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOFHAMAP)) {
+ gw_epoch[group_key] ++;
+ dout(4) << "incremented epoch of " << group_key
+ << " " << gw_epoch[group_key] << dendl;
+ }
+}
+
int NVMeofGwMap::get_num_namespaces(const NvmeGwId &gw_id,
const NvmeGroupKey& group_key, const BeaconSubsystems& subs)
{
gw_id, group_key, state_itr.second,state_itr.first, propose_pending);
}
propose_pending = true; // map should reflect that gw becames Created
- if (propose_pending) validate_gw_map(group_key);
+ if (propose_pending) {
+ validate_gw_map(group_key);
+ increment_gw_epoch(group_key);
+ }
} else {
dout(1) << __FUNCTION__ << "ERROR GW-id was not found in the map "
<< gw_id << dendl;
state_itr.second = gw_states_per_group_t::GW_STANDBY_STATE;
}
propose_pending = true; // map should reflect that gw becames Unavailable
- if (propose_pending) validate_gw_map(group_key);
+ if (propose_pending) {
+ validate_gw_map(group_key);
+ increment_gw_epoch(group_key);
+ }
} else {
dout(1) << __FUNCTION__ << "ERROR GW-id was not found in the map "
<< gw_id << dendl;
state_itr.first, last_osd_epoch, propose_pending);
}
}
- if (propose_pending) validate_gw_map(group_key);
+ if (propose_pending) {
+ validate_gw_map(group_key);
+ increment_gw_epoch(group_key);
+ }
}
void NVMeofGwMap::handle_abandoned_ana_groups(bool& propose)
}
if (propose) {
validate_gw_map(group_key);
+ increment_gw_epoch(group_key);
}
}
}
<< "for GW " << gw_id << dendl;
}
}
- if (map_modified) validate_gw_map(group_key);
+ if (map_modified) {
+ validate_gw_map(group_key);
+ increment_gw_epoch(group_key);
+ }
}
void NVMeofGwMap::fsm_handle_to_expired(
//another Trigger for GW down (failover)
process_gw_map_gw_down(gw_id, group_key, map_modified);
}
- if (map_modified) validate_gw_map(group_key);
+ if (map_modified) {
+ validate_gw_map(group_key);
+ increment_gw_epoch(group_key);
+ }
}
struct CMonRequestProposal : public Context {
<< " active " << is_active() << dendl;
// Initialize last_beacon to identify transitions of available
// GWs to unavailable state
- for (const auto& created_map_pair: map.created_gws) {
- const auto& group_key = created_map_pair.first;
- const NvmeGwMonStates& gw_created_map = created_map_pair.second;
- for (const auto& gw_created_pair: gw_created_map) {
- const auto& gw_id = gw_created_pair.first;
+ for (auto &created_map_pair: map.created_gws) {
+ const auto &group_key = created_map_pair.first;
+ NvmeGwMonStates& gw_created_map = created_map_pair.second;
+ for (auto& gw_created_pair: gw_created_map) {
+ auto& gw_id = gw_created_pair.first;
if (gw_created_pair.second.availability ==
gw_availability_t::GW_AVAILABLE) {
dout(10) << "synchronize last_beacon for GW :" << gw_id << dendl;
LastBeacon lb = {gw_id, group_key};
last_beacon[lb] = last_tick;
}
+ // force send ack after nearest beacon after leader re-election
+ gw_created_pair.second.beacon_index =
+ g_conf().get_val<uint64_t>("mon_nvmeofgw_beacons_till_ack");
}
}
}
dout(10) << " pending " << pending_map << dendl;
}
+void NVMeofGwMon::recreate_gw_epoch() {
+ //check pending map - if exists group_key but no gw_epoch[group_key]
+ //- create it and assign to epoch (offset)
+ for (auto& created_map_pair: pending_map.created_gws) {
+ auto group_key = created_map_pair.first;
+ if (pending_map.gw_epoch.find(group_key) ==
+ pending_map.gw_epoch.end()) {
+ pending_map.gw_epoch[group_key] = pending_map.epoch;
+ dout(10) << "recreated gw epoch for group " << group_key
+ << " set epoch " << pending_map.epoch << dendl;
+ }
+ }
+}
+
void NVMeofGwMon::encode_pending(MonitorDBStore::TransactionRef t)
{
dout(10) << dendl;
ceph_assert(get_last_committed() + 1 == pending_map.epoch);
bufferlist bl;
uint64_t features = mon.get_quorum_con_features();
+ if (HAVE_FEATURE(features, NVMEOFHAMAP)) {
+ recreate_gw_epoch();
+ }
pending_map.encode(bl, features);
- dout(10) << " has NVMEOFHA: "
- << HAVE_FEATURE(mon.get_quorum_con_features(), NVMEOFHA) << dendl;
+ dout(10) << " has NVMEOFHA: " << HAVE_FEATURE(features, NVMEOFHA)
+ << " has NVMEOFHAMAP: " << HAVE_FEATURE(features, NVMEOFHAMAP) << dendl;
put_version(t, pending_map.epoch, bl);
put_last_committed(t, pending_map.epoch);
}
}
-void NVMeofGwMon::check_sub(Subscription *sub)
+bool NVMeofGwMon::get_gw_by_addr(const entity_addr_t &sub_addr,
+ NvmeGwId &gw_id, NvmeGroupKey& group_key)
+{
+ for (auto& created_map_pair: map.created_gws) {
+ group_key = created_map_pair.first;
+ NvmeGwMonStates& gw_created_map = created_map_pair.second;
+ for (auto& gw_created_pair: gw_created_map) {
+ gw_id = gw_created_pair.first;
+ if ((gw_created_pair.second.availability !=
+ gw_availability_t::GW_CREATED) &&
+ (gw_created_pair.second.addr_vect == entity_addrvec_t(sub_addr))) {
+ dout(10) << "found gw-vect " << gw_created_pair.second.addr_vect
+ << " GW " << gw_id << " group-key " << group_key << dendl;
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+/**
+ * check_sub_unconditional
+ *
+ * Unconditionally sends the next map to the subscription without referring
+ * to the gw_epoch map. Used until mon quorum supports NVMEOFHAMAP
+ */
+void NVMeofGwMon::check_sub_unconditional(Subscription *sub)
{
dout(10) << "sub->next , map-epoch " << sub->next
<< " " << map.epoch << dendl;
}
}
+void NVMeofGwMon::check_sub(Subscription *sub)
+{
+ NvmeGwId gw_id;
+ NvmeGroupKey group_key;
+ if (get_gw_by_addr(sub->session->con->get_peer_addr(),
+ gw_id, group_key)) {
+ dout(10) << "sub->next(epoch) " << sub->next << " map.gw_epoch "
+ << map.gw_epoch[group_key] << dendl;
+ if (sub->next <= map.gw_epoch[group_key]) {
+ dout(4) << "Send unicast map to GW "<< gw_id << dendl;
+ NVMeofGwMap unicast_map;
+ unicast_map.created_gws[group_key][gw_id]
+ = map.created_gws[group_key][gw_id];
+ // respond with a map slice correspondent to the same GW
+ unicast_map.epoch = map.gw_epoch[group_key];//map.epoch;
+ sub->session->con->send_message2(make_message<MNVMeofGwMap>(unicast_map));
+ if (sub->onetime) {
+ mon.session_map.remove_sub(sub);
+ } else {
+ sub->next = map.gw_epoch[group_key] + 1;
+ }
+ }
+ }
+}
+
void NVMeofGwMon::check_subs(bool t)
{
const std::string type = "NVMeofGw";
return;
}
for (auto sub : *(mon.session_map.subs[type])) {
- check_sub(sub);
+ dout(10) << " dump subscriber peer_addr : "
+ << sub->session->con->get_peer_addr() << dendl;
+ if (HAVE_FEATURE(mon.get_quorum_con_features(), NVMEOFHAMAP)) {
+ check_sub(sub);
+ } else {
+ check_sub_unconditional(sub);
+ }
}
}
}
}
f->dump_unsigned("num gws", map.created_gws[group_key].size());
+ if (map.gw_epoch.find(group_key) != map.gw_epoch.end())
+ f->dump_unsigned("GW-epoch", map.gw_epoch[group_key]);
if (map.created_gws[group_key].size() == 0) {
f->close_section();
f->flush(rdata);
return false;
}
+epoch_t NVMeofGwMon::get_ack_map_epoch(bool gw_created,
+ const NvmeGroupKey& group_key) {
+ epoch_t rc;
+ if (!gw_created) {
+ rc = 0;
+ } else if (map.gw_epoch.find(group_key) != map.gw_epoch.end()) {
+ rc = map.gw_epoch[group_key];
+ } else { // feature bit NVMEOFHAMAP was not applied
+ rc = map.epoch;
+ }
+ return rc;
+}
bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
{
<< " GW : " << m->get_gw_id()
<< " osdmap_epoch " << m->get_last_osd_epoch()
<< " subsystems " << m->get_subsystems() << dendl;
-
+ ConnectionRef con = op->get_connection();
NvmeGwId gw_id = m->get_gw_id();
NvmeGroupKey group_key = std::make_pair(m->get_gw_pool(), m->get_gw_group());
gw_availability_t avail = m->get_availability();
bool propose = false;
bool nonce_propose = false;
bool timer_propose = false;
+ bool gw_propose = false;
bool gw_created = true;
NVMeofGwMap ack_map;
+ bool epoch_filter_enabled = HAVE_FEATURE(mon.get_quorum_con_features(),
+ NVMEOFHAMAP);
auto& group_gws = map.created_gws[group_key];
auto gw = group_gws.find(gw_id);
const BeaconSubsystems& sub = m->get_subsystems();
auto now = ceph::coarse_mono_clock::now();
+ int beacons_till_ack =
+ g_conf().get_val<uint64_t>("mon_nvmeofgw_beacons_till_ack");
+ bool apply_ack_logic = true;
+ bool send_ack = false;
if (avail == gw_availability_t::GW_CREATED) {
if (gw == group_gws.end()) {
gw_created = false;
dout(10) << "Warning: GW " << gw_id << " group_key " << group_key
- << " was not found in the map.Created_gws "
+ << " was not found in the map.created_gws "
<< map.created_gws << dendl;
goto set_propose;
} else {
- dout(4) << "GW prepares the full startup " << gw_id
- << " GW availability: "
+ dout(4) << "GW beacon: Created state - full startup done " << gw_id
+ << " GW state in monitor data-base : "
<< pending_map.created_gws[group_key][gw_id].availability
<< dendl;
if (pending_map.created_gws[group_key][gw_id].availability ==
dout(1) << " Warning :GW marked as Available in the NVmeofGwMon "
<< "database, performed full startup - Apply GW!"
<< gw_id << dendl;
- process_gw_down(gw_id, group_key, propose, avail);
+ process_gw_down(gw_id, group_key, gw_propose, avail);
LastBeacon lb = {gw_id, group_key};
last_beacon[lb] = now; //Update last beacon
} else if (
pending_map.created_gws[group_key][gw_id].performed_full_startup ==
false) {
pending_map.created_gws[group_key][gw_id].performed_full_startup = true;
- propose = true;
+ pending_map.gw_performed_startup(gw_id, group_key, gw_propose);
+ pending_map.created_gws[group_key][gw_id].addr_vect =
+ entity_addrvec_t(con->get_peer_addr());
}
goto set_propose;
}
// gw already created
- } else {
- // if GW reports Available but in monitor's database it is Unavailable
+ } else { // first GW beacon should come with avail = Created
+ // if GW reports Avail/Unavail but in monitor's database it is Unavailable
if (gw != group_gws.end()) {
// it means it did not perform "exit" after failover was set by
// NVMeofGWMon
gw_availability_t::GW_UNAVAILABLE) &&
(pending_map.created_gws[group_key][gw_id].performed_full_startup ==
false) &&
- avail == gw_availability_t::GW_AVAILABLE) {
+ (avail == gw_availability_t::GW_AVAILABLE ||
+ avail == gw_availability_t::GW_UNAVAILABLE )) {
ack_map.created_gws[group_key][gw_id] =
pending_map.created_gws[group_key][gw_id];
- ack_map.epoch = map.epoch;
- dout(1) << " Force gw to exit: Sending ack_map to GW: "
- << gw_id << dendl;
+ ack_map.epoch = get_ack_map_epoch(true, group_key);
+ dout(1) << " Force gw to exit: first beacon in state " << avail
+ << " GW " << gw_id << dendl;
auto msg = make_message<MNVMeofGwMap>(ack_map);
mon.send_reply(op, msg.detach());
goto false_return;
}
}
}
-
- // At this stage the gw has to be in the Created_gws
+ // Beacon from GW in !Created state but it does not appear in the map
if (gw == group_gws.end()) {
dout(4) << "GW that does not appear in the map sends beacon, ignore "
<< gw_id << dendl;
}
if (pending_map.created_gws[group_key][gw_id].availability ==
gw_availability_t::GW_DELETING) {
- dout(4) << "GW sends beacon in DELETING state, ignore "
+ dout(4) << "GW sends beacon in DELETING state, ignore it"
<< gw_id << dendl;
mon.no_reply(op);
goto false_return; // not sending ack to this beacon
}
+ if (epoch_filter_enabled &&
+ pending_map.created_gws[group_key][gw_id].addr_vect !=
+ entity_addrvec_t(con->get_peer_addr()) ) {
+ dout(4) << "Warning: entity addr need to set for GW client " << gw_id
+ << " was " << pending_map.created_gws[group_key][gw_id].addr_vect
+ << " now " << entity_addrvec_t(con->get_peer_addr()) << dendl;
+ pending_map.created_gws[group_key][gw_id].addr_vect =
+ entity_addrvec_t(con->get_peer_addr());
+ gw_propose = true;
+ }
// deep copy the whole nonce map of this GW
if (m->get_nonce_map().size()) {
if (pending_map.created_gws[group_key][gw_id].nonce_map !=
}
} else {
dout(10) << "Warning: received empty nonce map in the beacon of GW "
- << gw_id << " " << dendl;
+ << gw_id << " avail " << (int)avail << dendl;
}
if (sub.size() == 0) {
nonce_propose = true;
}
pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid =
- (map.epoch == m->get_last_gwmap_epoch());
+ (get_ack_map_epoch(true, group_key) == m->get_last_gwmap_epoch());
if (pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid ==
false) {
dout(20) << "map epoch of gw is not up-to-date " << gw_id
- << " epoch " << map.epoch
+ << " epoch " << get_ack_map_epoch(true, group_key)
<< " beacon_epoch " << m->get_last_gwmap_epoch() << dendl;
}
if (avail == gw_availability_t::GW_AVAILABLE) {
LastBeacon lb = {gw_id, group_key};
last_beacon[lb] = now;
epoch_t last_osd_epoch = m->get_last_osd_epoch();
- pending_map.process_gw_map_ka(gw_id, group_key, last_osd_epoch, propose);
+ pending_map.process_gw_map_ka(gw_id, group_key, last_osd_epoch, gw_propose);
// state set by GW client application
} else if (avail == gw_availability_t::GW_UNAVAILABLE ||
avail == gw_availability_t::GW_CREATED) {
- process_gw_down(gw_id, group_key, propose, avail);
+ process_gw_down(gw_id, group_key, gw_propose, avail);
}
// Periodic: check active FSM timers
pending_map.update_active_timers(timer_propose);
- propose |= timer_propose;
- propose |= nonce_propose;
-set_propose:
- if (!propose) {
+ set_propose:
+ propose |= (timer_propose | gw_propose | nonce_propose);
+ apply_ack_logic = (avail == gw_availability_t::GW_AVAILABLE) ? true : false;
+ if ( (apply_ack_logic &&
+ ((pending_map.created_gws[group_key][gw_id].beacon_index++
+ % beacons_till_ack) == 0))|| (!apply_ack_logic) ) {
+ send_ack = true;
+ if (apply_ack_logic) {
+ dout(10) << "ack sent: beacon index "
+ << pending_map.created_gws[group_key][gw_id].beacon_index
+ << " gw " << gw_id <<dendl;
+ }
+ }
+ if (send_ack && ((!gw_propose && epoch_filter_enabled) ||
+ (propose && !epoch_filter_enabled)) ) {
+ // if epoch-filter-bit: send ack to beacon in case no propose
+ //or if changed something not relevant to gw-epoch
if (gw_created) {
// respond with a map slice correspondent to the same GW
ack_map.created_gws[group_key][gw_id] = map.created_gws[group_key][gw_id];
}
- ack_map.epoch = map.epoch;
+ ack_map.epoch = get_ack_map_epoch(gw_created, group_key);
+ if (!gw_created)
+ dout(10) << "gw not created, ack map "
+ << ack_map << " epoch " << ack_map.epoch << dendl;
dout(20) << "ack_map " << ack_map <<dendl;
auto msg = make_message<MNVMeofGwMap>(ack_map);
mon.send_reply(op, msg.detach());
{
os << " " << state_itr.first <<": " << state_itr.second << ",";
}
-
- os << "]\n"<< MODULE_PREFFIX << "availability " << value.availability
+ os << "]\n"<< MODULE_PREFFIX << " entity-addr : " << value.addr_vect
+ << " availability " << value.availability
<< " full-startup " << value.performed_full_startup << " ]";
return os;
}
inline std::ostream& operator<<(std::ostream& os, const NVMeofGwMap value) {
- os << "NVMeofGwMap [ Created_gws: ";
- for (auto& group_gws: value.created_gws) {
+ os << "\n" << MODULE_PREFFIX << "== NVMeofGwMap [ Created_gws: epoch "
+ << value.epoch;
+ for (auto& group_gws: value.gw_epoch) {
os << "\n" << MODULE_PREFFIX << "{ " << group_gws.first
- << " } -> { " << group_gws.second << " }";
+ << " } -> GW epoch: " << group_gws.second << " }";
+ }
+ for (auto& group_gws: value.created_gws) {
+ os << "\n" << MODULE_PREFFIX << "{ " << group_gws.first
+ << " } -> { " << group_gws.second << " }";
}
- os << "]";
return os;
}
if (HAVE_FEATURE(features, NVMEOFHA)) {
version = 2;
}
+ if (HAVE_FEATURE(features, NVMEOFHAMAP)) {
+ version = 3;
+ }
ENCODE_START(version, version, bl);
encode ((uint32_t)gws.size(), bl); // number of gws in the group
for (auto& gw : gws) {
}
}
encode(gw.second.nonce_map, bl, features);
+ if (version >= 3) {
+ dout(20) << "encode addr_vect and beacon_index" << dendl;
+ gw.second.addr_vect.encode(bl, features);
+ encode(gw.second.beacon_index, bl);
+ }
}
ENCODE_FINISH(bl);
}
NvmeGwMonStates& gws, ceph::buffer::list::const_iterator &bl) {
gws.clear();
uint32_t num_created_gws;
- DECODE_START(2, bl);
+ DECODE_START(3, bl);
dout(20) << "decode NvmeGwMonStates. struct_v: " << struct_v << dendl;
decode(num_created_gws, bl);
dout(20) << "decode NvmeGwMonStates. num gws " << num_created_gws << dendl;
}
}
decode(gw_created.nonce_map, bl);
+ if (struct_v >= 3) {
+ dout(20) << "decode addr_vect and beacon_index" << dendl;
+ gw_created.addr_vect.decode(bl);
+ decode(gw_created.beacon_index, bl);
+ }
+
gws[gw_name] = gw_created;
}
if (struct_v == 1) { //Fix allocations of states and blocklist_data
DECODE_FINISH(bl);
}
+inline void encode(const std::map<NvmeGroupKey, epoch_t>& gw_epoch,
+ ceph::bufferlist &bl) {
+ ENCODE_START(1, 1, bl);
+ encode ((uint32_t)gw_epoch.size(), bl); // number of groups
+ for (auto& group_epoch: gw_epoch) {
+ auto& group_key = group_epoch.first;
+ encode(group_key.first, bl); // pool
+ encode(group_key.second, bl); // group
+ encode(group_epoch.second, bl);
+ }
+ ENCODE_FINISH(bl);
+}
+
+inline void decode(std::map<NvmeGroupKey, epoch_t>& gw_epoch,
+ ceph::buffer::list::const_iterator &bl) {
+ gw_epoch.clear();
+ uint32_t ngroups;
+ DECODE_START(1, bl);
+ decode(ngroups, bl);
+ for(uint32_t i = 0; i<ngroups; i++){
+ std::string pool, group;
+ decode(pool, bl);
+ decode(group, bl);
+ epoch_t gepoch;
+ decode(gepoch, bl);
+ gw_epoch[std::make_pair(pool, group)] = gepoch;
+}
+ DECODE_FINISH(bl);
+}
+
inline void encode(
const std::map<NvmeGroupKey, NvmeGwMonStates>& created_gws,
ceph::bufferlist &bl, uint64_t features) {