#include "messages/MOSDBeacon.h"
#include "messages/MOSDBoot.h"
#include "messages/MOSDMap.h"
+#include "messages/MOSDPGLog.h"
+#include "messages/MOSDPGNotify.h"
#include "messages/MPGStats.h"
#include "crimson/mon/MonClient.h"
+
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
#include "crimson/os/cyan_collection.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/pg_meta.h"
+#include "osd/PGPeeringEvent.h"
+
namespace {
seastar::logger& logger() {
return ceph::get_logger(ceph_subsys_osd);
switch (m->get_type()) {
case CEPH_MSG_OSD_MAP:
return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
+ case MSG_OSD_PG_NOTIFY:
+ return handle_pg_notify(conn, boost::static_pointer_cast<MOSDPGNotify>(m));
default:
return seastar::now();
}
}
heartbeat->update_peers(whoami);
}
+
+seastar::future<> OSD::handle_pg_notify(ceph::net::ConnectionRef conn,
+ Ref<MOSDPGNotify> m)
+{
+ // assuming all pgs reside in a single shard
+ // see OSD::dequeue_peering_evt()
+ const int from = m->get_source().num();
+ return seastar::parallel_for_each(m->get_pg_list(),
+ [from, this](pair<pg_notify_t, PastIntervals> p) {
+ auto& [pg_notify, past_intervals] = p;
+ spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
+ MNotifyRec notify{pgid,
+ pg_shard_t{from, pg_notify.from},
+ pg_notify,
+ 0, // the features is not used
+ past_intervals};
+ auto create_info = new PGCreateInfo{pgid,
+ pg_notify.query_epoch,
+ pg_notify.info.history,
+ past_intervals,
+ false};
+ auto evt = std::make_unique<PGPeeringEvent>(pg_notify.epoch_sent,
+ pg_notify.query_epoch,
+ notify,
+ true, // requires_pg
+ create_info);
+ return do_peering_event(pgid, std::move(evt));
+ });
+}
+
+seastar::future<>
+OSD::do_peering_event(spg_t pgid,
+ std::unique_ptr<PGPeeringEvent> evt)
+{
+ if (auto pg = pgs.find(pgid); pg != pgs.end()) {
+ return advance_pg_to(pg->second, osdmap->get_epoch()).then(
+ [pg, evt=std::move(evt)]() mutable {
+ return pg->second->do_peering_event(std::move(evt));
+ });
+ } else {
+ // todo: handle_pg_query_nopg()
+ return seastar::now();
+ }
+}
+
+seastar::future<> OSD::advance_pg_to(Ref<PG> pg, epoch_t to)
+{
+ auto from = pg->get_osdmap_epoch();
+ // todo: merge/split support
+ return seastar::do_for_each(boost::make_counting_iterator(from + 1),
+ boost::make_counting_iterator(to + 1),
+ [pg, this](epoch_t next_epoch) {
+ return get_map(next_epoch).then([pg, this] (cached_map_t&& next_map) {
+ return pg->handle_advance_map(next_map);
+ }).then([pg, this] {
+ return pg->handle_activate_map();
+ });
+ });
+}
#include "crimson/osd/osdmap_service.h"
#include "crimson/osd/state.h"
-#include "osd/OSDMap.h"
+#include "osd/osd_types.h"
class MOSDMap;
class OSDMap;
class OSDMeta;
class PG;
+class PGPeeringEvent;
class Heartbeat;
namespace ceph::mon {
seastar::future<> handle_osd_map(ceph::net::ConnectionRef conn,
Ref<MOSDMap> m);
+ seastar::future<> handle_pg_notify(ceph::net::ConnectionRef conn,
+ Ref<MOSDPGNotify> m);
+
seastar::future<> committed_osd_maps(version_t first,
version_t last,
Ref<MOSDMap> m);
+
+ seastar::future<> do_peering_event(spg_t pgid,
+ std::unique_ptr<PGPeeringEvent> evt);
+ seastar::future<> advance_pg_to(Ref<PG> pg, epoch_t to);
+
bool should_restart() const;
seastar::future<> restart();
seastar::future<> shutdown();