case CEPH_MSG_OSD_OP:
return handle_osd_op(conn, boost::static_pointer_cast<MOSDOp>(m));
case MSG_OSD_PG_CREATE2:
- case MSG_OSD_PG_NOTIFY:
- case MSG_OSD_PG_INFO:
- case MSG_OSD_PG_QUERY:
shard_services.start_operation<CompoundPeeringRequest>(
*this,
conn->get_shared(),
m);
return seastar::now();
+ case MSG_OSD_PG_NOTIFY2:
+ [[fallthrough]];
+ case MSG_OSD_PG_INFO2:
+ [[fallthrough]];
+ case MSG_OSD_PG_QUERY2:
+ [[fallthrough]];
case MSG_OSD_PG_LOG:
- return handle_pg_log(conn, boost::static_pointer_cast<MOSDPGLog>(m));
+ return handle_peering_op(conn, boost::static_pointer_cast<MOSDPeeringOp>(m));
case MSG_OSD_REPOP:
return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
case MSG_OSD_REPOPREPLY:
heartbeat->update_peers(whoami);
}
-seastar::future<> OSD::handle_pg_log(
+seastar::future<> OSD::handle_peering_op(
ceph::net::Connection* conn,
- Ref<MOSDPGLog> m)
+ Ref<MOSDPeeringOp> m)
{
const int from = m->get_source().num();
- logger().debug("handle_pg_log on {} from {}", m->get_spg(), from);
+ logger().debug("handle_peering_op on {} from {}", m->get_spg(), from);
shard_services.start_operation<RemotePeeringEvent>(
*this,
conn->get_shared(),
shard_services,
- pg_shard_t(from, m->from),
- spg_t(m->info.pgid.pgid, m->to),
+ pg_shard_t{from, m->get_spg().shard},
+ m->get_spg(),
std::move(*m->get_event()));
return seastar::now();
}
Ref<MOSDRepOp> m);
seastar::future<> handle_rep_op_reply(ceph::net::Connection* conn,
Ref<MOSDRepOpReply> m);
- seastar::future<> handle_pg_log(ceph::net::Connection* conn,
- Ref<MOSDPGLog> m);
+ seastar::future<> handle_peering_op(ceph::net::Connection* conn,
+ Ref<MOSDPeeringOp> m);
seastar::future<> committed_osd_maps(version_t first,
version_t last,
#include "osd/PeeringState.h"
-#include "messages/MOSDPGInfo.h"
-#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGQuery.h"
#include "messages/MOSDPGCreate2.h"
struct compound_state {
seastar::promise<BufferedRecoveryMessages> promise;
BufferedRecoveryMessages ctx;
+ compound_state()
+ // assuming crimson-osd won't need to be compatible with pre-octopus
+ // releases
+ : ctx{ceph_release_t::octopus}
+ {}
~compound_state() {
promise.set_value(std::move(ctx));
}
return ret;
}
-std::vector<OperationRef> handle_pg_notify(
- OSD &osd,
- ceph::net::ConnectionRef conn,
- compound_state_ref state,
- Ref<MOSDPGNotify> m)
-{
- std::vector<OperationRef> ret;
- ret.reserve(m->get_pg_list().size());
- const int from = m->get_source().num();
- for (auto& pg_notify : m->get_pg_list()) {
- spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
- MNotifyRec notify{pgid,
- pg_shard_t{from, pg_notify.from},
- pg_notify,
- 0}; // the features is not used
- logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from);
- auto create_info = new PGCreateInfo{
- pgid,
- pg_notify.query_epoch,
- pg_notify.info.history,
- pg_notify.past_intervals,
- false};
- auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
- state,
- osd,
- conn,
- osd.get_shard_services(),
- pg_shard_t(from, pg_notify.from),
- pgid,
- pg_notify.epoch_sent,
- pg_notify.query_epoch,
- notify,
- true, // requires_pg
- create_info).first;
- ret.push_back(op);
- }
- return ret;
-}
-
-std::vector<OperationRef> handle_pg_info(
- OSD &osd,
- ceph::net::ConnectionRef conn,
- compound_state_ref state,
- Ref<MOSDPGInfo> m)
-{
- std::vector<OperationRef> ret;
- ret.reserve(m->pg_list.size());
- const int from = m->get_source().num();
- for (auto& pg_notify : m->pg_list) {
- spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
- logger().debug("handle_pg_info on {} from {}", pgid.pgid, from);
- MInfoRec info{pg_shard_t{from, pg_notify.from},
- pg_notify.info,
- pg_notify.epoch_sent};
- auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
- state,
- osd,
- conn,
- osd.get_shard_services(),
- pg_shard_t(from, pg_notify.from),
- pgid,
- pg_notify.epoch_sent,
- pg_notify.query_epoch,
- std::move(info)).first;
- ret.push_back(op);
- }
- return ret;
-}
-
class QuerySubEvent : public PeeringSubEvent {
public:
template <typename... Args>
void on_pg_absent() final {
logger().debug("handle_pg_query on absent pg {} from {}", pgid, from);
pg_info_t empty(pgid);
- ctx.notify_list[from.osd].emplace_back(
+ ctx.send_notify(from.osd,
pg_notify_t(
from.shard, pgid.shard,
evt.get_epoch_sent(),
conn,
state,
boost::static_pointer_cast<MOSDPGCreate2>(m));
- case MSG_OSD_PG_NOTIFY:
- return handle_pg_notify(
- osd,
- conn,
- state,
- boost::static_pointer_cast<MOSDPGNotify>(m));
- case MSG_OSD_PG_INFO:
- return handle_pg_info(
- osd,
- conn,
- state,
- boost::static_pointer_cast<MOSDPGInfo>(m));
case MSG_OSD_PG_QUERY:
return handle_pg_query(
osd,
namespace ceph::osd {
-PGAdvanceMap::PGAdvanceMap(
- OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to)
- : osd(osd), pg(pg), from(from), to(to), do_init(false) {}
-
PGAdvanceMap::PGAdvanceMap(
OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
PeeringCtx &&rctx, bool do_init)
const bool do_init;
public:
- PGAdvanceMap(
- OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to);
PGAdvanceMap(
OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
PeeringCtx &&rctx, bool do_init);
seastar::future<> ShardServices::dispatch_context_messages(
BufferedRecoveryMessages &&ctx)
{
- auto ret = seastar::when_all_succeed(
- seastar::parallel_for_each(std::move(ctx.notify_list),
- [this](auto& osd_notifies) {
- auto& [peer, notifies] = osd_notifies;
- auto m = make_message<MOSDPGNotify>(osdmap->get_epoch(),
- std::move(notifies));
- logger().debug("dispatch_context_messages sending notify to {}", peer);
- return send_to_osd(peer, m, osdmap->get_epoch());
- }),
- seastar::parallel_for_each(std::move(ctx.query_map),
- [this](auto& osd_queries) {
- auto& [peer, queries] = osd_queries;
- auto m = make_message<MOSDPGQuery>(osdmap->get_epoch(),
- std::move(queries));
- logger().debug("dispatch_context_messages sending query to {}", peer);
- return send_to_osd(peer, m, osdmap->get_epoch());
- }),
- seastar::parallel_for_each(std::move(ctx.info_map),
- [this](auto& osd_infos) {
- auto& [peer, infos] = osd_infos;
- auto m = make_message<MOSDPGInfo>(osdmap->get_epoch(),
- std::move(infos));
- logger().debug("dispatch_context_messages sending info to {}", peer);
- return send_to_osd(peer, m, osdmap->get_epoch());
- }));
- ctx.notify_list.clear();
- ctx.query_map.clear();
- ctx.info_map.clear();
+ auto ret = seastar::parallel_for_each(std::move(ctx.message_map),
+ [this](auto& osd_messages) {
+ auto& [peer, messages] = osd_messages;
+ logger().debug("dispatch_context_messages sending messages to {}", peer);
+ return seastar::parallel_for_each(std::move(messages), [=](auto& m) {
+ return send_to_osd(peer, m, osdmap->get_epoch());
+ });
+ });
+ ctx.message_map.clear();
return ret;
}