]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
nvmeofgw*: Add mechanism to send maps to clients that need them
authorLeonid Chernin <leonidc@il.ibm.com>
Wed, 22 Jan 2025 17:59:31 +0000 (19:59 +0200)
committerleonidc <leonidc@il.ibm.com>
Sun, 16 Feb 2025 19:57:09 +0000 (21:57 +0200)
see comments in gw_epoch for details

Signed-off-by: Leonid Chernin <leonidc@il.ibm.com>
src/common/options/mon.yaml.in
src/include/ceph_features.h
src/mon/NVMeofGwMap.cc
src/mon/NVMeofGwMap.h
src/mon/NVMeofGwMon.cc
src/mon/NVMeofGwMon.h
src/mon/NVMeofGwSerialize.h
src/mon/NVMeofGwTypes.h

index 1307030e3fb96f9182e6f670a93c25bc5d8968ec..c3660319a8e940e03c0588c452df25bf54d7f153 100644 (file)
@@ -89,6 +89,11 @@ options:
     receives a monitor group ID assignment before the gateway is fully up during
     initialization, a retry is required.
   default: 1000
+- name: mon_nvmeofgw_beacons_till_ack
+  type: uint
+  level: advanced
+  default: 15
+  desc: Number of beacons from MonClient before NVMeofGwMon sends ack-map to it
   services:
   - mon
 - name: mon_nvmeofgw_delete_grace
index bceb9f5f4bff837aff9c2b18a26e6b8d4a85d399..49b95d906fe93d1155fde616471eea5fa9da8504 100644 (file)
@@ -148,6 +148,7 @@ DEFINE_CEPH_FEATURE(42, 1, MSGR_KEEPALIVE2)  // 4.3 (for consistency)
 DEFINE_CEPH_FEATURE(43, 1, OSD_POOLRESEND)   // 4.13
 DEFINE_CEPH_FEATURE(44, 2, NVMEOFHA)
 DEFINE_CEPH_FEATURE_RETIRED(45, 1, OSD_SET_ALLOC_HINT, JEWEL, LUMINOUS)
+DEFINE_CEPH_FEATURE(45, 2, NVMEOFHAMAP)
 // available
 DEFINE_CEPH_FEATURE(46, 1, OSD_FADVISE_FLAGS)
 DEFINE_CEPH_FEATURE_RETIRED(46, 1, OSD_REPOP, JEWEL, LUMINOUS) // overlap
@@ -226,6 +227,7 @@ DEFINE_CEPH_FEATURE_RETIRED(63, 1, RESERVED_BROKEN, LUMINOUS, QUINCY) // client-
         CEPH_FEATURE_MSGR_KEEPALIVE2 | \
         CEPH_FEATURE_OSD_POOLRESEND |  \
         CEPH_FEATUREMASK_NVMEOFHA | \
+        CEPH_FEATUREMASK_NVMEOFHAMAP | \
         CEPH_FEATURE_OSD_FADVISE_FLAGS |     \
         CEPH_FEATURE_MDS_QUOTA | \
          CEPH_FEATURE_CRUSH_V4 |            \
index 26c16740ecbad3c33ebb2235a6dc81e319e54965..581591e3260b46b5ad113344862dfe5c708cb741 100755 (executable)
@@ -83,6 +83,14 @@ int NVMeofGwMap::cfg_add_gw(
   const NvmeGwId &gw_id, const NvmeGroupKey& group_key)
 {
   std::set<NvmeAnaGrpId> allocated;
+  if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOFHAMAP)) {
+    auto gw_epoch_it = gw_epoch.find(group_key);
+    if (gw_epoch_it == gw_epoch.end()) {
+      gw_epoch[group_key] = epoch;
+      dout(10) << "Allocated first gw_epoch : group_key "
+          << group_key << " epoch " << gw_epoch[group_key] << dendl;
+    }
+  }
   for (auto& itr: created_gws[group_key]) {
     allocated.insert(itr.second.ana_grp_id);
     if (itr.first == gw_id) {
@@ -190,8 +198,10 @@ int  NVMeofGwMap::do_erase_gw_id(const NvmeGwId &gw_id,
     fsm_timers.erase(group_key);
 
   created_gws[group_key].erase(gw_id);
-  if (created_gws[group_key].size() == 0)
+  if (created_gws[group_key].size() == 0) {
     created_gws.erase(group_key);
+    gw_epoch.erase(group_key);
+  }
   return 0;
 }
 
@@ -221,6 +231,23 @@ int NVMeofGwMap::do_delete_gw(
   return -EINVAL;
 }
 
+void  NVMeofGwMap::gw_performed_startup(const NvmeGwId &gw_id,
+      const NvmeGroupKey& group_key, bool &propose_pending)
+{
+  dout(4) << "GW  performed the full startup " << gw_id << dendl;
+  propose_pending = true;
+  increment_gw_epoch( group_key);
+}
+
+void NVMeofGwMap::increment_gw_epoch(const NvmeGroupKey& group_key)
+{
+  if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOFHAMAP)) {
+    gw_epoch[group_key] ++;
+    dout(4) << "incremented epoch of " << group_key
+         << " " << gw_epoch[group_key] << dendl;
+  }
+}
+
 int NVMeofGwMap::get_num_namespaces(const NvmeGwId &gw_id,
     const NvmeGroupKey& group_key,  const BeaconSubsystems&  subs)
 {
@@ -273,7 +300,10 @@ int NVMeofGwMap::process_gw_map_gw_no_subsys_no_listeners(
     gw_id, group_key, state_itr.second,state_itr.first, propose_pending);
     }
     propose_pending = true; // map should reflect that gw becames Created
-    if (propose_pending) validate_gw_map(group_key);
+    if (propose_pending) {
+      validate_gw_map(group_key);
+      increment_gw_epoch(group_key);
+    }
   } else {
     dout(1)  << __FUNCTION__ << "ERROR GW-id was not found in the map "
          << gw_id << dendl;
@@ -299,7 +329,10 @@ int NVMeofGwMap::process_gw_map_gw_down(
       state_itr.second = gw_states_per_group_t::GW_STANDBY_STATE;
     }
     propose_pending = true; // map should reflect that gw becames Unavailable
-    if (propose_pending) validate_gw_map(group_key);
+    if (propose_pending) {
+      validate_gw_map(group_key);
+      increment_gw_epoch(group_key);
+    }
   } else {
     dout(1)  << __FUNCTION__ << "ERROR GW-id was not found in the map "
             << gw_id << dendl;
@@ -338,7 +371,10 @@ void NVMeofGwMap::process_gw_map_ka(
        state_itr.first, last_osd_epoch, propose_pending);
     }
   }
-  if (propose_pending) validate_gw_map(group_key);
+  if (propose_pending) {
+    validate_gw_map(group_key);
+    increment_gw_epoch(group_key);
+  }
 }
 
 void NVMeofGwMap::handle_abandoned_ana_groups(bool& propose)
@@ -387,6 +423,7 @@ void NVMeofGwMap::handle_abandoned_ana_groups(bool& propose)
     }
     if (propose) {
       validate_gw_map(group_key);
+      increment_gw_epoch(group_key);
     }
   }
 }
@@ -751,7 +788,10 @@ void NVMeofGwMap::fsm_handle_gw_delete(
            << "for GW " << gw_id  << dendl;
   }
   }
-  if (map_modified) validate_gw_map(group_key);
+  if (map_modified) {
+    validate_gw_map(group_key);
+    increment_gw_epoch(group_key);
+  }
 }
 
 void NVMeofGwMap::fsm_handle_to_expired(
@@ -817,7 +857,10 @@ void NVMeofGwMap::fsm_handle_to_expired(
     //another Trigger for GW down (failover)
     process_gw_map_gw_down(gw_id, group_key, map_modified);
   }
-  if (map_modified) validate_gw_map(group_key);
+  if (map_modified) {
+    validate_gw_map(group_key);
+    increment_gw_epoch(group_key);
+  }
 }
 
 struct CMonRequestProposal : public Context {
index 2fd38346869891011ea9a5623b0bc3cc0bbddaa9..232a44709af374ddb48b18ee3e8cf501161d088b 100755 (executable)
@@ -45,6 +45,21 @@ public:
 
   // map that handles timers started by all Gateway FSMs
   std::map<NvmeGroupKey, NvmeGwTimers> fsm_timers;
+  /**
+   * gw_epoch
+   *
+   * Mapping from NvmeGroupKey -> epoch_t e such that e is the most recent
+   * map epoch which affects NvmeGroupKey.
+   *
+   * The purpose of this map is to allow us to determine whether a particular
+   * gw needs to be sent the current map.  If a gw with NvmeGroupKey key already
+   * has map epoch e, we only need to send a new map if gw_epoch[key] > e.  See
+   * check_sub for this logic.
+   *
+   * Map mutators generally need to invoke increment_gw_epoch(group_key) when
+   * updating the map with a change affecting gws in group_key.
+   */
+  std::map<NvmeGroupKey, epoch_t> gw_epoch;
 
   void to_gmap(std::map<NvmeGroupKey, NvmeGwMonClientStates>& Gmap) const;
   void track_deleting_gws(const NvmeGroupKey& group_key,
@@ -70,6 +85,8 @@ public:
     NvmeAnaGrpId anagrpid, uint8_t value);
   void handle_gw_performing_fast_reboot(const NvmeGwId &gw_id,
        const NvmeGroupKey& group_key, bool &map_modified);
+  void gw_performed_startup(const NvmeGwId &gw_id,
+       const NvmeGroupKey& group_key, bool &propose_pending);
 private:
   int  do_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
   int  do_erase_gw_id(const NvmeGwId &gw_id,
@@ -115,6 +132,7 @@ private:
     NvmeAnaGrpId anagrpid);
   void validate_gw_map(
     const NvmeGroupKey& group_key);
+  void increment_gw_epoch(const NvmeGroupKey& group_key);
 
 public:
   int blocklist_gw(
@@ -123,21 +141,31 @@ public:
 
   void encode(ceph::buffer::list &bl, uint64_t features) const {
     using ceph::encode;
-    ENCODE_START(1, 1, bl);
+    uint8_t version = 1;
+    if (HAVE_FEATURE(features, NVMEOFHAMAP)) {
+       version = 2;
+    }
+    ENCODE_START(version, version, bl);
     encode(epoch, bl);// global map epoch
 
     encode(created_gws, bl, features); //Encode created GWs
     encode(fsm_timers, bl, features);
+    if (version >= 2) {
+      encode(gw_epoch, bl);
+    }
     ENCODE_FINISH(bl);
   }
 
   void decode(ceph::buffer::list::const_iterator &bl) {
     using ceph::decode;
-    DECODE_START(1, bl);
-    decode(epoch, bl);
+    DECODE_START(2, bl);
 
+    decode(epoch, bl);
     decode(created_gws, bl);
     decode(fsm_timers, bl);
+    if (struct_v >= 2) {
+      decode(gw_epoch, bl);
+    }
     DECODE_FINISH(bl);
   }
 
index 95360a51ec519e688b2bc1b592e2534bcb3b4c38..ccda0eeca6281fb63f1c65d67d310103a9bad1e2 100644 (file)
@@ -44,17 +44,20 @@ void NVMeofGwMon::synchronize_last_beacon()
           << " active " << is_active()  << dendl;
   // Initialize last_beacon to identify transitions of available
   // GWs to unavailable state
-  for (const auto& created_map_pair: map.created_gws) {
-    const autogroup_key = created_map_pair.first;
-    const NvmeGwMonStates& gw_created_map = created_map_pair.second;
-    for (const auto& gw_created_pair: gw_created_map) {
-      const auto& gw_id = gw_created_pair.first;
+  for (auto &created_map_pair: map.created_gws) {
+    const auto &group_key = created_map_pair.first;
+    NvmeGwMonStates& gw_created_map = created_map_pair.second;
+    for (auto& gw_created_pair: gw_created_map) {
+      auto& gw_id = gw_created_pair.first;
       if (gw_created_pair.second.availability ==
          gw_availability_t::GW_AVAILABLE) {
        dout(10) << "synchronize last_beacon for  GW :" << gw_id << dendl;
        LastBeacon lb = {gw_id, group_key};
        last_beacon[lb] = last_tick;
       }
+      // force send ack after nearest beacon after leader re-election
+      gw_created_pair.second.beacon_index =
+          g_conf().get_val<uint64_t>("mon_nvmeofgw_beacons_till_ack");
     }
   }
 }
@@ -165,15 +168,32 @@ void NVMeofGwMon::create_pending()
   dout(10) << " pending " << pending_map  << dendl;
 }
 
+void NVMeofGwMon::recreate_gw_epoch() {
+  //check pending map - if exists group_key but no gw_epoch[group_key]
+  //- create it and assign to epoch (offset)
+  for (auto& created_map_pair: pending_map.created_gws) {
+    auto group_key = created_map_pair.first;
+    if (pending_map.gw_epoch.find(group_key) ==
+        pending_map.gw_epoch.end()) {
+      pending_map.gw_epoch[group_key] = pending_map.epoch;
+      dout(10) << "recreated gw epoch for group " << group_key
+           << " set epoch " << pending_map.epoch << dendl;
+    }
+  }
+}
+
 void NVMeofGwMon::encode_pending(MonitorDBStore::TransactionRef t)
 {
   dout(10) << dendl;
   ceph_assert(get_last_committed() + 1 == pending_map.epoch);
   bufferlist bl;
   uint64_t features = mon.get_quorum_con_features();
+  if (HAVE_FEATURE(features, NVMEOFHAMAP)) {
+    recreate_gw_epoch();
+  }
   pending_map.encode(bl, features);
-  dout(10) << " has NVMEOFHA: "
-          << HAVE_FEATURE(mon.get_quorum_con_features(), NVMEOFHA) << dendl;
+  dout(10) << " has NVMEOFHA: " << HAVE_FEATURE(features, NVMEOFHA)
+       << " has NVMEOFHAMAP: " << HAVE_FEATURE(features, NVMEOFHAMAP) << dendl;
   put_version(t, pending_map.epoch, bl);
   put_last_committed(t, pending_map.epoch);
 
@@ -204,7 +224,33 @@ void NVMeofGwMon::update_from_paxos(bool *need_bootstrap)
   }
 }
 
-void NVMeofGwMon::check_sub(Subscription *sub)
+bool NVMeofGwMon::get_gw_by_addr(const entity_addr_t &sub_addr,
+     NvmeGwId &gw_id, NvmeGroupKey& group_key)
+{
+  for (auto& created_map_pair: map.created_gws) {
+     group_key = created_map_pair.first;
+     NvmeGwMonStates& gw_created_map = created_map_pair.second;
+     for (auto& gw_created_pair: gw_created_map) {
+       gw_id = gw_created_pair.first;
+       if ((gw_created_pair.second.availability !=
+          gw_availability_t::GW_CREATED) &&
+          (gw_created_pair.second.addr_vect == entity_addrvec_t(sub_addr))) {
+         dout(10) << "found gw-vect " << gw_created_pair.second.addr_vect
+               << " GW " << gw_id << " group-key " << group_key <<  dendl;
+         return true;
+       }
+     }
+  }
+  return false;
+}
+
+/**
+ * check_sub_unconditional
+ *
+ * Unconditionally sends the next map to the subscription without referring
+ * to the gw_epoch map.  Used until mon quorum supports NVMEOFHAMAP
+ */
+void NVMeofGwMon::check_sub_unconditional(Subscription *sub)
 {
   dout(10) << "sub->next , map-epoch " << sub->next
           << " " << map.epoch << dendl;
@@ -222,6 +268,31 @@ void NVMeofGwMon::check_sub(Subscription *sub)
   }
 }
 
+void NVMeofGwMon::check_sub(Subscription *sub)
+{
+  NvmeGwId gw_id;
+  NvmeGroupKey group_key;
+  if (get_gw_by_addr(sub->session->con->get_peer_addr(),
+      gw_id, group_key)) {
+    dout(10) << "sub->next(epoch) " << sub->next << " map.gw_epoch "
+       << map.gw_epoch[group_key] << dendl;
+    if (sub->next <= map.gw_epoch[group_key]) {
+      dout(4) << "Send unicast map to GW "<< gw_id << dendl;
+      NVMeofGwMap unicast_map;
+      unicast_map.created_gws[group_key][gw_id]
+          = map.created_gws[group_key][gw_id];
+      // respond with a map slice correspondent to the same GW
+      unicast_map.epoch =  map.gw_epoch[group_key];//map.epoch;
+      sub->session->con->send_message2(make_message<MNVMeofGwMap>(unicast_map));
+      if (sub->onetime) {
+        mon.session_map.remove_sub(sub);
+      } else {
+        sub->next = map.gw_epoch[group_key] + 1;
+      }
+    }
+  }
+}
+
 void NVMeofGwMon::check_subs(bool t)
 {
   const std::string type = "NVMeofGw";
@@ -231,7 +302,13 @@ void NVMeofGwMon::check_subs(bool t)
     return;
   }
   for (auto sub : *(mon.session_map.subs[type])) {
-    check_sub(sub);
+    dout(10) << " dump subscriber peer_addr : "
+       << sub->session->con->get_peer_addr() <<  dendl;
+    if (HAVE_FEATURE(mon.get_quorum_con_features(), NVMEOFHAMAP)) {
+      check_sub(sub);
+    } else {
+      check_sub_unconditional(sub);
+    }
   }
 }
 
@@ -331,6 +408,8 @@ bool NVMeofGwMon::preprocess_command(MonOpRequestRef op)
       }
     }
     f->dump_unsigned("num gws", map.created_gws[group_key].size());
+    if (map.gw_epoch.find(group_key) != map.gw_epoch.end())
+      f->dump_unsigned("GW-epoch", map.gw_epoch[group_key]);
     if (map.created_gws[group_key].size() == 0) {
       f->close_section();
       f->flush(rdata);
@@ -513,6 +592,18 @@ bool NVMeofGwMon::preprocess_beacon(MonOpRequestRef op)
   return false;
 }
 
+epoch_t NVMeofGwMon::get_ack_map_epoch(bool gw_created,
+    const NvmeGroupKey& group_key) {
+  epoch_t rc;
+  if (!gw_created) {
+    rc = 0;
+  } else if (map.gw_epoch.find(group_key) != map.gw_epoch.end()) {
+    rc = map.gw_epoch[group_key];
+  } else { // feature bit NVMEOFHAMAP was not applied
+    rc = map.epoch;
+  }
+  return rc;
+}
 
 bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
 {
@@ -522,30 +613,37 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
           << " GW : " << m->get_gw_id()
           << " osdmap_epoch " << m->get_last_osd_epoch()
           << " subsystems " << m->get_subsystems() << dendl;
-
+  ConnectionRef con = op->get_connection();
   NvmeGwId gw_id = m->get_gw_id();
   NvmeGroupKey group_key = std::make_pair(m->get_gw_pool(),  m->get_gw_group());
   gw_availability_t  avail = m->get_availability();
   bool propose = false;
   bool nonce_propose = false;
   bool timer_propose = false;
+  bool gw_propose    = false;
   bool gw_created = true;
   NVMeofGwMap ack_map;
+  bool epoch_filter_enabled = HAVE_FEATURE(mon.get_quorum_con_features(),
+                              NVMEOFHAMAP);
   auto& group_gws = map.created_gws[group_key];
   auto gw = group_gws.find(gw_id);
   const BeaconSubsystems& sub = m->get_subsystems();
   auto now = ceph::coarse_mono_clock::now();
+  int beacons_till_ack =
+        g_conf().get_val<uint64_t>("mon_nvmeofgw_beacons_till_ack");
+  bool apply_ack_logic = true;
+  bool send_ack =  false;
 
   if (avail == gw_availability_t::GW_CREATED) {
     if (gw == group_gws.end()) {
       gw_created = false;
       dout(10) << "Warning: GW " << gw_id << " group_key " << group_key
-              << " was not found in the  map.Created_gws "
+              << " was not found in the  map.created_gws "
               << map.created_gws << dendl;
       goto set_propose;
     } else {
-      dout(4) << "GW  prepares the full startup " << gw_id
-              << " GW availability: "
+      dout(4) << "GW beacon: Created state - full startup done " << gw_id
+              << " GW state in monitor data-base : "
               << pending_map.created_gws[group_key][gw_id].availability
               << dendl;
       if (pending_map.created_gws[group_key][gw_id].availability ==
@@ -553,20 +651,22 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
        dout(1) << " Warning :GW marked as Available in the NVmeofGwMon "
                << "database, performed full startup - Apply GW!"
                << gw_id << dendl;
-        process_gw_down(gw_id, group_key, propose, avail);
+        process_gw_down(gw_id, group_key, gw_propose, avail);
         LastBeacon lb = {gw_id, group_key};
         last_beacon[lb] = now; //Update last beacon
       } else if (
        pending_map.created_gws[group_key][gw_id].performed_full_startup ==
        false) {
        pending_map.created_gws[group_key][gw_id].performed_full_startup = true;
-       propose = true;
+       pending_map.gw_performed_startup(gw_id, group_key, gw_propose);
+       pending_map.created_gws[group_key][gw_id].addr_vect =
+           entity_addrvec_t(con->get_peer_addr());
       }
       goto set_propose;
     }
   // gw already created
-  } else {
-    // if GW reports Available but in monitor's database it is Unavailable
+  } else { // first GW beacon should come with avail = Created
+    // if GW reports Avail/Unavail but in monitor's database it is Unavailable
     if (gw != group_gws.end()) {
       // it means it did not perform "exit" after failover was set by
       // NVMeofGWMon
@@ -574,20 +674,20 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
           gw_availability_t::GW_UNAVAILABLE) &&
          (pending_map.created_gws[group_key][gw_id].performed_full_startup ==
           false) &&
-         avail == gw_availability_t::GW_AVAILABLE) {
+         (avail == gw_availability_t::GW_AVAILABLE ||
+          avail == gw_availability_t::GW_UNAVAILABLE )) {
        ack_map.created_gws[group_key][gw_id] =
          pending_map.created_gws[group_key][gw_id];
-       ack_map.epoch = map.epoch;
-       dout(1) << " Force gw to exit: Sending ack_map to GW: "
-               << gw_id << dendl;
+       ack_map.epoch = get_ack_map_epoch(true, group_key);
+       dout(1) << " Force gw to exit: first beacon in state " << avail
+               << " GW " << gw_id << dendl;
        auto msg = make_message<MNVMeofGwMap>(ack_map);
        mon.send_reply(op, msg.detach());
        goto false_return;
       }
     }
   }
-
-  // At this stage the gw has to be in the Created_gws
+  // Beacon from GW in !Created state but it does not appear in the map
   if (gw == group_gws.end()) {
     dout(4) << "GW that does not appear in the map sends beacon, ignore "
        << gw_id << dendl;
@@ -596,11 +696,21 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
   }
   if (pending_map.created_gws[group_key][gw_id].availability ==
     gw_availability_t::GW_DELETING) {
-    dout(4) << "GW sends beacon in DELETING state, ignore "
+    dout(4) << "GW sends beacon in DELETING state, ignore it"
        << gw_id << dendl;
     mon.no_reply(op);
     goto false_return; // not sending ack to this beacon
   }
+  if (epoch_filter_enabled &&
+      pending_map.created_gws[group_key][gw_id].addr_vect !=
+      entity_addrvec_t(con->get_peer_addr()) ) {
+    dout(4) << "Warning: entity addr need to set for GW client " << gw_id
+      << " was " <<  pending_map.created_gws[group_key][gw_id].addr_vect
+      << " now " << entity_addrvec_t(con->get_peer_addr()) << dendl;
+    pending_map.created_gws[group_key][gw_id].addr_vect =
+      entity_addrvec_t(con->get_peer_addr());
+    gw_propose = true;
+  }
   // deep copy the whole nonce map of this GW
   if (m->get_nonce_map().size()) {
     if (pending_map.created_gws[group_key][gw_id].nonce_map !=
@@ -614,7 +724,7 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
     }
   } else {
     dout(10) << "Warning: received empty nonce map in the beacon of GW "
-            << gw_id << " " << dendl;
+            << gw_id << " avail " << (int)avail << dendl;
   }
 
   if (sub.size() == 0) {
@@ -641,11 +751,11 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
     nonce_propose = true;
   }
   pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid =
-    (map.epoch == m->get_last_gwmap_epoch());
+    (get_ack_map_epoch(true, group_key) == m->get_last_gwmap_epoch());
   if (pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid ==
       false) {
     dout(20) <<  "map epoch of gw is not up-to-date " << gw_id
-            << " epoch " << map.epoch
+            << " epoch " << get_ack_map_epoch(true, group_key)
             << " beacon_epoch " << m->get_last_gwmap_epoch() <<  dendl;
   }
   if (avail == gw_availability_t::GW_AVAILABLE) {
@@ -655,24 +765,40 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
     LastBeacon lb = {gw_id, group_key};
     last_beacon[lb] = now;
     epoch_t last_osd_epoch = m->get_last_osd_epoch();
-    pending_map.process_gw_map_ka(gw_id, group_key, last_osd_epoch, propose);
+    pending_map.process_gw_map_ka(gw_id, group_key, last_osd_epoch, gw_propose);
   // state set by GW client application
   } else if (avail == gw_availability_t::GW_UNAVAILABLE ||
       avail == gw_availability_t::GW_CREATED) {
-      process_gw_down(gw_id, group_key, propose, avail);
+      process_gw_down(gw_id, group_key, gw_propose, avail);
   }
   // Periodic: check active FSM timers
   pending_map.update_active_timers(timer_propose);
-  propose |= timer_propose;
-  propose |= nonce_propose;
 
-set_propose:
-  if (!propose) {
+ set_propose:
+  propose |= (timer_propose | gw_propose | nonce_propose);
+  apply_ack_logic = (avail == gw_availability_t::GW_AVAILABLE) ? true : false;
+  if ( (apply_ack_logic &&
+      ((pending_map.created_gws[group_key][gw_id].beacon_index++
+          % beacons_till_ack) == 0))|| (!apply_ack_logic) ) {
+    send_ack = true;
+    if (apply_ack_logic) {
+      dout(10) << "ack sent: beacon index "
+       << pending_map.created_gws[group_key][gw_id].beacon_index
+       << " gw " << gw_id <<dendl;
+    }
+  }
+  if (send_ack && ((!gw_propose && epoch_filter_enabled) ||
+                    (propose && !epoch_filter_enabled)) ) {
+          // if epoch-filter-bit: send ack to beacon in case no propose
+          //or if changed something not relevant to gw-epoch
     if (gw_created) {
       // respond with a map slice correspondent to the same GW
       ack_map.created_gws[group_key][gw_id] = map.created_gws[group_key][gw_id];
     }
-    ack_map.epoch = map.epoch;
+    ack_map.epoch = get_ack_map_epoch(gw_created, group_key);
+    if (!gw_created)
+      dout(10) << "gw not created, ack map "
+                        << ack_map << " epoch " << ack_map.epoch << dendl;
     dout(20) << "ack_map " << ack_map <<dendl;
     auto msg = make_message<MNVMeofGwMap>(ack_map);
     mon.send_reply(op, msg.detach());
index d7f5fd89cde688000912e858a8d6a2aea01e4cd1..8cde2af54b1fb621a30e92d43c4452f022a1c1b2 100644 (file)
@@ -81,6 +81,7 @@ public:
 
   void check_subs(bool type);
   void check_sub(Subscription *sub);
+  void check_sub_unconditional(Subscription *sub);
 
   std::map<NvmeGroupKey, std::map<NvmeGwId, utime_t>> gws_deleting_time;
 
@@ -89,6 +90,10 @@ private:
   void process_gw_down(const NvmeGwId &gw_id,
      const NvmeGroupKey& group_key, bool &propose_pending,
      gw_availability_t avail);
+  bool get_gw_by_addr(const  entity_addr_t &sub_addr,
+       NvmeGwId &gw_id, NvmeGroupKey& group_key);
+  epoch_t get_ack_map_epoch(bool gw_created, const NvmeGroupKey& group_key);
+  void recreate_gw_epoch();
 };
 
 #endif /* MON_NVMEGWMONITOR_H_ */
index b10eac88c2fde307f964ddc7b5203beb5827ed6a..9d43f242ae8509552b0fcc2227c417ff152b0e96 100755 (executable)
@@ -182,8 +182,8 @@ inline std::ostream& print_gw_created_t(
   {
     os << " " << state_itr.first <<": " << state_itr.second << ",";
   }
-
-  os << "]\n"<< MODULE_PREFFIX << "availability " << value.availability
+  os << "]\n"<< MODULE_PREFFIX << " entity-addr : " << value.addr_vect
+     << " availability " << value.availability
      << " full-startup " << value.performed_full_startup  << " ]";
 
   return os;
@@ -224,12 +224,16 @@ inline std::ostream& operator<<(std::ostream& os, const NvmeGwMonStates value) {
 }
 
 inline std::ostream& operator<<(std::ostream& os, const NVMeofGwMap value) {
-  os << "NVMeofGwMap [ Created_gws: ";
-  for (auto& group_gws: value.created_gws) {
+  os <<  "\n" <<  MODULE_PREFFIX << "== NVMeofGwMap [ Created_gws: epoch "
+     << value.epoch;
+  for (auto& group_gws: value.gw_epoch) {
     os <<  "\n" <<  MODULE_PREFFIX  << "{ " << group_gws.first
-       << " } -> { " << group_gws.second << " }";
+       << " } -> GW epoch: " << group_gws.second << " }";
+  }
+  for (auto& group_gws: value.created_gws) {
+   os <<  "\n" <<  MODULE_PREFFIX  << "{ " << group_gws.first
+      << " } -> { " << group_gws.second << " }";
   }
-  os << "]";
   return os;
 }
 
@@ -449,6 +453,9 @@ inline void encode(const NvmeGwMonStates& gws,  ceph::bufferlist &bl,
   if (HAVE_FEATURE(features, NVMEOFHA)) {
     version = 2;
   }
+  if (HAVE_FEATURE(features, NVMEOFHAMAP)) {
+    version = 3;
+  }
   ENCODE_START(version, version, bl);
   encode ((uint32_t)gws.size(), bl); // number of gws in the group
   for (auto& gw : gws) {
@@ -492,6 +499,11 @@ inline void encode(const NvmeGwMonStates& gws,  ceph::bufferlist &bl,
       }
     }
     encode(gw.second.nonce_map, bl, features);
+    if (version >= 3) {
+      dout(20) << "encode addr_vect and beacon_index" << dendl;
+      gw.second.addr_vect.encode(bl, features);
+      encode(gw.second.beacon_index, bl);
+    }
   }
   ENCODE_FINISH(bl);
 }
@@ -500,7 +512,7 @@ inline void decode(
   NvmeGwMonStates& gws, ceph::buffer::list::const_iterator &bl) {
   gws.clear();
   uint32_t num_created_gws;
-  DECODE_START(2, bl);
+  DECODE_START(3, bl);
   dout(20) << "decode NvmeGwMonStates. struct_v: " << struct_v << dendl;
   decode(num_created_gws, bl);
   dout(20) << "decode NvmeGwMonStates. num gws  " << num_created_gws << dendl;
@@ -573,6 +585,12 @@ inline void decode(
       }
     }
     decode(gw_created.nonce_map, bl);
+    if (struct_v >= 3) {
+      dout(20) << "decode addr_vect and beacon_index" << dendl;
+      gw_created.addr_vect.decode(bl);
+      decode(gw_created.beacon_index, bl);
+    }
+
     gws[gw_name] = gw_created;
   }
   if (struct_v == 1) {  //Fix allocations of states and blocklist_data
@@ -590,6 +608,36 @@ inline void decode(
   DECODE_FINISH(bl);
 }
 
+inline void encode(const std::map<NvmeGroupKey, epoch_t>& gw_epoch,
+                   ceph::bufferlist &bl) {
+  ENCODE_START(1, 1, bl);
+  encode ((uint32_t)gw_epoch.size(), bl); // number of groups
+  for (auto& group_epoch: gw_epoch) {
+    auto& group_key = group_epoch.first;
+    encode(group_key.first, bl); // pool
+    encode(group_key.second, bl); // group
+    encode(group_epoch.second, bl);
+  }
+  ENCODE_FINISH(bl);
+}
+
+inline void decode(std::map<NvmeGroupKey, epoch_t>& gw_epoch,
+                   ceph::buffer::list::const_iterator &bl) {
+  gw_epoch.clear();
+  uint32_t ngroups;
+  DECODE_START(1, bl);
+  decode(ngroups, bl);
+  for(uint32_t i = 0; i<ngroups; i++){
+    std::string pool, group;
+    decode(pool, bl);
+    decode(group, bl);
+    epoch_t gepoch;
+    decode(gepoch, bl);
+    gw_epoch[std::make_pair(pool, group)] = gepoch;
+}
+  DECODE_FINISH(bl);
+}
+
 inline void encode(
   const std::map<NvmeGroupKey, NvmeGwMonStates>& created_gws,
   ceph::bufferlist &bl, uint64_t features) {
index 2dd3e11ba3ab7310d57a938976f18b736a23403f..667479696f9c693208b088a77490085fb14f3403 100755 (executable)
@@ -128,7 +128,9 @@ struct NvmeGwMonState {
   // state machine states per ANA group
   SmState sm_state;
   BlocklistData blocklist_data;
-
+  //ceph entity address allocated for the GW-client that represents this GW-id
+  entity_addrvec_t addr_vect;
+  uint16_t beacon_index = 0;
   NvmeGwMonState(): ana_grp_id(REDUNDANT_GW_ANA_GROUP_ID) {}
 
   NvmeGwMonState(NvmeAnaGrpId id)