]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: enable crimson-osd to boot
authorKefu Chai <kchai@redhat.com>
Fri, 11 Jan 2019 10:47:39 +0000 (18:47 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 18 Jan 2019 04:42:08 +0000 (12:42 +0800)
* add state.h to encapsulate the state represeting different stages
  related to booting an OSD. the boot process of an OSD can be blocked
  by
  - waiting for PG consuming updated osdmaps
  - waiting for osdmaps marking osd.{whoami} up
  - waiting for new osdmaps to bring this osd up to speed.
  - waiting for current OSD to be healthy
  we could chain these "waits" in a more seastarized way, and let
OSD::start() wait on the future returned by this chain. but that'd
requires adding some seastar::shard_future<> as member variables of
`OSD` class, which is a little bit more convoluted than the state
machine approach used in this change. we could switch over to the
`future<>` chain approach, if we found that these futures could have
more consumers than merely `OSD::start()`.
* all osdmaps are now stored in an `std::map` in `OSD`, we can
  improve it by
  - caching it using an LRU cache
  - trimming the stale ones
  - persisting the evicted maps into the meta collection in ObjectStore
* superblock is not persited to store, neither is it read from the
  store.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/state.h [new file with mode: 0644]

index a807a61bb2176e161b2ab484a826390e1bd6d11f..afde4ef1ddeb70a6104aa3d0fca4d759c742f95c 100644 (file)
@@ -1,5 +1,7 @@
 #include "osd.h"
 
+#include "messages/MOSDBoot.h"
+#include "messages/MOSDMap.h"
 #include "crimson/net/Connection.h"
 #include "crimson/net/SocketMessenger.h"
 
@@ -7,6 +9,12 @@ namespace {
   seastar::logger& logger() {
     return ceph::get_logger(ceph_subsys_osd);
   }
+
+  template<typename Message, typename... Args>
+  Ref<Message> make_message(Args&&... args)
+  {
+    return {new Message{std::forward<Args>(args)...}, false};
+  }
 }
 
 using ceph::common::local_conf;
@@ -29,6 +37,7 @@ OSD::OSD(int id, uint32_t nonce)
   }
   dispatchers.push_front(this);
   dispatchers.push_front(&monc);
+  osdmaps[0] = seastar::make_lw_shared<OSDMap>();
 }
 
 OSD::~OSD() = default;
@@ -36,17 +45,85 @@ OSD::~OSD() = default;
 seastar::future<> OSD::start()
 {
   logger().info("start");
-  return client_msgr.start(&dispatchers).then([this] {
+  auto& conf = local_conf();
+  store = std::make_unique<CyanStore>(conf.get_val<std::string>("osd_data"));
+  return read_superblock().then([this] {
+    osdmap = get_map(superblock.current_epoch);
+    return client_msgr->start(&dispatchers);
+  }).then([this] {
     return monc.start();
   }).then([this] {
+    monc.sub_want("osd_pg_creates", last_pg_create_epoch, 0);
     monc.sub_want("mgrmap", 0, 0);
     monc.sub_want("osdmap", 0, 0);
     return monc.renew_subs();
+  }).then([this] {
+    return start_boot();
+  });
+}
+
+seastar::future<> OSD::start_boot()
+{
+  state.set_preboot();
+  return monc.get_version("osdmap").then([this](version_t newest, version_t oldest) {
+    return _preboot(newest, oldest);
   });
 }
 
+seastar::future<> OSD::_preboot(version_t newest, version_t oldest)
+{
+  if (osdmap->get_epoch() == 0) {
+    logger().warn("waiting for initial osdmap");
+  } else if (osdmap->is_destroyed(whoami)) {
+    logger().warn("osdmap says I am destroyed");
+    // provide a small margin so we don't livelock seeing if we
+    // un-destroyed ourselves.
+    if (osdmap->get_epoch() > newest - 1) {
+      throw std::runtime_error("i am destroyed");
+    }
+  } else if (osdmap->test_flag(CEPH_OSDMAP_NOUP) || osdmap->is_noup(whoami)) {
+    logger().warn("osdmap NOUP flag is set, waiting for it to clear");
+  } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
+    logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it");
+  } else if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+    logger().error("osdmap require_osd_release < luminous; please upgrade to luminous");
+  } else if (false) {
+    // TODO: update mon if current fullness state is different from osdmap
+  } else if (version_t n = local_conf()->osd_map_message_max;
+             osdmap->get_epoch() >= oldest - 1 &&
+             osdmap->get_epoch() + n > newest) {
+    return _send_boot();
+  }
+  // get all the latest maps
+  if (osdmap->get_epoch() + 1 >= oldest) {
+    return osdmap_subscribe(osdmap->get_epoch() + 1, false);
+  } else {
+    return osdmap_subscribe(oldest - 1, true);
+  }
+}
+
+seastar::future<> OSD::_send_boot()
+{
+  state.set_booting();
+
+  entity_addrvec_t hb_back_addrs;
+  entity_addrvec_t hb_front_addrs;
+  entity_addrvec_t cluster_addrs;
+
+  auto m = make_message<MOSDBoot>(superblock,
+                                  osdmap->get_epoch(),
+                                  osdmap->get_epoch(),
+                                  hb_back_addrs,
+                                  hb_front_addrs,
+                                  cluster_addrs,
+                                  CEPH_FEATURES_ALL);
+  return monc.send_message(m);
+}
+
 seastar::future<> OSD::stop()
 {
+  // see also OSD::shutdown()
+  state.set_stopping();
   return gate.close().then([this] {
     return monc.stop();
   }).then([this] {
@@ -57,7 +134,16 @@ seastar::future<> OSD::stop()
 seastar::future<> OSD::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m)
 {
   logger().info("ms_dispatch {}", *m);
-  return seastar::now();
+  if (state.is_stopping()) {
+    return seastar::now();
+  }
+
+  switch (m->get_type()) {
+  case CEPH_MSG_OSD_MAP:
+    return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
+  default:
+    return seastar::now();
+  }
 }
 
 seastar::future<> OSD::ms_handle_connect(ceph::net::ConnectionRef conn)
@@ -81,3 +167,205 @@ seastar::future<> OSD::ms_handle_remote_reset(ceph::net::ConnectionRef conn)
   logger().warn("ms_handle_remote_reset");
   return seastar::now();
 }
+
+seastar::lw_shared_ptr<OSDMap> OSD::get_map(epoch_t e)
+{
+  // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
+  return osdmaps[e];
+}
+
+void OSD::store_maps(epoch_t start, Ref<MOSDMap> m)
+{
+  for (epoch_t e = start; e <= m->get_last(); e++) {
+    seastar::lw_shared_ptr<OSDMap> o;
+    if (auto p = m->maps.find(e); p != m->maps.end()) {
+      o = seastar::make_lw_shared<OSDMap>();
+      o->decode(p->second);
+    } else if (auto p = m->incremental_maps.find(e);
+               p != m->incremental_maps.end()) {
+      o = get_map(e - 1);
+      OSDMap::Incremental inc;
+      auto i = p->second.cbegin();
+      inc.decode(i);
+      o->apply_incremental(inc);
+    } else {
+      logger().error("MOSDMap lied about what maps it had?");
+    }
+    osdmaps[e] = std::move(o);
+  }
+}
+
+seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request)
+{
+  if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
+      force_request) {
+    return monc.renew_subs();
+  } else {
+    return seastar::now();
+  }
+}
+
+seastar::future<> OSD::read_superblock()
+{
+  // just-enough superblock so mon can ack my MOSDBoot
+  // might want to have a PurpleStore which is able to read the meta data for us.
+  string ceph_fsid = store->read_meta("ceph_fsid");
+  superblock.cluster_fsid.parse(ceph_fsid.c_str());
+  string osd_fsid = store->read_meta("fsid");
+  superblock.osd_fsid.parse(osd_fsid.c_str());
+  return seastar::now();
+}
+
+seastar::future<> OSD::handle_osd_map(ceph::net::ConnectionRef conn,
+                                      Ref<MOSDMap> m)
+{
+  logger().info("handle_osd_map {}", *m);
+  if (m->fsid != superblock.cluster_fsid) {
+    logger().warn("fsid mismatched");
+    return seastar::now();
+  }
+  if (state.is_initializing()) {
+    logger().warn("i am still initializing");
+    return seastar::now();
+  }
+
+  const auto first = m->get_first();
+  const auto last = m->get_last();
+
+  // make sure there is something new, here, before we bother flushing
+  // the queues and such
+  if (last <= superblock.newest_map) {
+    return seastar::now();
+  }
+  // missing some?
+  bool skip_maps = false;
+  epoch_t start = superblock.newest_map + 1;
+  if (first > start) {
+    logger().info("handle_osd_map message skips epochs {}..{}",
+                  start, first - 1);
+    if (m->oldest_map <= start) {
+      return osdmap_subscribe(start, false);
+    }
+    // always try to get the full range of maps--as many as we can.  this
+    //  1- is good to have
+    //  2- is at present the only way to ensure that we get a *full* map as
+    //     the first map!
+    if (m->oldest_map < first) {
+      return osdmap_subscribe(m->oldest_map - 1, true);
+    }
+    skip_maps = true;
+    start = first;
+  }
+  // TODO: store new maps: queue for disk and put in the osdmap cache
+  store_maps(start, m);
+
+  // even if this map isn't from a mon, we may have satisfied our subscription
+  monc.sub_got("osdmap", last);
+  if (!superblock.oldest_map || skip_maps) {
+    superblock.oldest_map = first;
+  }
+  superblock.newest_map = last;
+  superblock.current_epoch = last;
+
+  // note in the superblock that we were clean thru the prior epoch
+  if (boot_epoch && boot_epoch >= superblock.mounted) {
+    superblock.mounted = boot_epoch;
+    superblock.clean_thru = last;
+  }
+  // TODO: write to superblock and commit the transaction
+  return committed_osd_maps(start, last, m);
+}
+
+seastar::future<> OSD::committed_osd_maps(version_t first,
+                                          version_t last,
+                                          Ref<MOSDMap> m)
+{
+  logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
+  // advance through the new maps
+  for (epoch_t cur = first; cur <= last; cur++) {
+    osdmap = get_map(cur);
+    if (up_epoch != 0 &&
+        osdmap->is_up(whoami) &&
+        osdmap->get_addrs(whoami) == client_msgr->get_myaddrs()) {
+      up_epoch = osdmap->get_epoch();
+      if (!boot_epoch) {
+        boot_epoch = osdmap->get_epoch();
+      }
+    }
+  }
+
+  if (osdmap->is_up(whoami) &&
+      osdmap->get_addrs(whoami) == client_msgr->get_myaddrs() &&
+      bind_epoch < osdmap->get_up_from(whoami)) {
+    if (state.is_booting()) {
+      logger().info("osd.{}: activating...", whoami);
+      state.set_active();
+    }
+  }
+
+  if (state.is_active()) {
+    logger().info("osd.{}: now active", whoami);
+    if (!osdmap->exists(whoami)) {
+      return shutdown();
+    }
+    if (should_restart()) {
+      return restart();
+    } else {
+      return seastar::now();
+    }
+  } else if (state.is_preboot()) {
+    logger().info("osd.{}: now preboot", whoami);
+
+    if (m->get_source().is_mon()) {
+      logger().info("osd.{}: _preboot", whoami);
+      return _preboot(m->oldest_map, m->newest_map);
+    } else {
+      logger().info("osd.{}: start_boot", whoami);
+      return start_boot();
+    }
+  } else {
+    logger().info("osd.{}: now ???", whoami);
+    // XXX
+    return seastar::now();
+  }
+}
+
+bool OSD::should_restart() const
+{
+  if (!osdmap->is_up(whoami)) {
+    logger().info("map e {} marked osd.{} down",
+                  osdmap->get_epoch(), whoami);
+    return true;
+  } else if (osdmap->get_addrs(whoami) != client_msgr->get_myaddrs()) {
+    logger().error("map e {} had wrong client addr ({} != my {})",
+                   osdmap->get_epoch(),
+                   osdmap->get_addrs(whoami),
+                   client_msgr->get_myaddrs());
+    return true;
+  } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) {
+    logger().error("map e {} had wrong cluster addr ({} != my {})",
+                   osdmap->get_epoch(),
+                   osdmap->get_cluster_addrs(whoami),
+                   cluster_msgr->get_myaddrs());
+    return true;
+  } else {
+    return false;
+  }
+}
+
+seastar::future<> OSD::restart()
+{
+  up_epoch = 0;
+  bind_epoch = osdmap->get_epoch();
+  // TODO: promote to shutdown if being marked down for multiple times
+  // rebind messengers
+  return start_boot();
+}
+
+seastar::future<> OSD::shutdown()
+{
+  // TODO
+  superblock.mounted = boot_epoch;
+  superblock.clean_thru = osdmap->get_epoch();
+  return seastar::now();
+}
index 9abc11bbb5636b83ea0612ee786c9e045f47aa34..13f1b2e67bac2389b8546fc64f5699d2dc1c8af7 100644 (file)
@@ -1,11 +1,20 @@
 #pragma once
 
+#include <map>
 #include <seastar/core/future.hh>
 #include <seastar/core/gate.hh>
+#include <seastar/core/shared_ptr.hh>
 
 #include "crimson/mon/MonClient.h"
 #include "crimson/net/Dispatcher.h"
+#include "crimson/os/cyan_store.h"
 #include "crimson/osd/chained_dispatchers.h"
+#include "crimson/osd/state.h"
+
+#include "osd/OSDMap.h"
+
+class MOSDMap;
+class OSDMap;
 
 namespace ceph::net {
   class Messenger;
@@ -21,6 +30,25 @@ class OSD : public ceph::net::Dispatcher {
   ChainedDispatchers dispatchers;
   ceph::mon::Client monc;
 
+  // TODO: use LRU cache
+  std::map<epoch_t, seastar::lw_shared_ptr<OSDMap>> osdmaps;
+  seastar::lw_shared_ptr<OSDMap> osdmap;
+  // TODO: use a wrapper for ObjectStore
+  std::unique_ptr<CyanStore> store;
+
+  OSDState state;
+
+  /// _first_ epoch we were marked up (after this process started)
+  epoch_t boot_epoch = 0;
+  /// _most_recent_ epoch we were marked up
+  epoch_t up_epoch = 0;
+  //< epoch we last did a bind to new ip:ports
+  epoch_t bind_epoch = 0;
+  //< since when there is no more pending pg creates from mon
+  epoch_t last_pg_create_epoch = 0;
+
+  OSDSuperblock superblock;
+
   // Dispatcher methods
   seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override;
   seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override;
@@ -33,4 +61,25 @@ public:
 
   seastar::future<> start();
   seastar::future<> stop();
+
+private:
+  seastar::future<> start_boot();
+  seastar::future<> _preboot(version_t newest_osdmap, version_t oldest_osdmap);
+  seastar::future<> _send_boot();
+
+  seastar::lw_shared_ptr<OSDMap> get_map(epoch_t e);
+  // TODO: should batch the write op along with superdisk modification as a
+  //       transaction
+  void store_maps(epoch_t start, Ref<MOSDMap> m);
+  seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
+  seastar::future<> read_superblock();
+
+  seastar::future<> handle_osd_map(ceph::net::ConnectionRef conn,
+                                   Ref<MOSDMap> m);
+  seastar::future<> committed_osd_maps(version_t first,
+                                       version_t last,
+                                       Ref<MOSDMap> m);
+  bool should_restart() const;
+  seastar::future<> restart();
+  seastar::future<> shutdown();
 };
diff --git a/src/crimson/osd/state.h b/src/crimson/osd/state.h
new file mode 100644 (file)
index 0000000..2afd7b0
--- /dev/null
@@ -0,0 +1,63 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+class OSDMap;
+
+class OSDState {
+
+  enum class State {
+    INITIALIZING,
+    PREBOOT,
+    BOOTING,
+    ACTIVE,
+    STOPPING,
+    WAITING_FOR_HEALTHY,
+  };
+
+  State state = State::INITIALIZING;
+
+public:
+  bool is_initializing() const {
+    return state == State::INITIALIZING;
+  }
+  bool is_preboot() const {
+    return state == State::PREBOOT;
+  }
+  bool is_booting() const {
+    return state == State::BOOTING;
+  }
+  bool is_active() const {
+    return state == State::ACTIVE;
+  }
+  bool is_stopping() const {
+    return state == State::STOPPING;
+  }
+  bool is_waiting_for_healthy() const {
+    return state == State::WAITING_FOR_HEALTHY;
+  }
+  void set_preboot() {
+    state = State::PREBOOT;
+  }
+  void set_booting() {
+    state = State::BOOTING;
+  }
+  void set_active() {
+    state = State::ACTIVE;
+  }
+  void set_stopping() {
+    state = State::STOPPING;
+  }
+  const char* print() {
+    switch (state) {
+    case State::INITIALIZING: return "initializing";
+    case State::PREBOOT: return "preboot";
+    case State::BOOTING: return "booting";
+    case State::ACTIVE: return "active";
+    case State::STOPPING: return "stopping";
+    case State::WAITING_FOR_HEALTHY: return "waiting_for_healthy";
+    default: return "???";
+    }
+  }
+};