replicated_backend.cc
shard_services.cc
osd_operation.cc
+ osd_operations/client_request.cc
+ osd_operations/peering_event.cc
+ osd_operations/compound_peering_request.cc
+ osdmap_gate.cc
+ pg_map.cc
${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc
${PROJECT_SOURCE_DIR}/src/osd/PGPeeringEvent.cc
${PROJECT_SOURCE_DIR}/src/osd/PGStateUtils.cc
#include "messages/MOSDBoot.h"
#include "messages/MOSDMap.h"
#include "messages/MOSDOp.h"
-#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGLog.h"
-#include "messages/MOSDPGNotify.h"
-#include "messages/MOSDPGQuery.h"
#include "messages/MPGStats.h"
-#include "messages/MOSDPGCreate2.h"
#include "crimson/mon/MonClient.h"
#include "crimson/net/Connection.h"
#include "crimson/osd/pg_meta.h"
#include "osd/PGPeeringEvent.h"
#include "osd/PeeringState.h"
+#include "crimson/osd/osd_operations/compound_peering_request.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/client_request.h"
namespace {
seastar::logger& logger() {
store{ceph::os::FuturizedStore::create(
local_conf().get_val<std::string>("osd_objectstore"),
local_conf().get_val<std::string>("osd_data"))},
- shard_services{cluster_msgr, public_msgr, *monc, *mgrc, *store}
+ shard_services{cluster_msgr, public_msgr, *monc, *mgrc, *store},
+ osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
{
osdmaps[0] = boost::make_local_shared<OSDMap>();
for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
return get_map(superblock.current_epoch);
}).then([this](cached_map_t&& map) {
- shard_services.update_map(osdmap);
+ shard_services.update_map(map);
+ osdmap_gate.got_map(map->get_epoch());
osdmap = std::move(map);
return load_pgs();
}).then([this] {
}
// get all the latest maps
if (osdmap->get_epoch() + 1 >= oldest) {
- return osdmap_subscribe(osdmap->get_epoch() + 1, false);
+ return shard_services.osdmap_subscribe(osdmap->get_epoch() + 1, false);
} else {
- return osdmap_subscribe(oldest - 1, true);
+ return shard_services.osdmap_subscribe(oldest - 1, true);
}
}
if (coll.is_pg(&pgid)) {
return load_pg(pgid).then([pgid, this](auto&& pg) {
logger().info("load_pgs: loaded {}", pgid);
- pgs.emplace(pgid, std::move(pg));
+ pg_map.pg_loaded(pgid, std::move(pg));
return seastar::now();
});
} else if (coll.is_temp(&pgid)) {
return handle_osd_map(conn, boost::static_pointer_cast<MOSDMap>(m));
case CEPH_MSG_OSD_OP:
return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
+ case MSG_OSD_PG_CREATE2:
case MSG_OSD_PG_NOTIFY:
- return handle_pg_notify(conn, boost::static_pointer_cast<MOSDPGNotify>(m));
case MSG_OSD_PG_INFO:
- return handle_pg_info(conn, boost::static_pointer_cast<MOSDPGInfo>(m));
case MSG_OSD_PG_QUERY:
- return handle_pg_query(conn, boost::static_pointer_cast<MOSDPGQuery>(m));
+ shard_services.start_operation<CompoundPeeringRequest>(
+ *this,
+ conn->get_shared(),
+ m);
+ return seastar::now();
case MSG_OSD_PG_LOG:
return handle_pg_log(conn, boost::static_pointer_cast<MOSDPGLog>(m));
- case MSG_OSD_PG_CREATE2:
- return handle_pg_create(conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
default:
logger().info("{} unhandled message {}", __func__, *m);
return seastar::now();
// MPGStats::had_map_for is not used since PGMonitor was removed
auto m = make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
- for (auto [pgid, pg] : pgs) {
+ for (auto [pgid, pg] : pg_map.get_pgs()) {
if (pg->is_primary()) {
auto stats = pg->get_stats();
// todo: update reported_epoch,reported_seq,last_fresh
});
}
-seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request)
-{
- logger().info("{}({})", __func__, epoch);
- if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
- force_request) {
- return monc->renew_subs();
- } else {
- return seastar::now();
- }
-}
-
bool OSD::require_mon_peer(ceph::net::Connection *conn, Ref<Message> m)
{
if (!conn->peer_is_mon()) {
pg->handle_activate_map(rctx);
logger().info("{} new pg {}", __func__, *pg);
- pgs.emplace(info->pgid, pg);
+ pg_map.pg_created(info->pgid, pg);
+
return seastar::when_all_succeed(
advance_pg_to(pg, osdmap->get_epoch()),
pg->get_need_up_thru() ? _send_alive() : seastar::now(),
});
}
-seastar::future<> OSD::handle_pg_create(
- ceph::net::Connection* conn,
- Ref<MOSDPGCreate2> m)
-{
- logger().info("{}: {} from {}", __func__, *m, m->get_source());
- if (!require_mon_peer(conn, m)) {
- return seastar::now();
- }
- return handle_batch_pg_message(
- m->pgs,
- [this, conn, m](auto p)
- -> std::optional<std::tuple<spg_t, std::unique_ptr<PGPeeringEvent>>> {
- const spg_t &pgid = p.first;
- const auto &[created, created_stamp] = p.second;
-
- auto q = m->pg_extra.find(pgid);
- ceph_assert(q != m->pg_extra.end());
- logger().debug(
- "{} {} e{} @{} history {} pi {}",
- __func__,
- pgid,
- created,
- created_stamp,
- q->second.first,
- q->second.second);
- if (!q->second.second.empty() &&
- m->epoch < q->second.second.get_bounds().second) {
- logger().error(
- "got pg_create on {} epoch {} unmatched past_intervals (history {})",
- pgid,
- m->epoch,
- q->second.second,
- q->second.first);
- return std::nullopt;
- } else {
- return std::make_optional(
- std::make_tuple(
- pgid,
- std::make_unique<PGPeeringEvent>(
- m->epoch,
- m->epoch,
- NullEvt(),
- true,
- new PGCreateInfo(
- pgid,
- m->epoch,
- q->second.first,
- q->second.second,
- true))));
- }
- });
-}
-
seastar::future<> OSD::handle_osd_map(ceph::net::Connection* conn,
Ref<MOSDMap> m)
{
logger().info("handle_osd_map message skips epochs {}..{}",
start, first - 1);
if (m->oldest_map <= start) {
- return osdmap_subscribe(start, false);
+ return shard_services.osdmap_subscribe(start, false);
}
// always try to get the full range of maps--as many as we can. this
// 1- is good to have
// 2- is at present the only way to ensure that we get a *full* map as
// the first map!
if (m->oldest_map < first) {
- return osdmap_subscribe(m->oldest_map - 1, true);
+ return shard_services.osdmap_subscribe(m->oldest_map - 1, true);
}
skip_maps = true;
start = first;
seastar::future<> OSD::handle_osd_op(ceph::net::Connection* conn,
Ref<MOSDOp> m)
{
- return wait_for_map(m->get_map_epoch()).then([=](epoch_t epoch) {
- if (auto found = pgs.find(m->get_spg()); found != pgs.end()) {
- return found->second->handle_op(conn, std::move(m));
- } else if (osdmap->is_up_acting_osd_shard(m->get_spg(), whoami)) {
- logger().info("no pg, should exist e{}, will wait", epoch);
- // todo, wait for peering, etc
- return seastar::now();
- } else {
- logger().info("no pg, shouldn't exist e{}, dropping", epoch);
- // todo: share map with client
- return seastar::now();
- }
- });
+ shard_services.start_operation<ClientRequest>(
+ *this,
+ conn->get_shared(),
+ std::move(m));
+ return seastar::now();
}
bool OSD::should_restart() const
if (!state.is_active()) {
return;
}
- for (auto& pg : pgs) {
+ for (auto& pg : pg_map.get_pgs()) {
vector<int> up, acting;
osdmap->pg_to_up_acting_osds(pg.first.pgid,
&up, nullptr,
heartbeat->update_peers(whoami);
}
-seastar::future<> OSD::handle_pg_notify(
- ceph::net::Connection* conn,
- Ref<MOSDPGNotify> m)
-{
- // assuming all pgs reside in a single shard
- // see OSD::dequeue_peering_evt()
- const int from = m->get_source().num();
- return handle_batch_pg_message(
- m->get_pg_list(),
- [from, this](pair<pg_notify_t, PastIntervals> p) {
- auto& [pg_notify, past_intervals] = p;
- spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
- MNotifyRec notify{pgid,
- pg_shard_t{from, pg_notify.from},
- pg_notify,
- 0, // the features is not used
- past_intervals};
- logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from);
- auto create_info = new PGCreateInfo{
- pgid,
- pg_notify.query_epoch,
- pg_notify.info.history,
- past_intervals,
- false};
- return std::make_optional(
- std::make_tuple(
- pgid,
- std::make_unique<PGPeeringEvent>(
- pg_notify.epoch_sent,
- pg_notify.query_epoch,
- notify,
- true, // requires_pg
- create_info)));
- });
-}
-
-seastar::future<> OSD::handle_pg_info(
- ceph::net::Connection* conn,
- Ref<MOSDPGInfo> m)
-{
- // assuming all pgs reside in a single shard
- // see OSD::dequeue_peering_evt()
- const int from = m->get_source().num();
- return handle_batch_pg_message(
- m->pg_list,
- [from, this](pair<pg_notify_t, PastIntervals> p) {
- auto& pg_notify = p.first;
- spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
- logger().debug("handle_pg_info on {} from {}", pgid.pgid, from);
- MInfoRec info{pg_shard_t{from, pg_notify.from},
- pg_notify.info,
- pg_notify.epoch_sent};
- return std::make_optional(
- std::tuple(
- pgid,
- std::make_unique<PGPeeringEvent>(
- pg_notify.epoch_sent,
- pg_notify.query_epoch,
- std::move(info))));
- });
-}
-
-seastar::future<> OSD::handle_pg_query(ceph::net::Connection* conn,
- Ref<MOSDPGQuery> m)
-{
- // assuming all pgs reside in a single shard
- // see OSD::dequeue_peering_evt()
- const int from = m->get_source().num();
- // TODO: handle missing pg -- handle_batch_pg_message ignores pgs
- // that don't exist
- return handle_batch_pg_message_with_missing_handler(
- m->pg_list,
- [from, this](pair<spg_t, pg_query_t> p) {
- auto& [pgid, pg_query] = p;
- MQuery query{pgid, pg_shard_t{from, pg_query.from},
- pg_query, pg_query.epoch_sent};
- logger().debug("handle_pg_query on {} from {}", pgid, from);
- return std::make_optional(
- std::make_tuple(
- pgid,
- std::make_unique<PGPeeringEvent>(
- pg_query.epoch_sent,
- pg_query.epoch_sent,
- std::move(query))));
- },
- [this, from](pair<spg_t, pg_query_t> p, PeeringCtx &ctx) {
- auto &[pgid, query] = p;
- logger().debug("handle_pg_query on absent pg {} from {}", pgid, from);
- pg_info_t empty(spg_t(pgid.pgid, query.to));
- ceph_assert(query.type == pg_query_t::INFO);
- ctx.notify_list[from].emplace_back(
- pg_notify_t(
- query.from, query.to,
- query.epoch_sent,
- osdmap->get_epoch(),
- empty),
- PastIntervals());
- });
-}
-
seastar::future<> OSD::handle_pg_log(
ceph::net::Connection* conn,
Ref<MOSDPGLog> m)
{
const int from = m->get_source().num();
logger().debug("handle_pg_log on {} from {}", m->get_spg(), from);
- return do_peering_event_and_dispatch(
- m->get_spg(),
- PGPeeringEventURef(m->get_event()));
+ shard_services.start_operation<RemotePeeringEvent>(
+ *this,
+ conn->get_shared(),
+ shard_services,
+ pg_shard_t(from, m->from),
+ spg_t(m->info.pgid.pgid, m->to),
+ std::move(*m->get_event()));
+ return seastar::now();
}
void OSD::check_osdmap_features()
seastar::future<> OSD::consume_map(epoch_t epoch)
{
// todo: m-to-n: broadcast this news to all shards
+ auto &pgs = pg_map.get_pgs();
return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
return advance_pg_to(pg.second, epoch);
}).then([epoch, this] {
- auto first = waiting_peering.begin();
- auto last = waiting_peering.upper_bound(epoch);
- std::for_each(first, last, [epoch, this](auto& blocked_requests) {
- blocked_requests.second.set_value(epoch);
- });
- waiting_peering.erase(first, last);
- return seastar::now();
+ osdmap_gate.got_map(epoch);
+ return seastar::make_ready_future();
});
}
-seastar::future<Ref<PG>>
-OSD::get_pg(
+blocking_future<Ref<PG>>
+OSD::get_or_create_pg(
spg_t pgid,
epoch_t epoch,
std::unique_ptr<PGCreateInfo> info)
{
- return wait_for_map(epoch).then([this, pgid, epoch, info=std::move(info)](epoch_t) mutable {
- if (auto pg = pgs.find(pgid); pg != pgs.end()) {
- return advance_pg_to(pg->second, epoch).then([pg=pg->second]() {
- return seastar::make_ready_future<Ref<PG>>(pg);
- });
- } else if (!info) {
- return seastar::make_ready_future<Ref<PG>>();
- } else {
- auto creating = pgs_creating.find(pgid);
- if (creating == pgs_creating.end()) {
- creating = pgs_creating.emplace(
- pgid,
- seastar::shared_future<Ref<PG>>(handle_pg_create_info(std::move(info)).then([this, pgid](auto pg) {
- pgs_creating.erase(pgid);
- return seastar::make_ready_future<Ref<PG>>(pg);
- }))).first;
- }
- return creating->second.get_future().then([this, epoch](auto pg) {
- return advance_pg_to(pg, epoch).then([pg]() {
- return seastar::make_ready_future<Ref<PG>>(pg);
- });
- });
- }
- });
-}
-
-seastar::future<Ref<PG>>
-OSD::do_peering_event(
- spg_t pgid,
- PGPeeringEventURef evt,
- PeeringCtx &rctx)
-{
- return get_pg(pgid, evt->get_epoch_sent(), std::move(evt->create_info))
- .then([this, evt=std::move(evt), &rctx](Ref<PG> pg) mutable {
- if (pg) {
- pg->do_peering_event(std::move(evt), rctx);
- }
- return seastar::make_ready_future<Ref<PG>>(pg);
- });
-}
-
-seastar::future<bool>
-OSD::do_peering_event_and_dispatch_transaction(
- spg_t pgid,
- std::unique_ptr<PGPeeringEvent> evt,
- PeeringCtx &rctx)
-{
- return do_peering_event(pgid, std::move(evt), rctx).then(
- [this, pgid, &rctx](Ref<PG> pg) mutable {
- if (pg) {
- return seastar::when_all_succeed(
- pg->get_need_up_thru() ? _send_alive() : seastar::now(),
- shard_services.dispatch_context_transaction(
- pg->get_collection_ref(), rctx)).then([] { return true; });
- } else {
- return seastar::make_ready_future<bool>(false);
- }
- });
-}
-
-seastar::future<>
-OSD::do_peering_event_and_dispatch(
- spg_t pgid,
- std::unique_ptr<PGPeeringEvent> evt)
-{
- return seastar::do_with(
- PeeringCtx{},
- [this, pgid, evt=std::move(evt)](auto &rctx) mutable {
- return do_peering_event(pgid, std::move(evt), rctx).then(
- [this, pgid, &rctx](Ref<PG> pg) mutable {
- if (pg) {
- return seastar::when_all_succeed(
- pg->get_need_up_thru() ? _send_alive() : seastar::now(),
- shard_services.dispatch_context(
- pg->get_collection_ref(), std::move(rctx)));
- } else {
- return seastar::now();
- }
- });
- }).handle_exception([](auto ep) {
- logger().error("do_peering_event_and_dispatch saw {}", ep);
- return seastar::make_exception_future<>(ep);
- });
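+  // if the pg is neither present nor already being created, and we were
+  // given creation info, kick off creation; the blocking future resolves
+  // once the pg becomes available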
+ auto [fut, creating] = pg_map.get_pg(pgid, bool(info));
+ if (!creating && info) {
+ pg_map.set_creating(pgid);
+ handle_pg_create_info(std::move(info));
+ }
+ return std::move(fut);
}
-seastar::future<epoch_t> OSD::wait_for_map(epoch_t epoch)
+blocking_future<Ref<PG>> OSD::wait_for_pg(
+ spg_t pgid)
{
- const auto mine = osdmap->get_epoch();
- if (mine >= epoch) {
- return seastar::make_ready_future<epoch_t>(mine);
- } else {
- logger().info("evt epoch is {}, i have {}, will wait", epoch, mine);
- auto fut = waiting_peering[epoch].get_shared_future();
- return osdmap_subscribe(osdmap->get_epoch(), true).then(
- [fut=std::move(fut)]() mutable {
- return std::move(fut);
- });
- }
+ return pg_map.get_pg(pgid).first;
}
seastar::future<> OSD::advance_pg_to(Ref<PG> pg, epoch_t to)
[this, pg, &rctx](epoch_t next_epoch) {
return get_map(next_epoch).then(
[pg, this, &rctx] (cached_map_t&& next_map) {
- return pg->handle_advance_map(next_map, rctx);
+ pg->handle_advance_map(next_map, rctx);
});
}).then([this, &rctx, pg] {
pg->handle_activate_map(rctx);
#include "crimson/osd/osdmap_service.h"
#include "crimson/osd/state.h"
#include "crimson/osd/shard_services.h"
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/pg_map.h"
+#include "crimson/osd/osd_operations/peering_event.h"
#include "osd/PeeringState.h"
#include "osd/osd_types.h"
seastar::future<Ref<PG>> handle_pg_create_info(
std::unique_ptr<PGCreateInfo> info);
- template <typename C, typename F, typename G>
- seastar::future<> handle_batch_pg_message_with_missing_handler(
- const C &c,
- F &&f,
- G &&on_missing_pg) {
- using mapped_type = const typename C::value_type &;
- using event_type = std::optional<std::tuple<
- spg_t,
- std::unique_ptr<PGPeeringEvent>>>;
- return seastar::do_with(
- PeeringCtx{},
- std::move(f),
- std::move(on_missing_pg),
- [this, &c] (auto &rctx, auto &f, auto &on_missing_pg) {
- return seastar::parallel_for_each(
- c,
- [this, &rctx, &f, &on_missing_pg](mapped_type m) {
- event_type result = f(m);
- if (result) {
- auto [pgid, event] = std::move(*result);
- return do_peering_event_and_dispatch_transaction(
- pgid,
- std::move(event),
- rctx).then([m, &on_missing_pg, &rctx] (bool found) {
- if (!found) {
- on_missing_pg(m, rctx);
- }
- return seastar::now();
- });
- } else {
- return seastar::now();
- }
- }).then([this, &rctx] {
- return shard_services.dispatch_context(std::move(rctx));
- });
- });
- }
-
- template <typename C, typename F>
- seastar::future<> handle_batch_pg_message(
- const C &c,
- F &&f) {
- return handle_batch_pg_message_with_missing_handler(
- c,
- std::move(f),
- [](const typename C::value_type &, PeeringCtx &){});
- }
-
- seastar::future<> handle_pg_create(ceph::net::Connection *conn,
- Ref<MOSDPGCreate2> m);
seastar::future<> handle_osd_map(ceph::net::Connection* conn,
Ref<MOSDMap> m);
seastar::future<> handle_osd_op(ceph::net::Connection* conn,
Ref<MOSDOp> m);
seastar::future<> handle_pg_log(ceph::net::Connection* conn,
Ref<MOSDPGLog> m);
- seastar::future<> handle_pg_notify(ceph::net::Connection* conn,
- Ref<MOSDPGNotify> m);
- seastar::future<> handle_pg_info(ceph::net::Connection* conn,
- Ref<MOSDPGInfo> m);
- seastar::future<> handle_pg_query(ceph::net::Connection* conn,
- Ref<MOSDPGQuery> m);
seastar::future<> committed_osd_maps(version_t first,
version_t last,
Ref<MOSDMap> m);
+
void check_osdmap_features();
- // order the promises in descending order of the waited osdmap epoch,
- // so we can access all the waiters expecting a map whose epoch is less
- // than a given epoch
- using waiting_peering_t = std::map<epoch_t, seastar::shared_promise<epoch_t>,
- std::greater<epoch_t>>;
- waiting_peering_t waiting_peering;
- // wait for an osdmap whose epoch is greater or equal to given epoch
- seastar::future<epoch_t> wait_for_map(epoch_t epoch);
+
+public:
+ OSDMapGate osdmap_gate;
+
+ ShardServices &get_shard_services() {
+ return shard_services;
+ }
+
seastar::future<> consume_map(epoch_t epoch);
- std::map<spg_t, seastar::shared_future<Ref<PG>>> pgs_creating;
- seastar::future<Ref<PG>> get_pg(
+private:
+ PGMap pg_map;
+
+public:
+ blocking_future<Ref<PG>> get_or_create_pg(
spg_t pgid,
epoch_t epoch,
std::unique_ptr<PGCreateInfo> info);
-
- seastar::future<Ref<PG>> do_peering_event(
- spg_t pgid,
- std::unique_ptr<PGPeeringEvent> evt,
- PeeringCtx &rctx);
- seastar::future<> do_peering_event_and_dispatch(
- spg_t pgid,
- std::unique_ptr<PGPeeringEvent> evt);
- seastar::future<bool> do_peering_event_and_dispatch_transaction(
- spg_t pgid,
- std::unique_ptr<PGPeeringEvent> evt,
- PeeringCtx &rctx);
+ blocking_future<Ref<PG>> wait_for_pg(
+ spg_t pgid);
seastar::future<> advance_pg_to(Ref<PG> pg, epoch_t to);
bool should_restart() const;
seastar::future<> send_beacon();
void update_heartbeat_peers();
+
};
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+
+namespace ceph::osd {
+
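+// Per-connection pipeline state; operations arriving on the same connection
+// enter these ordered phases so their relative order is preserved.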
+struct OSDConnectionPriv : public ceph::net::Connection::user_private_t {
+ ClientRequest::ConnectionPipeline client_request_conn_pipeline;
+ RemotePeeringEvent::ConnectionPipeline peering_request_conn_pipeline;
+};
+
+static OSDConnectionPriv &get_osd_priv(ceph::net::Connection *conn) {
+ if (!conn->has_user_private()) {
+ conn->set_user_private(std::make_unique<OSDConnectionPriv>());
+ }
+ return static_cast<OSDConnectionPriv&>(conn->get_user_private());
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "messages/MOSDOp.h"
+
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "common/Formatter.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_connection_priv.h"
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace ceph::osd {
+
+ClientRequest::ClientRequest(
+ OSD &osd, ceph::net::ConnectionRef conn, Ref<MOSDOp> &&m)
+ : osd(osd), conn(conn), m(m)
+{}
+
+void ClientRequest::print(std::ostream &lhs) const
+{
+ lhs << *m;
+}
+
+void ClientRequest::dump_detail(Formatter *f) const
+{
+}
+
+ClientRequest::ConnectionPipeline &ClientRequest::cp()
+{
+ return get_osd_priv(conn.get()).client_request_conn_pipeline;
+}
+
+ClientRequest::PGPipeline &ClientRequest::pp(PG &pg)
+{
+ return pg.client_request_pg_pipeline;
+}
+
+seastar::future<> ClientRequest::start()
+{
+ logger().debug("{}: start", *this);
+
+ IRef ref = this;
+ with_blocking_future(handle.enter(cp().await_map))
+ .then([this]() {
+ return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_map_epoch()));
+ }).then([this](epoch_t epoch) {
+ return with_blocking_future(handle.enter(cp().get_pg));
+ }).then([this] {
+ return with_blocking_future(osd.wait_for_pg(m->get_spg()));
+ }).then([this, ref=std::move(ref)](Ref<PG> pg) {
+ return seastar::do_with(
+ std::move(pg), std::move(ref), [this](auto pg, auto op) {
+ return with_blocking_future(
+ handle.enter(pp(*pg).await_map)
+ ).then([this, pg] {
+ return with_blocking_future(
+ pg->osdmap_gate.wait_for_map(m->get_map_epoch()));
+ }).then([this, pg] (auto) {
+ return with_blocking_future(handle.enter(pp(*pg).process));
+ }).then([this, pg] {
+ return pg->handle_op(conn.get(), std::move(m));
+ });
+ });
+ });
+ return seastar::make_ready_future();
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/common/type_helpers.h"
+
+class MOSDOp;
+
+namespace ceph::osd {
+class PG;
+class OSD;
+
+class ClientRequest final : public OperationT<ClientRequest> {
+ OSD &osd;
+ ceph::net::ConnectionRef conn;
+ Ref<MOSDOp> m;
+ OrderedPipelinePhase::Handle handle;
+
+public:
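+  // ordered phases the request passes through: ConnectionPipeline keeps
+  // requests from one connection in order while waiting for the osdmap and
+  // the target PG, PGPipeline does the same within the PG itself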
+ class ConnectionPipeline {
+ OrderedPipelinePhase await_map = {
+ "ClientRequest::ConnectionPipeline::await_map"
+ };
+ OrderedPipelinePhase get_pg = {
+ "ClientRequest::ConnectionPipeline::get_pg"
+ };
+ friend class ClientRequest;
+ };
+ class PGPipeline {
+ OrderedPipelinePhase await_map = {
+ "ClientRequest::PGPipeline::await_map"
+ };
+ OrderedPipelinePhase process = {
+ "ClientRequest::PGPipeline::process"
+ };
+ friend class ClientRequest;
+ };
+
+ static constexpr OperationTypeCode type = OperationTypeCode::client_request;
+
+ ClientRequest(OSD &osd, ceph::net::ConnectionRef, Ref<MOSDOp> &&m);
+
+ void print(std::ostream &) const final;
+ void dump_detail(Formatter *f) const final;
+ seastar::future<> start();
+
+private:
+ ConnectionPipeline &cp();
+ PGPipeline &pp(PG &pg);
+};
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "osd/PeeringState.h"
+
+#include "messages/MOSDPGInfo.h"
+#include "messages/MOSDPGNotify.h"
+#include "messages/MOSDPGQuery.h"
+#include "messages/MOSDPGCreate2.h"
+
+#include "common/Formatter.h"
+
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_operations/compound_peering_request.h"
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace {
+using namespace ceph::osd;
+
+struct compound_state {
+ seastar::promise<BufferedRecoveryMessages> promise;
+ BufferedRecoveryMessages ctx;
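+  // fulfilled from the destructor: when the last sub-event releases its
+  // reference, the accumulated messages are handed back to the parent request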
+ ~compound_state() {
+ promise.set_value(std::move(ctx));
+ }
+};
+using compound_state_ref = seastar::lw_shared_ptr<compound_state>;
+
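+// RemotePeeringEvent variant that buffers its outgoing messages into the
+// shared compound_state (transactions are still submitted per PG) so the
+// parent request can dispatch them in one batch.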
+class PeeringSubEvent : public RemotePeeringEvent {
+ compound_state_ref state;
+public:
+ template <typename... Args>
+ PeeringSubEvent(compound_state_ref state, Args &&... args) :
+ RemotePeeringEvent(std::forward<Args>(args)...), state(state) {}
+
+ seastar::future<> complete_rctx(Ref<ceph::osd::PG> pg) final {
+ logger().debug("{}: submitting ctx transaction", *this);
+ state->ctx.accept_buffered_messages(ctx);
+ state = {};
+ if (!pg) {
+ ceph_assert(ctx.transaction.empty());
+ return seastar::now();
+ } else {
+ return osd.get_shard_services().dispatch_context_transaction(
+ pg->get_collection_ref(), ctx);
+ }
+ }
+};
+
+std::vector<OperationRef> handle_pg_create(
+ OSD &osd,
+ ceph::net::ConnectionRef conn,
+ compound_state_ref state,
+ Ref<MOSDPGCreate2> m)
+{
+ std::vector<OperationRef> ret;
+ for (auto &p : m->pgs) {
+ const spg_t &pgid = p.first;
+ const auto &[created, created_stamp] = p.second;
+ auto q = m->pg_extra.find(pgid);
+ ceph_assert(q != m->pg_extra.end());
+ logger().debug(
+ "{}, {} {} e{} @{} history {} pi {}",
+ __func__,
+ pgid,
+ created,
+ created_stamp,
+ q->second.first,
+ q->second.second);
+ if (!q->second.second.empty() &&
+ m->epoch < q->second.second.get_bounds().second) {
+ logger().error(
+ "got pg_create on {} epoch {} unmatched past_intervals (history {})",
+ pgid,
+ m->epoch,
+ q->second.second,
+ q->second.first);
+ } else {
+ auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
+ state,
+ osd,
+ conn,
+ osd.get_shard_services(),
+ pg_shard_t(),
+ pgid,
+ m->epoch,
+ m->epoch,
+ NullEvt(),
+ true,
+ new PGCreateInfo(
+ pgid,
+ m->epoch,
+ q->second.first,
+ q->second.second,
+ true));
+      ret.push_back(op);
+    }
+ }
+ return ret;
+}
+
+std::vector<OperationRef> handle_pg_notify(
+ OSD &osd,
+ ceph::net::ConnectionRef conn,
+ compound_state_ref state,
+ Ref<MOSDPGNotify> m)
+{
+ std::vector<OperationRef> ret;
+ ret.reserve(m->get_pg_list().size());
+ const int from = m->get_source().num();
+ for (auto &p : m->get_pg_list()) {
+ auto& [pg_notify, past_intervals] = p;
+ spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
+ MNotifyRec notify{pgid,
+ pg_shard_t{from, pg_notify.from},
+ pg_notify,
+ 0, // the features is not used
+ past_intervals};
+ logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from);
+ auto create_info = new PGCreateInfo{
+ pgid,
+ pg_notify.query_epoch,
+ pg_notify.info.history,
+ past_intervals,
+ false};
+ auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
+ state,
+ osd,
+ conn,
+ osd.get_shard_services(),
+ pg_shard_t(from, pg_notify.from),
+ pgid,
+ pg_notify.epoch_sent,
+ pg_notify.query_epoch,
+ notify,
+ true, // requires_pg
+ create_info);
+ ret.push_back(op);
+ }
+ return ret;
+}
+
+std::vector<OperationRef> handle_pg_info(
+ OSD &osd,
+ ceph::net::ConnectionRef conn,
+ compound_state_ref state,
+ Ref<MOSDPGInfo> m)
+{
+ std::vector<OperationRef> ret;
+ ret.reserve(m->pg_list.size());
+ const int from = m->get_source().num();
+ for (auto &p : m->pg_list) {
+ auto& pg_notify = p.first;
+ spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
+ logger().debug("handle_pg_info on {} from {}", pgid.pgid, from);
+ MInfoRec info{pg_shard_t{from, pg_notify.from},
+ pg_notify.info,
+ pg_notify.epoch_sent};
+ auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
+ state,
+ osd,
+ conn,
+ osd.get_shard_services(),
+ pg_shard_t(from, pg_notify.from),
+ pgid,
+ pg_notify.epoch_sent,
+ pg_notify.query_epoch,
+ std::move(info));
+ ret.push_back(op);
+ }
+ return ret;
+}
+
+class QuerySubEvent : public PeeringSubEvent {
+public:
+ template <typename... Args>
+ QuerySubEvent(Args &&... args) :
+ PeeringSubEvent(std::forward<Args>(args)...) {}
+
+ void on_pg_absent() final {
+ logger().debug("handle_pg_query on absent pg {} from {}", pgid, from);
+ pg_info_t empty(pgid);
+ ctx.notify_list[from.osd].emplace_back(
+ pg_notify_t(
+ from.shard, pgid.shard,
+ evt.get_epoch_sent(),
+ osd.get_shard_services().get_osdmap()->get_epoch(),
+ empty),
+ PastIntervals());
+ }
+};
+
+std::vector<OperationRef> handle_pg_query(
+ OSD &osd,
+ ceph::net::ConnectionRef conn,
+ compound_state_ref state,
+ Ref<MOSDPGQuery> m)
+{
+ std::vector<OperationRef> ret;
+ ret.reserve(m->pg_list.size());
+ const int from = m->get_source().num();
+ for (auto &p : m->pg_list) {
+ auto& [pgid, pg_query] = p;
+ MQuery query{pgid, pg_shard_t{from, pg_query.from},
+ pg_query, pg_query.epoch_sent};
+ logger().debug("handle_pg_query on {} from {}", pgid, from);
+ auto op = osd.get_shard_services().start_operation<QuerySubEvent>(
+ state,
+ osd,
+ conn,
+ osd.get_shard_services(),
+ pg_shard_t(from, pg_query.from),
+ pgid,
+ pg_query.epoch_sent,
+ pg_query.epoch_sent,
+ std::move(query));
+ ret.push_back(op);
+ }
+ return ret;
+}
+
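+// Blocker recording the sub-operations the compound request waits on, so
+// they show up when the operation is dumped.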
+struct SubOpBlocker : BlockerT<SubOpBlocker> {
+ static constexpr const char * type_name = "CompoundOpBlocker";
+
+ std::vector<OperationRef> subops;
+  SubOpBlocker(std::vector<OperationRef> &&subops) : subops(std::move(subops)) {}
+
+ virtual void dump_detail(Formatter *f) const {
+ f->open_array_section("dependent_operations");
+ {
+ for (auto &i : subops) {
+ i->dump_brief(f);
+ }
+ }
+ f->close_section();
+ }
+};
+
+} // namespace
+
+namespace ceph::osd {
+
+CompoundPeeringRequest::CompoundPeeringRequest(
+ OSD &osd, ceph::net::ConnectionRef conn, Ref<Message> m)
+ : osd(osd),
+ conn(conn),
+ m(m)
+{}
+
+void CompoundPeeringRequest::print(std::ostream &lhs) const
+{
+ lhs << *m;
+}
+
+void CompoundPeeringRequest::dump_detail(Formatter *f) const
+{
+ f->dump_stream("message") << *m;
+}
+
+seastar::future<> CompoundPeeringRequest::start()
+{
+ logger().info("{}: starting", *this);
+ auto state = seastar::make_lw_shared<compound_state>();
+ auto blocker = std::make_unique<SubOpBlocker>(
+ [&] {
+ switch (m->get_type()) {
+ case MSG_OSD_PG_CREATE2:
+ return handle_pg_create(
+ osd,
+ conn,
+ state,
+ boost::static_pointer_cast<MOSDPGCreate2>(m));
+ case MSG_OSD_PG_NOTIFY:
+ return handle_pg_notify(
+ osd,
+ conn,
+ state,
+ boost::static_pointer_cast<MOSDPGNotify>(m));
+ case MSG_OSD_PG_INFO:
+ return handle_pg_info(
+ osd,
+ conn,
+ state,
+ boost::static_pointer_cast<MOSDPGInfo>(m));
+ case MSG_OSD_PG_QUERY:
+ return handle_pg_query(
+ osd,
+ conn,
+ state,
+ boost::static_pointer_cast<MOSDPGQuery>(m));
+ default:
+ ceph_assert("Invalid message type" == 0);
+ return std::vector<OperationRef>();
+ }
+ }());
+
+ add_blocker(blocker.get());
+ IRef ref = this;
+ logger().info("{}: about to fork future", *this);
+ state->promise.get_future().then(
+ [this, blocker=std::move(blocker)](auto &&ctx) {
+ clear_blocker(blocker.get());
+ logger().info("{}: sub events complete", *this);
+ return osd.get_shard_services().dispatch_context_messages(std::move(ctx));
+ }).then([this, ref=std::move(ref)] {
+ logger().info("{}: complete", *this);
+ });
+
+ logger().info("{}: forked, returning", *this);
+ return seastar::now();
+}
+
+} // namespace ceph::osd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+#include <seastar/core/future.hh>
+
+#include "msg/MessageRef.h"
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+
+namespace ceph::osd {
+
+class OSD;
+class PG;
+
+using osd_id_t = int;
+
+class CompoundPeeringRequest : public OperationT<CompoundPeeringRequest> {
+public:
+ static constexpr OperationTypeCode type =
+ OperationTypeCode::compound_peering_request;
+
+private:
+ OSD &osd;
+ ceph::net::ConnectionRef conn;
+ Ref<Message> m;
+
+public:
+ CompoundPeeringRequest(
+ OSD &osd, ceph::net::ConnectionRef conn, Ref<Message> m);
+
+ void print(std::ostream &) const final;
+ void dump_detail(Formatter *f) const final;
+ seastar::future<> start();
+};
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "common/Formatter.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_connection_priv.h"
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace ceph::osd {
+
+void PeeringEvent::print(std::ostream &lhs) const
+{
+ lhs << "PeeringEvent("
+ << "from=" << from
+ << " pgid=" << pgid
+ << " sent=" << evt.get_epoch_sent()
+ << " requested=" << evt.get_epoch_requested()
+ << " evt=" << evt.get_desc()
+ << ")";
+}
+
+void PeeringEvent::dump_detail(Formatter *f) const
+{
+ f->open_object_section("PeeringEvent");
+ f->dump_stream("from") << from;
+ f->dump_stream("pgid") << pgid;
+ f->dump_int("sent", evt.get_epoch_sent());
+ f->dump_int("requested", evt.get_epoch_requested());
+ f->dump_string("evt", evt.get_desc());
+ f->close_section();
+}
+
+
+PeeringEvent::PGPipeline &PeeringEvent::pp(PG &pg)
+{
+ return pg.peering_request_pg_pipeline;
+}
+
+seastar::future<> PeeringEvent::start()
+{
+
+ logger().debug("{}: start", *this);
+
+ IRef ref = this;
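+  // ref keeps this operation alive while the chain below runs detached;
+  // start() itself returns right away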
+ get_pg().then([this](Ref<PG> pg) {
+ if (!pg) {
+ logger().debug("{}: pg absent, did not create", *this);
+ on_pg_absent();
+ handle.exit();
+ return complete_rctx(pg);
+ } else {
+ logger().debug("{}: pg present", *this);
+ return with_blocking_future(handle.enter(pp(*pg).await_map)
+ ).then([this, pg] {
+ return with_blocking_future(
+ pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
+ }).then([this, pg](auto) {
+ return with_blocking_future(handle.enter(pp(*pg).process));
+ }).then([this, pg] {
+ pg->do_peering_event(evt, ctx);
+ handle.exit();
+ return complete_rctx(pg);
+ });
+ }
+ }).then([this, ref=std::move(ref)] {
+ logger().debug("{}: complete", *this);
+ });
+ return seastar::make_ready_future();
+}
+
+void PeeringEvent::on_pg_absent()
+{
+ logger().debug("{}: pg absent, dropping", *this);
+}
+
+seastar::future<> PeeringEvent::complete_rctx(Ref<PG> pg)
+{
+ logger().debug("{}: submitting ctx", *this);
+ return shard_services.dispatch_context(
+ pg->get_collection_ref(),
+ std::move(ctx));
+}
+
+RemotePeeringEvent::ConnectionPipeline &RemotePeeringEvent::cp()
+{
+ return get_osd_priv(conn.get()).peering_request_conn_pipeline;
+}
+
+seastar::future<Ref<PG>> RemotePeeringEvent::get_pg() {
+ return with_blocking_future(
+ handle.enter(cp().await_map)
+ ).then([this] {
+ return with_blocking_future(
+ osd.osdmap_gate.wait_for_map(evt.get_epoch_sent()));
+ }).then([this](auto epoch) {
+ logger().debug("{}: got map {}", *this, epoch);
+ return with_blocking_future(handle.enter(cp().get_pg));
+ }).then([this] {
+ return with_blocking_future(
+ osd.get_or_create_pg(
+ pgid, evt.get_epoch_sent(), std::move(evt.create_info)));
+ });
+}
+
+seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() {
+ return seastar::make_ready_future<Ref<PG>>(pg);
+}
+
+LocalPeeringEvent::~LocalPeeringEvent() {}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+#include <seastar/core/future.hh>
+
+#include "crimson/osd/osd_operation.h"
+#include "osd/osd_types.h"
+#include "osd/PGPeeringEvent.h"
+#include "osd/PeeringState.h"
+
+namespace ceph::osd {
+
+class OSD;
+class ShardServices;
+class PG;
+
+class PeeringEvent : public OperationT<PeeringEvent> {
+public:
+ static constexpr OperationTypeCode type = OperationTypeCode::peering_event;
+
+ class PGPipeline {
+ OrderedPipelinePhase await_map = {
+ "PeeringEvent::PGPipeline::await_map"
+ };
+ OrderedPipelinePhase process = {
+ "PeeringEvent::PGPipeline::process"
+ };
+ friend class PeeringEvent;
+ };
+
+protected:
+ OrderedPipelinePhase::Handle handle;
+ PGPipeline &pp(PG &pg);
+
+ ShardServices &shard_services;
+ PeeringCtx ctx;
+ pg_shard_t from;
+ spg_t pgid;
+ PGPeeringEvent evt;
+
+ const pg_shard_t get_from() const {
+ return from;
+ }
+
+ const spg_t get_pgid() const {
+ return pgid;
+ }
+
+ const PGPeeringEvent &get_event() const {
+ return evt;
+ }
+
+ virtual void on_pg_absent();
+ virtual seastar::future<> complete_rctx(Ref<PG>);
+ virtual seastar::future<Ref<PG>> get_pg() = 0;
+
+public:
+ template <typename... Args>
+ PeeringEvent(
+ ShardServices &shard_services, const pg_shard_t &from, const spg_t &pgid,
+ Args&&... args) :
+ shard_services(shard_services),
+ from(from),
+ pgid(pgid),
+ evt(std::forward<Args>(args)...)
+ {}
+
+
+ void print(std::ostream &) const final;
+ void dump_detail(Formatter *f) const final;
+ seastar::future<> start();
+};
+
+class RemotePeeringEvent : public PeeringEvent {
+protected:
+ OSD &osd;
+ ceph::net::ConnectionRef conn;
+
+ seastar::future<Ref<PG>> get_pg() final;
+
+public:
+ class ConnectionPipeline {
+ OrderedPipelinePhase await_map = {
+ "PeeringRequest::ConnectionPipeline::await_map"
+ };
+ OrderedPipelinePhase get_pg = {
+ "PeeringRequest::ConnectionPipeline::get_pg"
+ };
+ friend class RemotePeeringEvent;
+ };
+
+ template <typename... Args>
+ RemotePeeringEvent(OSD &osd, ceph::net::ConnectionRef conn, Args&&... args) :
+ PeeringEvent(std::forward<Args>(args)...),
+ osd(osd),
+ conn(conn)
+ {}
+
+private:
+ ConnectionPipeline &cp();
+};
+
+class LocalPeeringEvent final : public PeeringEvent {
+protected:
+ seastar::future<Ref<PG>> get_pg() final;
+
+ Ref<PG> pg;
+
+public:
+ template <typename... Args>
+ LocalPeeringEvent(Ref<PG> pg, Args&&... args) :
+ PeeringEvent(std::forward<Args>(args)...),
+ pg(pg)
+ {}
+
+ virtual ~LocalPeeringEvent();
+};
+
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/shard_services.h"
+#include "common/Formatter.h"
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace ceph::osd {
+
+void OSDMapGate::OSDMapBlocker::dump_detail(Formatter *f) const
+{
+ f->open_object_section("OSDMapGate");
+ f->dump_int("epoch", epoch);
+ f->close_section();
+}
+
+blocking_future<epoch_t> OSDMapGate::wait_for_map(epoch_t epoch)
+{
+ if (current >= epoch) {
+ return make_ready_blocking_future<epoch_t>(current);
+ } else {
+ logger().info("evt epoch is {}, i have {}, will wait", epoch, current);
+ auto &blocker = waiting_peering.emplace(
+ epoch, make_pair(blocker_type, epoch)).first->second;
+ auto fut = blocker.promise.get_shared_future();
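+    // if we have a route to the monitors, request newer maps so this wait
+    // can actually be satisfied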
+ if (shard_services) {
+ return blocker.make_blocking_future(
+ (*shard_services).get().osdmap_subscribe(current, true).then(
+ [fut=std::move(fut)]() mutable {
+ return std::move(fut);
+ }));
+ } else {
+ return blocker.make_blocking_future(std::move(fut));
+ }
+ }
+}
+
+void OSDMapGate::got_map(epoch_t epoch) {
+ current = epoch;
+ auto first = waiting_peering.begin();
+ auto last = waiting_peering.upper_bound(epoch);
+ std::for_each(first, last, [epoch, this](auto& blocked_requests) {
+ blocked_requests.second.promise.set_value(epoch);
+ });
+ waiting_peering.erase(first, last);
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <functional>
+#include <map>
+#include <optional>
+
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "include/types.h"
+#include "crimson/osd/osd_operation.h"
+
+namespace ceph {
+ class Formatter;
+ namespace osd {
+ class ShardServices;
+ }
+}
+
+namespace ceph::osd {
+
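+// Gates operations until an osdmap of at least the epoch they require has
+// been observed (via got_map()); used both OSD-wide and per PG.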
+class OSDMapGate {
+ struct OSDMapBlocker : public Blocker {
+ const char * type_name;
+ epoch_t epoch;
+
+ OSDMapBlocker(std::pair<const char *, epoch_t> args)
+ : type_name(args.first), epoch(args.second) {}
+
+ OSDMapBlocker(const OSDMapBlocker &) = delete;
+ OSDMapBlocker(OSDMapBlocker &&) = delete;
+ OSDMapBlocker &operator=(const OSDMapBlocker &) = delete;
+ OSDMapBlocker &operator=(OSDMapBlocker &&) = delete;
+
+ seastar::shared_promise<epoch_t> promise;
+
+ void dump_detail(Formatter *f) const final;
+ const char *get_type_name() const final {
+ return type_name;
+ }
+ };
+
+ // order the promises in descending order of the waited osdmap epoch,
+ // so we can access all the waiters expecting a map whose epoch is less
+ // than a given epoch
+ using waiting_peering_t = std::map<epoch_t,
+ OSDMapBlocker,
+ std::greater<epoch_t>>;
+ const char *blocker_type;
+ waiting_peering_t waiting_peering;
+ epoch_t current = 0;
+ std::optional<std::reference_wrapper<ShardServices>> shard_services;
+public:
+ OSDMapGate(
+ const char *blocker_type,
+ std::optional<std::reference_wrapper<ShardServices>> shard_services)
+ : blocker_type(blocker_type), shard_services(shard_services) {}
+
+ // wait for an osdmap whose epoch is greater or equal to given epoch
+ blocking_future<epoch_t> wait_for_map(epoch_t epoch);
+ void got_map(epoch_t epoch);
+};
+
+}
#include "osd/OSDMap.h"
+#include "os/Transaction.h"
+
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
#include "crimson/os/cyan_collection.h"
-#include "crimson/os/futurized_store.h"
#include "os/Transaction.h"
+#include "crimson/os/cyan_store.h"
+
#include "crimson/osd/exceptions.h"
#include "crimson/osd/pg_meta.h"
-
-#include "pg_backend.h"
+#include "crimson/osd/pg_backend.h"
+#include "crimson/osd/osd_operations/peering_event.h"
namespace {
seastar::logger& logger() {
pg_whoami{pg_shard},
coll_ref(shard_services.get_store().open_collection(coll)),
pgmeta_oid{pgid.make_pgmeta_oid()},
+ osdmap_gate("PG::osdmap_gate", std::nullopt),
shard_services{shard_services},
osdmap{osdmap},
backend(
peering_state.set_backend_predicates(
new ReadablePredicate(pg_whoami),
new RecoverablePredicate());
+ osdmap_gate.got_map(osdmap->get_epoch());
}
+PG::~PG() {}
+
bool PG::try_flush_or_schedule_async() {
-// FIXME once there's a good way to schedule an "async" peering event
-#if 0
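+  // schedule the flush asynchronously: once the (empty) transaction commits,
+  // a LocalPeeringEvent delivers IntervalFlush to the PG; returning false
+  // signals that the flush is still pending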
shard_services.get_store().do_transaction(
coll_ref,
ObjectStore::Transaction()).then(
- [this, epoch=peering_state.get_osdmap()->get_epoch()](){
- if (!peering_state.pg_has_reset_since(epoch)) {
- PeeringCtx rctx;
- auto evt = PeeringState::IntervalFlush();
- do_peering_event(evt, rctx);
- return shard_services.dispatch_context(std::move(rctx));
- } else {
- return seastar::now();
- }
+ [this, epoch=peering_state.get_osdmap()->get_epoch()]() {
+ return shard_services.start_operation<LocalPeeringEvent>(
+ this,
+ shard_services,
+ pg_whoami,
+ pgid,
+ epoch,
+ epoch,
+ PeeringState::IntervalFlush());
});
return false;
-#endif
- return true;
}
void PG::log_state_enter(const char *state) {
newacting,
acting_primary,
rctx);
+ osdmap_gate.got_map(next_map->get_epoch());
}
void PG::handle_activate_map(PeeringCtx &rctx)
#include "common/dout.h"
#include "crimson/net/Fwd.h"
#include "os/Transaction.h"
-#include "crimson/osd/shard_services.h"
#include "osd/osd_types.h"
#include "osd/osd_internal_types.h"
#include "osd/PeeringState.h"
#include "crimson/common/type_helpers.h"
+#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/shard_services.h"
+#include "crimson/osd/osdmap_gate.h"
+
class OSDMap;
class MQuery;
class PGBackend;
class FuturizedStore;
}
+namespace ceph::osd {
+class ClientRequest;
+
class PG : public boost::intrusive_ref_counter<
PG,
boost::thread_unsafe_counter>,
using ec_profile_t = std::map<std::string,std::string>;
using cached_map_t = boost::local_shared_ptr<const OSDMap>;
+ ClientRequest::PGPipeline client_request_pg_pipeline;
+ PeeringEvent::PGPipeline peering_request_pg_pipeline;
+
spg_t pgid;
pg_shard_t pg_whoami;
coll_t coll;
pg_pool_t&& pool,
std::string&& name,
cached_map_t osdmap,
- ceph::osd::ShardServices &shard_services,
+ ShardServices &shard_services,
ec_profile_t profile);
+ ~PG();
+
// EpochSource
epoch_t get_osdmap_epoch() const final {
return peering_state.get_osdmap_epoch();
t.register_on_commit(
new LambdaContext([this, on_commit](){
PeeringCtx rctx;
- do_peering_event(on_commit, rctx);
+ do_peering_event(*on_commit, rctx);
shard_services.dispatch_context(std::move(rctx));
}));
}
void do_peering_event(
PGPeeringEvent& evt, PeeringCtx &rctx);
- void do_peering_event(
- std::unique_ptr<PGPeeringEvent> evt,
- PeeringCtx &rctx) {
- return do_peering_event(*evt, rctx);
- }
- void do_peering_event(
- PGPeeringEventRef evt,
- PeeringCtx &rctx) {
- return do_peering_event(*evt, rctx);
- }
void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
void handle_activate_map(PeeringCtx &rctx);
uint64_t limit);
private:
+ OSDMapGate osdmap_gate;
ShardServices &shard_services;
cached_map_t osdmap;
seastar::future<> wait_for_active();
friend std::ostream& operator<<(std::ostream&, const PG& pg);
+ friend class ClientRequest;
+ friend class PeeringEvent;
};
std::ostream& operator<<(std::ostream&, const PG& pg);
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/osd/pg_map.h"
+
+#include "crimson/osd/pg.h"
+#include "common/Formatter.h"
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace ceph::osd {
+
+PGMap::PGCreationState::PGCreationState(spg_t pgid) : pgid(pgid) {}
+PGMap::PGCreationState::~PGCreationState() {}
+
+void PGMap::PGCreationState::dump_detail(Formatter *f) const
+{
+ f->dump_stream("pgid") << pgid;
+ f->dump_bool("creating", creating);
+}
+
+std::pair<blocking_future<Ref<PG>>, bool> PGMap::get_pg(spg_t pgid, bool wait)
+{
+ if (auto pg = pgs.find(pgid); pg != pgs.end()) {
+ return make_pair(make_ready_blocking_future<Ref<PG>>(pg->second), true);
+ } else if (!wait) {
+ return make_pair(make_ready_blocking_future<Ref<PG>>(nullptr), true);
+ } else {
+ auto &state = pgs_creating.emplace(pgid, pgid).first->second;
+ return make_pair(
+ state.make_blocking_future(state.promise.get_shared_future()),
+ state.creating);
+ }
+}
+
+void PGMap::set_creating(spg_t pgid)
+{
+ logger().debug("Creating {}", pgid);
+ ceph_assert(pgs.count(pgid) == 0);
+ auto pg = pgs_creating.find(pgid);
+ ceph_assert(pg != pgs_creating.end());
+ ceph_assert(pg->second.creating == false);
+ pg->second.creating = true;
+}
+
+void PGMap::pg_created(spg_t pgid, Ref<PG> pg)
+{
+ logger().debug("Created {}", pgid);
+ ceph_assert(!pgs.count(pgid));
+ pgs.emplace(pgid, pg);
+
+ auto state = pgs_creating.find(pgid);
+ ceph_assert(state != pgs_creating.end());
+ state->second.promise.set_value(pg);
+ pgs_creating.erase(pgid);
+}
+
+void PGMap::pg_loaded(spg_t pgid, Ref<PG> pg)
+{
+ ceph_assert(!pgs.count(pgid));
+ pgs.emplace(pgid, pg);
+}
+
+PGMap::~PGMap() {}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+
+#include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "include/types.h"
+#include "crimson/common/type_helpers.h"
+#include "crimson/osd/osd_operation.h"
+#include "osd/osd_types.h"
+
+namespace ceph::osd {
+class PG;
+
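+// Tracks the PGs on this shard along with PGs still being created;
+// get_pg() can block the caller until the requested PG becomes available.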
+class PGMap {
+ struct PGCreationState : BlockerT<PGCreationState> {
+ static constexpr const char * type_name = "PGCreation";
+
+ void dump_detail(Formatter *f) const final;
+
+ spg_t pgid;
+ seastar::shared_promise<Ref<PG>> promise;
+ bool creating = false;
+ PGCreationState(spg_t pgid);
+
+ PGCreationState(const PGCreationState &) = delete;
+ PGCreationState(PGCreationState &&) = delete;
+ PGCreationState &operator=(const PGCreationState &) = delete;
+ PGCreationState &operator=(PGCreationState &&) = delete;
+
+ ~PGCreationState();
+ };
+
+ std::map<spg_t, PGCreationState> pgs_creating;
+ std::map<spg_t, Ref<PG>> pgs;
+
+public:
+ /**
+ * Get future for pg with a bool indicating whether it's already being
+ * created.
+ */
+ std::pair<blocking_future<Ref<PG>>, bool> get_pg(spg_t pgid, bool wait=true);
+
+ /**
+ * Set creating
+ */
+ void set_creating(spg_t pgid);
+
+ /**
+ * Set newly created pg
+ */
+ void pg_created(spg_t pgid, Ref<PG> pg);
+
+ /**
+ * Add newly loaded pg
+ */
+ void pg_loaded(spg_t pgid, Ref<PG> pg);
+
+ decltype(pgs) &get_pgs() { return pgs; }
+
+ PGMap() = default;
+ ~PGMap();
+};
+
+}
public_msgr(public_msgr),
monc(monc),
mgrc(mgrc),
- store(store) {
+ store(store)
+{
perf = build_osd_logger(&cct);
cct.get_perfcounters_collection()->add(perf);
}
seastar::future<> ShardServices::dispatch_context_messages(
- PeeringCtx &ctx)
+ BufferedRecoveryMessages &&ctx)
{
auto ret = seastar::when_all_succeed(
seastar::parallel_for_each(std::move(ctx.notify_list),
PeeringCtx &&ctx)
{
ceph_assert(col || ctx.transaction.empty());
- return seastar::do_with(
- PeeringCtx{ctx},
- [this, col](auto& todo) {
- return seastar::when_all_succeed(
- dispatch_context_messages(todo),
- col ? dispatch_context_transaction(col, todo) : seastar::now());
- });
+ return seastar::when_all_succeed(
+ dispatch_context_messages(BufferedRecoveryMessages(ctx)),
+ col ? dispatch_context_transaction(col, ctx) : seastar::now());
}
void ShardServices::queue_want_pg_temp(pg_t pgid,
_sent_pg_temp();
}
+void ShardServices::update_map(cached_map_t new_osdmap)
+{
+ osdmap = std::move(new_osdmap);
+}
+
+ShardServices::cached_map_t &ShardServices::get_osdmap()
+{
+ return osdmap;
+}
+
seastar::future<> ShardServices::send_pg_created(pg_t pgid)
{
logger().debug(__func__);
}
}
+seastar::future<> ShardServices::osdmap_subscribe(version_t epoch, bool force_request)
+{
+ logger().info("{}({})", __func__, epoch);
+ if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
+ force_request) {
+ return monc.renew_subs();
+ } else {
+ return seastar::now();
+ }
+}
+
};
#include <boost/intrusive_ptr.hpp>
#include <seastar/core/future.hh>
+#include "osd_operation.h"
#include "msg/MessageRef.h"
#include "crimson/os/cyan_collection.h"
class PerfCounters;
class OSDMap;
class PeeringCtx;
+class BufferedRecoveryMessages;
namespace ceph::osd {
return &cct;
}
+ // Op Tracking
+ OperationRegistry registry;
+
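+  // create an operation, register it with the registry, and start it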
+ template <typename T, typename... Args>
+ typename T::IRef start_operation(Args&&... args) {
+ auto op = registry.create_operation<T>(std::forward<Args>(args)...);
+ op->start();
+ return op;
+ }
+
// Loggers
PerfCounters &get_recoverystate_perf_logger() {
return *recoverystate_perf;
/// Dispatch and reset ctx messages
seastar::future<> dispatch_context_messages(
- PeeringCtx &ctx);
+ BufferedRecoveryMessages &&ctx);
/// Dispatch ctx and dispose of context
seastar::future<> dispatch_context(
private:
cached_map_t osdmap;
public:
- void update_map(cached_map_t new_osdmap) {
- osdmap = std::move(new_osdmap);
- }
- cached_map_t &get_osdmap() {
- return osdmap;
- }
+ void update_map(cached_map_t new_osdmap);
+ cached_map_t &get_osdmap();
// PG Created State
private:
seastar::future<> send_pg_created(pg_t pgid);
seastar::future<> send_pg_created();
void prune_pg_created();
+
+ seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
};