From f3a4573993e1e22422d1485f00bdaca2e7b6bb48 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Mon, 25 Feb 2019 21:42:09 +0800 Subject: [PATCH] crimson/osd: handle MOSDPGNotify in OSD will handle it in PG in follow-up changes. Signed-off-by: Kefu Chai --- src/crimson/osd/osd.cc | 66 ++++++++++++++++++++++++++++++++++++++++++ src/crimson/osd/osd.h | 11 ++++++- src/crimson/osd/pg.cc | 19 ++++++++++++ src/crimson/osd/pg.h | 6 ++++ 4 files changed, 101 insertions(+), 1 deletion(-) diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 0bd9071bc4f..a81770e9006 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -9,9 +9,12 @@ #include "messages/MOSDBeacon.h" #include "messages/MOSDBoot.h" #include "messages/MOSDMap.h" +#include "messages/MOSDPGLog.h" +#include "messages/MOSDPGNotify.h" #include "messages/MPGStats.h" #include "crimson/mon/MonClient.h" + #include "crimson/net/Connection.h" #include "crimson/net/Messenger.h" #include "crimson/os/cyan_collection.h" @@ -23,6 +26,8 @@ #include "crimson/osd/pg.h" #include "crimson/osd/pg_meta.h" +#include "osd/PGPeeringEvent.h" + namespace { seastar::logger& logger() { return ceph::get_logger(ceph_subsys_osd); @@ -357,6 +362,8 @@ seastar::future<> OSD::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) switch (m->get_type()) { case CEPH_MSG_OSD_MAP: return handle_osd_map(conn, boost::static_pointer_cast(m)); + case MSG_OSD_PG_NOTIFY: + return handle_pg_notify(conn, boost::static_pointer_cast(m)); default: return seastar::now(); } @@ -690,3 +697,62 @@ void OSD::update_heartbeat_peers() } heartbeat->update_peers(whoami); } + +seastar::future<> OSD::handle_pg_notify(ceph::net::ConnectionRef conn, + Ref m) +{ + // assuming all pgs reside in a single shard + // see OSD::dequeue_peering_evt() + const int from = m->get_source().num(); + return seastar::parallel_for_each(m->get_pg_list(), + [from, this](pair p) { + auto& [pg_notify, past_intervals] = p; + spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to}; + MNotifyRec notify{pgid, + pg_shard_t{from, pg_notify.from}, + pg_notify, + 0, // the features is not used + past_intervals}; + auto create_info = new PGCreateInfo{pgid, + pg_notify.query_epoch, + pg_notify.info.history, + past_intervals, + false}; + auto evt = std::make_unique(pg_notify.epoch_sent, + pg_notify.query_epoch, + notify, + true, // requires_pg + create_info); + return do_peering_event(pgid, std::move(evt)); + }); +} + +seastar::future<> +OSD::do_peering_event(spg_t pgid, + std::unique_ptr evt) +{ + if (auto pg = pgs.find(pgid); pg != pgs.end()) { + return advance_pg_to(pg->second, osdmap->get_epoch()).then( + [pg, evt=std::move(evt)]() mutable { + return pg->second->do_peering_event(std::move(evt)); + }); + } else { + // todo: handle_pg_query_nopg() + return seastar::now(); + } +} + +seastar::future<> OSD::advance_pg_to(Ref pg, epoch_t to) +{ + auto from = pg->get_osdmap_epoch(); + // todo: merge/split support + return seastar::do_for_each(boost::make_counting_iterator(from + 1), + boost::make_counting_iterator(to + 1), + [pg, this](epoch_t next_epoch) { + return get_map(next_epoch).then([pg, this] (cached_map_t&& next_map) { + return pg->handle_advance_map(next_map); + }).then([pg, this] { + return pg->handle_activate_map(); + }); + }); +} diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 1bf351b9880..74b80176f86 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -17,12 +17,13 @@ #include "crimson/osd/osdmap_service.h" #include "crimson/osd/state.h" -#include "osd/OSDMap.h" +#include "osd/osd_types.h" class MOSDMap; class OSDMap; class OSDMeta; class PG; +class PGPeeringEvent; class Heartbeat; namespace ceph::mon { @@ -125,9 +126,17 @@ private: seastar::future<> handle_osd_map(ceph::net::ConnectionRef conn, Ref m); + seastar::future<> handle_pg_notify(ceph::net::ConnectionRef conn, + Ref m); + seastar::future<> committed_osd_maps(version_t first, version_t last, Ref m); + + seastar::future<> do_peering_event(spg_t pgid, + std::unique_ptr evt); + seastar::future<> advance_pg_to(Ref pg, epoch_t to); + bool should_restart() const; seastar::future<> restart(); seastar::future<> shutdown(); diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 8d3f973f44e..fb60176e722 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -1,5 +1,6 @@ #include "pg.h" +#include "messages/MOSDPGLog.h" #include "osd/OSDMap.h" #include "crimson/os/cyan_store.h" @@ -158,3 +159,21 @@ void PG::update_last_peering_reset() { last_peering_reset = get_osdmap_epoch(); } + +seastar::future<> PG::do_peering_event(std::unique_ptr evt) +{ + // todo + return seastar::now(); +} + +seastar::future<> PG::handle_advance_map(cached_map_t next_map) +{ + // todo + return seastar::now(); +} + +seastar::future<> PG::handle_activate_map() +{ + // todo + return seastar::now(); +} diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 413f0e93d51..601f876806d 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -12,6 +12,7 @@ template using Ref = boost::intrusive_ptr; class OSDMap; +class PGPeeringEvent; namespace ceph::net { class Messenger; @@ -52,6 +53,11 @@ public: seastar::future<> read_state(ceph::os::CyanStore* store); + // peering/recovery + seastar::future<> do_peering_event(std::unique_ptr evt); + seastar::future<> handle_advance_map(cached_map_t next_map); + seastar::future<> handle_activate_map(); + private: void update_primary_state(const std::vector& new_up, int new_up_primary, -- 2.39.5