From cd668c29e494b9a770c8784a360a5adfc2da1658 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Fri, 11 Jan 2019 18:47:39 +0800 Subject: [PATCH] crimson/osd: enable crimson-osd to boot * add state.h to encapsulate the state representing different stages related to booting an OSD. the boot process of an OSD can be blocked by - waiting for PG consuming updated osdmaps - waiting for osdmaps marking osd.{whoami} up - waiting for new osdmaps to bring this osd up to speed. - waiting for current OSD to be healthy we could chain these "waits" in a more seastarized way, and let OSD::start() wait on the future returned by this chain. but that'd require adding some seastar::shared_future<> as member variables of `OSD` class, which is a little bit more convoluted than the state machine approach used in this change. we could switch over to the `future<>` chain approach, if we found that these futures could have more consumers than merely `OSD::start()`. * all osdmaps are now stored in an `std::map` in `OSD`, we can improve it by - caching it using an LRU cache - trimming the stale ones - persisting the evicted maps into the meta collection in ObjectStore * superblock is not persisted to store, neither is it read from the store. Signed-off-by: Kefu Chai --- src/crimson/osd/osd.cc | 292 +++++++++++++++++++++++++++++++++++++++- src/crimson/osd/osd.h | 49 +++++++ src/crimson/osd/state.h | 63 +++++++++ 3 files changed, 402 insertions(+), 2 deletions(-) create mode 100644 src/crimson/osd/state.h diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index a807a61bb21..afde4ef1dde 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -1,5 +1,7 @@ #include "osd.h" +#include "messages/MOSDBoot.h" +#include "messages/MOSDMap.h" #include "crimson/net/Connection.h" #include "crimson/net/SocketMessenger.h" @@ -7,6 +9,12 @@ namespace { seastar::logger& logger() { return ceph::get_logger(ceph_subsys_osd); } + + template + Ref make_message(Args&&... 
args) + { + return {new Message{std::forward(args)...}, false}; + } } using ceph::common::local_conf; @@ -29,6 +37,7 @@ OSD::OSD(int id, uint32_t nonce) } dispatchers.push_front(this); dispatchers.push_front(&monc); + osdmaps[0] = seastar::make_lw_shared(); } OSD::~OSD() = default; @@ -36,17 +45,85 @@ OSD::~OSD() = default; seastar::future<> OSD::start() { logger().info("start"); - return client_msgr.start(&dispatchers).then([this] { + auto& conf = local_conf(); + store = std::make_unique(conf.get_val("osd_data")); + return read_superblock().then([this] { + osdmap = get_map(superblock.current_epoch); + return client_msgr->start(&dispatchers); + }).then([this] { return monc.start(); }).then([this] { + monc.sub_want("osd_pg_creates", last_pg_create_epoch, 0); monc.sub_want("mgrmap", 0, 0); monc.sub_want("osdmap", 0, 0); return monc.renew_subs(); + }).then([this] { + return start_boot(); + }); +} + +seastar::future<> OSD::start_boot() +{ + state.set_preboot(); + return monc.get_version("osdmap").then([this](version_t newest, version_t oldest) { + return _preboot(newest, oldest); }); } +seastar::future<> OSD::_preboot(version_t newest, version_t oldest) +{ + if (osdmap->get_epoch() == 0) { + logger().warn("waiting for initial osdmap"); + } else if (osdmap->is_destroyed(whoami)) { + logger().warn("osdmap says I am destroyed"); + // provide a small margin so we don't livelock seeing if we + // un-destroyed ourselves. 
+ if (osdmap->get_epoch() > newest - 1) { + throw std::runtime_error("i am destroyed"); + } + } else if (osdmap->test_flag(CEPH_OSDMAP_NOUP) || osdmap->is_noup(whoami)) { + logger().warn("osdmap NOUP flag is set, waiting for it to clear"); + } else if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) { + logger().error("osdmap SORTBITWISE OSDMap flag is NOT set; please set it"); + } else if (osdmap->require_osd_release < CEPH_RELEASE_LUMINOUS) { + logger().error("osdmap require_osd_release < luminous; please upgrade to luminous"); + } else if (false) { + // TODO: update mon if current fullness state is different from osdmap + } else if (version_t n = local_conf()->osd_map_message_max; + osdmap->get_epoch() >= oldest - 1 && + osdmap->get_epoch() + n > newest) { + return _send_boot(); + } + // get all the latest maps + if (osdmap->get_epoch() + 1 >= oldest) { + return osdmap_subscribe(osdmap->get_epoch() + 1, false); + } else { + return osdmap_subscribe(oldest - 1, true); + } +} + +seastar::future<> OSD::_send_boot() +{ + state.set_booting(); + + entity_addrvec_t hb_back_addrs; + entity_addrvec_t hb_front_addrs; + entity_addrvec_t cluster_addrs; + + auto m = make_message(superblock, + osdmap->get_epoch(), + osdmap->get_epoch(), + hb_back_addrs, + hb_front_addrs, + cluster_addrs, + CEPH_FEATURES_ALL); + return monc.send_message(m); +} + seastar::future<> OSD::stop() { + // see also OSD::shutdown() + state.set_stopping(); return gate.close().then([this] { return monc.stop(); }).then([this] { @@ -57,7 +134,16 @@ seastar::future<> OSD::stop() seastar::future<> OSD::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) { logger().info("ms_dispatch {}", *m); - return seastar::now(); + if (state.is_stopping()) { + return seastar::now(); + } + + switch (m->get_type()) { + case CEPH_MSG_OSD_MAP: + return handle_osd_map(conn, boost::static_pointer_cast(m)); + default: + return seastar::now(); + } } seastar::future<> OSD::ms_handle_connect(ceph::net::ConnectionRef conn) @@ -81,3 
+167,205 @@ seastar::future<> OSD::ms_handle_remote_reset(ceph::net::ConnectionRef conn) logger().warn("ms_handle_remote_reset"); return seastar::now(); } + +seastar::lw_shared_ptr OSD::get_map(epoch_t e) +{ + // TODO: use LRU cache for managing osdmap, fallback to disk if we have to + return osdmaps[e]; // NOTE(review): std::map::operator[] default-inserts an entry when epoch e is absent, so an unknown epoch yields a default-constructed pointer +} + +void OSD::store_maps(epoch_t start, Ref m) +{ // decode or derive one OSDMap per epoch in [start, m->get_last()] and keep it in osdmaps + for (epoch_t e = start; e <= m->get_last(); e++) { + seastar::lw_shared_ptr o; + if (auto p = m->maps.find(e); p != m->maps.end()) { // full map present in the message: decode it into a fresh object + o = seastar::make_lw_shared(); + o->decode(p->second); + } else if (auto p = m->incremental_maps.find(e); + p != m->incremental_maps.end()) { + o = get_map(e - 1); // NOTE(review): o is the same pointer that is stored for epoch e - 1, so apply_incremental() below appears to modify that epoch's map as well -- confirm whether a copy is intended + OSDMap::Incremental inc; + auto i = p->second.cbegin(); + inc.decode(i); + o->apply_incremental(inc); + } else { + logger().error("MOSDMap lied about what maps it had?"); // NOTE(review): o is left default-constructed here and is still stored into osdmaps[e] below + } + osdmaps[e] = std::move(o); + } +} + +seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request) +{ // renew subscriptions when sub_want_increment() reports a change or the caller forces it + if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) || + force_request) { + return monc.renew_subs(); + } else { + return seastar::now(); + } +} + +seastar::future<> OSD::read_superblock() +{ + // just-enough superblock so mon can ack my MOSDBoot + // might want to have a PurpleStore which is able to read the meta data for us. 
+ string ceph_fsid = store->read_meta("ceph_fsid"); + superblock.cluster_fsid.parse(ceph_fsid.c_str()); + string osd_fsid = store->read_meta("fsid"); + superblock.osd_fsid.parse(osd_fsid.c_str()); + return seastar::now(); +} + +seastar::future<> OSD::handle_osd_map(ceph::net::ConnectionRef conn, + Ref m) +{ + logger().info("handle_osd_map {}", *m); + if (m->fsid != superblock.cluster_fsid) { + logger().warn("fsid mismatched"); + return seastar::now(); + } + if (state.is_initializing()) { + logger().warn("i am still initializing"); + return seastar::now(); + } + + const auto first = m->get_first(); + const auto last = m->get_last(); + + // make sure there is something new, here, before we bother flushing + // the queues and such + if (last <= superblock.newest_map) { + return seastar::now(); + } + // missing some? + bool skip_maps = false; + epoch_t start = superblock.newest_map + 1; + if (first > start) { + logger().info("handle_osd_map message skips epochs {}..{}", + start, first - 1); + if (m->oldest_map <= start) { + return osdmap_subscribe(start, false); + } + // always try to get the full range of maps--as many as we can. this + // 1- is good to have + // 2- is at present the only way to ensure that we get a *full* map as + // the first map! 
+ if (m->oldest_map < first) { + return osdmap_subscribe(m->oldest_map - 1, true); + } + skip_maps = true; + start = first; + } + // TODO: store new maps: queue for disk and put in the osdmap cache + store_maps(start, m); + + // even if this map isn't from a mon, we may have satisfied our subscription + monc.sub_got("osdmap", last); + if (!superblock.oldest_map || skip_maps) { + superblock.oldest_map = first; + } + superblock.newest_map = last; + superblock.current_epoch = last; + + // note in the superblock that we were clean thru the prior epoch + if (boot_epoch && boot_epoch >= superblock.mounted) { + superblock.mounted = boot_epoch; + superblock.clean_thru = last; + } + // TODO: write to superblock and commit the transaction + return committed_osd_maps(start, last, m); +} + +seastar::future<> OSD::committed_osd_maps(version_t first, + version_t last, + Ref m) +{ // walk the stored maps in [first, last], then act on the resulting OSD state + logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last); + // advance through the new maps; while up_epoch is still 0 (its initial value, and what restart() resets it to), record the first epoch that shows us up at our own address + for (epoch_t cur = first; cur <= last; cur++) { + osdmap = get_map(cur); + if (up_epoch == 0 && + osdmap->is_up(whoami) && + osdmap->get_addrs(whoami) == client_msgr->get_myaddrs()) { + up_epoch = osdmap->get_epoch(); + if (!boot_epoch) { + boot_epoch = osdmap->get_epoch(); + } + } + } + + if (osdmap->is_up(whoami) && + osdmap->get_addrs(whoami) == client_msgr->get_myaddrs() && + bind_epoch < osdmap->get_up_from(whoami)) { + if (state.is_booting()) { + logger().info("osd.{}: activating...", whoami); + state.set_active(); + } + } + + if (state.is_active()) { + logger().info("osd.{}: now active", whoami); + if (!osdmap->exists(whoami)) { + return shutdown(); + } + if (should_restart()) { + return restart(); + } else { + return seastar::now(); + } + } else if (state.is_preboot()) { + logger().info("osd.{}: now preboot", whoami); + + if (m->get_source().is_mon()) { + logger().info("osd.{}: _preboot", whoami); + return _preboot(m->newest_map, m->oldest_map); // _preboot() is declared as (newest, oldest) + } else { + logger().info("osd.{}: 
start_boot", whoami); + return start_boot(); + } + } else { + logger().info("osd.{}: now ???", whoami); + // XXX + return seastar::now(); + } +} + +bool OSD::should_restart() const +{ + if (!osdmap->is_up(whoami)) { + logger().info("map e {} marked osd.{} down", + osdmap->get_epoch(), whoami); + return true; + } else if (osdmap->get_addrs(whoami) != client_msgr->get_myaddrs()) { + logger().error("map e {} had wrong client addr ({} != my {})", + osdmap->get_epoch(), + osdmap->get_addrs(whoami), + client_msgr->get_myaddrs()); + return true; + } else if (osdmap->get_cluster_addrs(whoami) != cluster_msgr->get_myaddrs()) { + logger().error("map e {} had wrong cluster addr ({} != my {})", + osdmap->get_epoch(), + osdmap->get_cluster_addrs(whoami), + cluster_msgr->get_myaddrs()); + return true; + } else { + return false; + } +} + +seastar::future<> OSD::restart() +{ + up_epoch = 0; + bind_epoch = osdmap->get_epoch(); + // TODO: promote to shutdown if being marked down for multiple times + // rebind messengers + return start_boot(); +} + +seastar::future<> OSD::shutdown() +{ + // TODO + superblock.mounted = boot_epoch; + superblock.clean_thru = osdmap->get_epoch(); + return seastar::now(); +} diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 9abc11bbb56..13f1b2e67ba 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -1,11 +1,20 @@ #pragma once +#include #include #include +#include #include "crimson/mon/MonClient.h" #include "crimson/net/Dispatcher.h" +#include "crimson/os/cyan_store.h" #include "crimson/osd/chained_dispatchers.h" +#include "crimson/osd/state.h" + +#include "osd/OSDMap.h" + +class MOSDMap; +class OSDMap; namespace ceph::net { class Messenger; @@ -21,6 +30,25 @@ class OSD : public ceph::net::Dispatcher { ChainedDispatchers dispatchers; ceph::mon::Client monc; + // TODO: use LRU cache + std::map> osdmaps; + seastar::lw_shared_ptr osdmap; + // TODO: use a wrapper for ObjectStore + std::unique_ptr store; + + OSDState state; + + /// 
_first_ epoch we were marked up (after this process started) + epoch_t boot_epoch = 0; + /// _most_recent_ epoch we were marked up + epoch_t up_epoch = 0; + //< epoch we last did a bind to new ip:ports + epoch_t bind_epoch = 0; + //< since when there is no more pending pg creates from mon + epoch_t last_pg_create_epoch = 0; + + OSDSuperblock superblock; + // Dispatcher methods seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override; seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override; @@ -33,4 +61,25 @@ public: seastar::future<> start(); seastar::future<> stop(); + +private: + seastar::future<> start_boot(); + seastar::future<> _preboot(version_t newest_osdmap, version_t oldest_osdmap); + seastar::future<> _send_boot(); + + seastar::lw_shared_ptr get_map(epoch_t e); + // TODO: should batch the write op along with superdisk modification as a + // transaction + void store_maps(epoch_t start, Ref m); + seastar::future<> osdmap_subscribe(version_t epoch, bool force_request); + seastar::future<> read_superblock(); + + seastar::future<> handle_osd_map(ceph::net::ConnectionRef conn, + Ref m); + seastar::future<> committed_osd_maps(version_t first, + version_t last, + Ref m); + bool should_restart() const; + seastar::future<> restart(); + seastar::future<> shutdown(); }; diff --git a/src/crimson/osd/state.h b/src/crimson/osd/state.h new file mode 100644 index 00000000000..2afd7b00060 --- /dev/null +++ b/src/crimson/osd/state.h @@ -0,0 +1,63 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +class OSDMap; + +class OSDState { + + enum class State { + INITIALIZING, + PREBOOT, + BOOTING, + ACTIVE, + STOPPING, + WAITING_FOR_HEALTHY, + }; + + State state = State::INITIALIZING; + +public: + bool is_initializing() const { + return state == State::INITIALIZING; + } + bool is_preboot() const { + return state == State::PREBOOT; + } + bool is_booting() const { + 
return state == State::BOOTING; + } + bool is_active() const { + return state == State::ACTIVE; + } + bool is_stopping() const { + return state == State::STOPPING; + } + bool is_waiting_for_healthy() const { + return state == State::WAITING_FOR_HEALTHY; + } + void set_preboot() { + state = State::PREBOOT; + } + void set_booting() { + state = State::BOOTING; + } + void set_active() { + state = State::ACTIVE; + } + void set_stopping() { + state = State::STOPPING; + } + const char* print() const { /* read-only: returns the name of the current state, "???" for an unknown value */ + switch (state) { + case State::INITIALIZING: return "initializing"; + case State::PREBOOT: return "preboot"; + case State::BOOTING: return "booting"; + case State::ACTIVE: return "active"; + case State::STOPPING: return "stopping"; + case State::WAITING_FOR_HEALTHY: return "waiting_for_healthy"; + default: return "???"; + } + } +}; -- 2.39.5