]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon/OSDMonitor: add send_pg_create() to OSDMonitor
authorKefu Chai <kchai@redhat.com>
Thu, 9 Mar 2017 04:08:29 +0000 (12:08 +0800)
committerKefu Chai <kchai@redhat.com>
Thu, 30 Mar 2017 12:21:17 +0000 (20:21 +0800)
OSDMonitor will handle the pg-create subscriptions after luminous.
1. scan new pools to get the pgs to be created
2. send pg creates using the collected pgs
3. trim the creating_pgs using the "created!" messages from OSD.

please note that we need to wait for the OSDMonitor::mapping to be fully
populated, so we cannot scan the incrementa map for creating pgs until
it is applied and accepted by other monitors.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/mon/CreatingPGs.h [new file with mode: 0644]
src/mon/Monitor.cc
src/mon/OSDMonitor.cc
src/mon/OSDMonitor.h
src/mon/PGMonitor.cc
src/test/encoding/types.h

diff --git a/src/mon/CreatingPGs.h b/src/mon/CreatingPGs.h
new file mode 100644 (file)
index 0000000..7c909a5
--- /dev/null
@@ -0,0 +1,59 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+#include <set>
+#include "include/encoding.h"
+
+struct creating_pgs_t {
+  epoch_t last_scan_epoch = 0;
+  std::map<pg_t, std::pair<epoch_t, utime_t> > pgs;
+  std::set<int64_t> created_pools;
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(last_scan_epoch, bl);
+    ::encode(pgs, bl);
+    ::encode(created_pools, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(last_scan_epoch, bl);
+    ::decode(pgs, bl);
+    ::decode(created_pools, bl);
+    DECODE_FINISH(bl);
+  }
+  void dump(ceph::Formatter *f) const {
+    f->open_object_section("creating_pgs");
+    f->dump_unsigned("last_scan_epoch", last_scan_epoch);
+    for (auto& pg : pgs) {
+      f->open_object_section("pg");
+      f->dump_stream("pgid") << pg.first;
+      f->dump_unsigned("epoch", pg.second.first);
+      f->dump_stream("ctime") << pg.second.second;
+      f->close_section();
+    }
+    f->close_section();
+    f->open_array_section("created_pools");
+    for (auto pool : created_pools) {
+      f->dump_unsigned("pool", pool);
+    }
+    f->close_section();
+  }
+  static void generate_test_instances(list<creating_pgs_t*>& o) {
+    auto c = new creating_pgs_t;
+    c->last_scan_epoch = 17;
+    c->pgs.emplace(pg_t{42, 2}, make_pair(31, utime_t{891, 113}));
+    c->pgs.emplace(pg_t{44, 2}, make_pair(31, utime_t{891, 113}));
+    c->created_pools = {0, 1};
+    o.push_back(c);
+    c = new creating_pgs_t;
+    c->last_scan_epoch = 18;
+    c->pgs.emplace(pg_t{42, 3}, make_pair(31, utime_t{891, 113}));
+    c->created_pools = {};
+    o.push_back(c);
+  }
+};
+WRITE_CLASS_ENCODER(creating_pgs_t);
index 657d22cff4cc1eaf7a95135db5161df923755eaf..19657f85d0a724152eaefebd60778c7f21f589aa 100644 (file)
@@ -4445,7 +4445,12 @@ void Monitor::handle_subscribe(MonOpRequestRef op)
       }
     } else if (p->first == "osd_pg_creates") {
       if ((int)s->is_capable("osd", MON_CAP_W)) {
-       pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
+       if (monmap->get_required_features().contains_all(
+             ceph::features::mon::FEATURE_LUMINOUS)) {
+         osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
+       } else {
+         pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
+       }
       }
     } else if (p->first == "monmap") {
       monmon()->check_sub(s->sub_map[p->first]);
index d5a48db1ec4ef985127e720d6d84bda6b5401148..020c88ac15fa40edf8cfad38d36b39b00c557122 100644 (file)
@@ -41,6 +41,7 @@
 #include "messages/MOSDAlive.h"
 #include "messages/MPoolOp.h"
 #include "messages/MPoolOpReply.h"
+#include "messages/MOSDPGCreate.h"
 #include "messages/MOSDPGTemp.h"
 #include "messages/MMonCommand.h"
 #include "messages/MRemoveSnaps.h"
 #include "include/str_map.h"
 
 #define dout_subsys ceph_subsys_mon
+#define OSD_PG_CREATING_PREFIX "osd_pg_creating"
 
-struct C_PrintTime : public Context {
+struct C_UpdateCreatingPGs : public Context {
+  OSDMonitor *osdmon;
   utime_t start;
   epoch_t epoch;
-  C_PrintTime(epoch_t e) : start(ceph_clock_now()), epoch(e) {}
+  C_UpdateCreatingPGs(OSDMonitor *osdmon, epoch_t e) :
+    osdmon(osdmon), start(ceph_clock_now()), epoch(e) {}
   void finish(int r) override {
     if (r >= 0) {
       utime_t end = ceph_clock_now();
       dout(10) << "osdmap epoch " << epoch << " mapping took "
               << (end - start) << " seconds" << dendl;
+      osdmon->update_creating_pgs();
+      osdmon->check_pg_creates_subs();
     }
   }
 };
@@ -164,6 +170,12 @@ void OSDMonitor::create_initial()
   dout(20) << " full crc " << pending_inc.full_crc << dendl;
 }
 
+void OSDMonitor::get_store_prefixes(std::set<string>& s)
+{
+  s.insert(service_name);
+  s.insert(OSD_PG_CREATING_PREFIX);
+}
+
 void OSDMonitor::update_from_paxos(bool *need_bootstrap)
 {
   version_t version = get_last_committed();
@@ -227,6 +239,16 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap)
     osdmap.decode(latest_bl);
   }
 
+  if (mon->monmap->get_required_features().contains_all(
+       ceph::features::mon::FEATURE_LUMINOUS)) {
+    bufferlist bl;
+    mon->store->get(OSD_PG_CREATING_PREFIX, "creating", bl);
+    auto p = bl.begin();
+    std::lock_guard<Spinlock> l(creating_pgs_lock);
+    creating_pgs.decode(p);
+    dout(7) << __func__ << " loading creating_pgs e" << creating_pgs.last_scan_epoch << dendl;
+  }
+
   // walk through incrementals
   MonitorDBStore::TransactionRef t;
   size_t tx_size = 0;
@@ -292,6 +314,10 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap)
       t = MonitorDBStore::TransactionRef();
       tx_size = 0;
     }
+    if (mon->monmap->get_required_features().contains_all(
+          ceph::features::mon::FEATURE_LUMINOUS)) {
+      creating_pgs = update_pending_creatings(inc);
+    }
   }
 
   if (t) {
@@ -320,13 +346,13 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap)
   /** we don't have any of the feature bit infrastructure in place for
    * supporting primary_temp mappings without breaking old clients/OSDs.*/
   assert(g_conf->mon_osd_allow_primary_temp || osdmap.primary_temp->empty());
-
   if (mon->is_leader()) {
     // kick pgmon, make sure it's seen the latest map
     mon->pgmon()->check_osd_map(osdmap.epoch);
   }
 
   check_osdmap_subs();
+  check_pg_creates_subs();
 
   share_map_with_random_osd();
   update_logger();
@@ -350,7 +376,7 @@ void OSDMonitor::start_mapping()
             << dendl;
     mapping_job->abort();
   }
-  auto *fin = new C_PrintTime(osdmap.get_epoch());
+  auto fin = new C_UpdateCreatingPGs(this, osdmap.get_epoch());
   mapping_job = mapping.start_update(osdmap, mapper,
                                     g_conf->mon_osd_mapping_pgs_per_chunk);
   dout(10) << __func__ << " started mapping job " << mapping_job.get()
@@ -1011,6 +1037,40 @@ void OSDMonitor::create_pending()
   }
 }
 
+creating_pgs_t
+OSDMonitor::update_pending_creatings(const OSDMap::Incremental& inc)
+{
+  creating_pgs_t pending_creatings;
+  {
+    std::lock_guard<Spinlock> l(creating_pgs_lock);
+    pending_creatings = creating_pgs;
+  }
+  if (pending_creatings.last_scan_epoch > inc.epoch) {
+    return pending_creatings;
+  }
+  for (auto& pg : pending_created_pgs) {
+    pending_creatings.created_pools.insert(pg.pool());
+    pending_creatings.pgs.erase(pg);
+  }
+  for (auto old_pool : inc.old_pools) {
+    pending_creatings.created_pools.erase(old_pool);
+    const auto removed_pool = (uint64_t)old_pool;
+    auto first =
+      pending_creatings.pgs.lower_bound(pg_t{0, removed_pool});
+    auto last =
+      pending_creatings.pgs.lower_bound(pg_t{0, removed_pool + 1});
+    pending_creatings.pgs.erase(first, last);
+  }
+  scan_for_creating_pgs(osdmap.get_pools(),
+                       inc.old_pools,
+                       &pending_creatings);
+  scan_for_creating_pgs(inc.new_pools,
+                       inc.old_pools,
+                       &pending_creatings);
+  pending_creatings.last_scan_epoch = osdmap.get_epoch();
+  return pending_creatings;
+}
+
 void OSDMonitor::maybe_prime_pg_temp()
 {
   bool all = false;
@@ -1308,6 +1368,32 @@ void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
     t->erase(OSD_METADATA_PREFIX, stringify(*p));
   pending_metadata.clear();
   pending_metadata_rm.clear();
+
+  // and pg creating, also!
+  if (mon->monmap->get_required_features().contains_all(
+       ceph::features::mon::FEATURE_LUMINOUS)) {
+    auto pending_creatings = update_pending_creatings(pending_inc);
+    if (!osdmap.test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+      dout(7) << __func__ << " in the middle of upgrading, "
+             << " trimming pending creating_pgs using pgmap" << dendl;
+      trim_creating_pgs(&pending_creatings, mon->pgmon()->pg_map);
+    }
+    bufferlist creatings_bl;
+    ::encode(pending_creatings, creatings_bl);
+    t->put(OSD_PG_CREATING_PREFIX, "creating", creatings_bl);
+  }
+}
+
+void OSDMonitor::trim_creating_pgs(creating_pgs_t* creating_pgs,
+                                  const PGMap& pgm)
+{
+  for (auto& pg : pgm.pg_stat) {
+    auto created = creating_pgs->pgs.find(pg.first);
+    if (created != creating_pgs->pgs.end()) {
+      creating_pgs->pgs.erase(created);
+      creating_pgs->created_pools.insert(pg.first.pool());
+    }
+  }
 }
 
 int OSDMonitor::load_metadata(int osd, map<string, string>& m, ostream *err)
@@ -2967,6 +3053,9 @@ epoch_t OSDMonitor::blacklist(const entity_addr_t& a, utime_t until)
 void OSDMonitor::check_osdmap_subs()
 {
   dout(10) << __func__ << dendl;
+  if (!osdmap.get_epoch()) {
+    return;
+  }
   auto osdmap_subs = mon->session_map.subs.find("osdmap");
   if (osdmap_subs == mon->session_map.subs.end()) {
     return;
@@ -2995,6 +3084,144 @@ void OSDMonitor::check_osdmap_sub(Subscription *sub)
   }
 }
 
+void OSDMonitor::check_pg_creates_subs()
+{
+  if (!mon->monmap->get_required_features().contains_all(
+       ceph::features::mon::FEATURE_LUMINOUS)) {
+    // PGMonitor takes care of this in pre-luminous era.
+    return;
+  }
+  if (!osdmap.get_num_up_osds()) {
+    return;
+  }
+  assert(osdmap.get_up_osd_features() & CEPH_FEATURE_MON_STATEFUL_SUB);
+  mon->with_session_map([this](const MonSessionMap& session_map) {
+      auto pg_creates_subs = session_map.subs.find("osd_pg_creates");
+      if (pg_creates_subs == session_map.subs.end()) {
+       return;
+      }
+      for (auto sub : *pg_creates_subs->second) {
+       check_pg_creates_sub(sub);
+      }
+    });
+}
+
+void OSDMonitor::check_pg_creates_sub(Subscription *sub)
+{
+  dout(20) << __func__ << " .. " << sub->session->inst << dendl;
+  assert(sub->type == "osd_pg_creates");
+  // only send these if the OSD is up.  we will check_subs() when they do
+  // come up so they will get the creates then.
+  if (sub->session->inst.name.is_osd() &&
+      mon->osdmon()->osdmap.is_up(sub->session->inst.name.num())) {
+    sub->next = send_pg_creates(sub->session->inst.name.num(),
+                               sub->session->con.get(),
+                               sub->next);
+  }
+}
+
+void OSDMonitor::scan_for_creating_pgs(const map<int64_t,pg_pool_t>& pools,
+                                      const set<int64_t>& removed_pools,
+                                      creating_pgs_t* creating_pgs) const
+{
+  for (auto& p : pools) {
+    int64_t poolid = p.first;
+    const pg_pool_t& pool = p.second;
+    int ruleno = osdmap.crush->find_rule(pool.get_crush_ruleset(),
+                                        pool.get_type(), pool.get_size());
+    if (ruleno < 0 || !osdmap.crush->rule_exists(ruleno))
+      continue;
+
+    const auto last_scan_epoch = creating_pgs->last_scan_epoch;
+    const auto created = pool.get_last_change();
+    if (last_scan_epoch && created <= last_scan_epoch) {
+      dout(10) << __func__ << " no change in pool " << poolid
+              << " " << pool << dendl;
+      continue;
+    }
+    if (removed_pools.count(poolid)) {
+      dout(10) << __func__ << " pool is being removed: " << poolid
+              << " " << pool << dendl;
+      continue;
+    }
+    dout(10) << __func__ << " scanning pool " << poolid
+            << " " << pool << dendl;
+    if (creating_pgs->created_pools.count(poolid)) {
+      // split pgs are skipped by OSD, so drop it early.
+      continue;
+    }
+    // first pgs in this pool
+    for (ps_t ps = 0; ps < pool.get_pg_num(); ps++) {
+      const pg_t pgid{ps, static_cast<uint64_t>(poolid)};
+      if (creating_pgs->pgs.count(pgid)) {
+       dout(20) << __func__ << " already have " << pgid << dendl;
+       continue;
+      }
+      creating_pgs->pgs.emplace(pgid, make_pair(created, ceph_clock_now()));
+      dout(10) << __func__ << " adding " << pgid
+              << " at " << osdmap.get_epoch() << dendl;
+    }
+  }
+}
+
+void OSDMonitor::update_creating_pgs()
+{
+  creating_pgs_by_osd_epoch.clear();
+  std::lock_guard<Spinlock> l(creating_pgs_lock);
+  for (const auto& pg : creating_pgs.pgs) {
+    int acting_primary = -1;
+    auto pgid = pg.first;
+    auto created = pg.second.first;
+    mapping.get(pgid, nullptr, nullptr, nullptr, &acting_primary);
+    if (acting_primary >= 0) {
+      dout(10) << __func__ << " will instruct osd." << acting_primary
+              << " to create " << pgid << dendl;
+      creating_pgs_by_osd_epoch[acting_primary][created].insert(pgid);
+    }
+  }
+  creating_pgs_epoch = mapping.get_epoch();
+}
+
+epoch_t OSDMonitor::send_pg_creates(int osd, Connection *con, epoch_t next)
+{
+  dout(30) << __func__ << " osd." << osd << " next=" << next
+          << " " << creating_pgs_by_osd_epoch << dendl;
+  auto creating_pgs_by_epoch = creating_pgs_by_osd_epoch.find(osd);
+  if (creating_pgs_by_epoch == creating_pgs_by_osd_epoch.end())
+    return next;
+  assert(!creating_pgs_by_epoch->second.empty());
+
+  MOSDPGCreate *m = nullptr;
+  epoch_t last = 0;
+  for (auto epoch_pgs = creating_pgs_by_epoch->second.lower_bound(next);
+       epoch_pgs != creating_pgs_by_epoch->second.end(); ++epoch_pgs) {
+    auto epoch = epoch_pgs->first;
+    auto& pgs = epoch_pgs->second;
+    dout(20) << __func__ << " osd." << osd << " from " << next
+             << " : epoch " << epoch << " " << pgs.size() << " pgs" << dendl;
+    last = epoch;
+    for (auto& pg : pgs) {
+      if (!m)
+       m = new MOSDPGCreate(creating_pgs_epoch);
+      // Need the create time from the monitor using its clock to set
+      // last_scrub_stamp upon pg creation.
+      const auto& creation = creating_pgs.pgs[pg];
+      m->mkpg[pg] = pg_create_t{creation.first, pg, 0};
+      m->ctimes[pg] = creation.second;
+      dout(20) << __func__ << " will create " << pg
+              << " at " << creation.first << dendl;
+    }
+  }
+  if (!m) {
+    dout(20) << __func__ << " osd." << osd << " from " << next
+             << " has nothing to send" << dendl;
+    return next;
+  }
+  con->send_message(m);
+  // sub is current through last + 1
+  return last + 1;
+}
+
 // TICK
 
 
index 01428ef457b00319129c1c65e3000464cc6a4977..ad6966021ac936bf3b4d8f61b97d10547b41437f 100644 (file)
@@ -32,6 +32,7 @@ using namespace std;
 #include "osd/OSDMap.h"
 #include "osd/OSDMapMapping.h"
 
+#include "CreatingPGs.h"
 #include "PaxosService.h"
 
 class Monitor;
@@ -148,6 +149,8 @@ public:
   // svc
 public:  
   void create_initial() override;
+  void get_store_prefixes(std::set<string>& s) override;
+
 private:
   void update_from_paxos(bool *need_bootstrap) override;
   void create_pending() override;  // prepare a new pending
@@ -420,6 +423,24 @@ private:
   bool preprocess_beacon(MonOpRequestRef op);
   bool prepare_beacon(MonOpRequestRef op);
 
+  friend class C_UpdateCreatingPGs;
+  std::map<int, std::map<epoch_t, std::set<pg_t>>> creating_pgs_by_osd_epoch;
+  std::vector<pg_t> pending_created_pgs;
+  // the epoch when the pg mapping was calculated
+  epoch_t creating_pgs_epoch = 0;
+  creating_pgs_t creating_pgs;
+  Spinlock creating_pgs_lock;
+
+  creating_pgs_t update_pending_creatings(const OSDMap::Incremental& inc);
+  void trim_creating_pgs(creating_pgs_t *creating_pgs, const PGMap& pgm);
+  void scan_for_creating_pgs(const std::map<int64_t,pg_pool_t>& pools,
+                            const std::set<int64_t>& removed_pools,
+                            creating_pgs_t* creating_pgs) const;
+  pair<int32_t, pg_t> get_parent_pg(pg_t pgid) const;
+  void update_creating_pgs();
+  void check_pg_creates_subs();
+  epoch_t send_pg_creates(int osd, Connection *con, epoch_t next);
+
 public:
   OSDMonitor(CephContext *cct, Monitor *mn, Paxos *p, const string& service_name);
 
@@ -456,6 +477,7 @@ public:
   void print_nodes(Formatter *f);
 
   void check_osdmap_sub(Subscription *sub);
+  void check_pg_creates_sub(Subscription *sub);
 
   void add_flag(int flag) {
     if (!(osdmap.flags & flag)) {
index 001dc34af8d4355796dff9dd8d63d1693eed6605..e360303c8110dfb91863335f4da1315a546e437c 100644 (file)
@@ -231,6 +231,11 @@ void PGMonitor::post_paxos_update()
 {
   dout(10) << __func__ << dendl;
   OSDMap& osdmap = mon->osdmon()->osdmap;
+  if (mon->monmap->get_required_features().contains_all(
+       ceph::features::mon::FEATURE_LUMINOUS)) {
+    // let OSDMonitor take care of the pg-creates subscriptions.
+    return;
+  }
   if (osdmap.get_epoch()) {
     if (osdmap.get_num_up_osds() > 0) {
       assert(osdmap.get_up_osd_features() & CEPH_FEATURE_MON_STATEFUL_SUB);
index a3736bdffc5575cb7c7cf9f72a3816afb456d3b5..6be33fff761528c25ebd8c9961f44f9a5d2d0b66 100644 (file)
@@ -157,6 +157,9 @@ TYPE(MonCap)
 #include "mon/mon_types.h"
 TYPE(LevelDBStoreStats)
 
+#include "mon/CreatingPGs.h"
+TYPE(creating_pgs_t)
+
 #include "os/filestore/DBObjectMap.h"
 TYPE(DBObjectMap::_Header)
 TYPE(DBObjectMap::State)