shard_services.cc
osd_operation.cc
osd_operations/client_request.cc
- osd_operations/peering_event.cc
osd_operations/compound_peering_request.cc
+ osd_operations/peering_event.cc
osd_operations/pg_advance_map.cc
+ osd_operations/replicated_request.cc
osdmap_gate.cc
pg_map.cc
${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc
#include "crimson/osd/pg.h"
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/pg_meta.h"
+#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/compound_peering_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
-#include "crimson/osd/osd_operations/client_request.h"
+#include "crimson/osd/osd_operations/replicated_request.h"
namespace {
seastar::logger& logger() {
return seastar::now();
case MSG_OSD_PG_LOG:
return handle_pg_log(conn, boost::static_pointer_cast<MOSDPGLog>(m));
+ case MSG_OSD_REPOP:
+ return handle_rep_op(conn, boost::static_pointer_cast<MOSDRepOp>(m));
case MSG_OSD_REPOPREPLY:
return handle_rep_op_reply(conn, boost::static_pointer_cast<MOSDRepOpReply>(m));
default:
return seastar::now();
}
+seastar::future<> OSD::handle_rep_op(ceph::net::Connection* conn,
+ Ref<MOSDRepOp> m)
+{
+ m->finish_decode();
+ shard_services.start_operation<RepRequest>(
+ *this,
+ conn->get_shared(),
+ std::move(m));
+ return seastar::now();
+}
+
seastar::future<> OSD::handle_rep_op_reply(ceph::net::Connection* conn,
Ref<MOSDRepOpReply> m)
{
const auto& pgs = pg_map.get_pgs();
if (auto pg = pgs.find(m->get_spg()); pg != pgs.end()) {
+ m->finish_decode();
pg->second->handle_rep_op_reply(conn, *m);
} else {
logger().warn("stale reply: {}", *m);
class MOSDMap;
class MOSDOp;
class MOSDRepOpReply;
+class MOSDRepOp;
class OSDMap;
class OSDMeta;
class Heartbeat;
Ref<MOSDMap> m);
seastar::future<> handle_osd_op(ceph::net::Connection* conn,
Ref<MOSDOp> m);
+ seastar::future<> handle_rep_op(ceph::net::Connection* conn,
+ Ref<MOSDRepOp> m);
seastar::future<> handle_rep_op_reply(ceph::net::Connection* conn,
Ref<MOSDRepOpReply> m);
seastar::future<> handle_pg_log(ceph::net::Connection* conn,
#include "crimson/osd/osd_operation.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/replicated_request.h"
namespace ceph::osd {
struct OSDConnectionPriv : public ceph::net::Connection::user_private_t {
ClientRequest::ConnectionPipeline client_request_conn_pipeline;
RemotePeeringEvent::ConnectionPipeline peering_request_conn_pipeline;
+ RepRequest::ConnectionPipeline replicated_request_conn_pipeline;
};
static OSDConnectionPriv &get_osd_priv(ceph::net::Connection *conn) {
compound_peering_request = 2,
pg_advance_map = 3,
pg_creation = 4,
- last_op = 5
+ replicated_request = 5,
+ last_op = 6
};
static constexpr const char* const OP_NAMES[] = {
"compound_peering_request",
"pg_advance_map",
"pg_creation",
+ "replicated_request",
};
// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "replicated_request.h"
+
+#include "common/Formatter.h"
+#include "messages/MOSDRepOp.h"
+
+#include "crimson/osd/osd.h"
+#include "crimson/osd/osd_connection_priv.h"
+#include "crimson/osd/pg.h"
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace ceph::osd {
+
+RepRequest::RepRequest(OSD &osd,
+ ceph::net::ConnectionRef&& conn,
+ Ref<MOSDRepOp> &&req)
+ : osd{osd},
+ conn{std::move(conn)},
+ req{req}
+{}
+
+void RepRequest::print(std::ostream& os) const
+{
+ os << "RepRequest("
+ << "from=" << req->from
+ << " req=" << *req
+ << ")";
+}
+
+void RepRequest::dump_detail(Formatter *f) const
+{
+ f->open_object_section("RepRequest");
+ f->dump_stream("reqid") << req->reqid;
+ f->dump_stream("pgid") << req->get_spg();
+ f->dump_unsigned("map_epoch", req->get_map_epoch());
+ f->dump_unsigned("min_epoch", req->get_min_epoch());
+ f->dump_stream("oid") << req->poid;
+ f->dump_stream("from") << req->from;
+ f->close_section();
+}
+
+RepRequest::ConnectionPipeline &RepRequest::cp()
+{
+ return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+}
+
+RepRequest::PGPipeline &RepRequest::pp(PG &pg)
+{
+ return pg.replicated_request_pg_pipeline;
+}
+
+seastar::future<> RepRequest::start()
+{
+ logger().info("{} start", *this);
+ IRef ref = this;
+ return with_blocking_future(handle.enter(cp().await_map))
+ .then([this]() {
+ return with_blocking_future(osd.osdmap_gate.wait_for_map(req->get_min_epoch()));
+ }).then([this](epoch_t epoch) {
+ return with_blocking_future(handle.enter(cp().get_pg));
+ }).then([this] {
+ return with_blocking_future(osd.wait_for_pg(req->get_spg()));
+ }).then([this, ref=std::move(ref)](Ref<PG> pg) {
+ return pg->handle_rep_op(std::move(req));
+ });
+}
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "crimson/net/Connection.h"
+#include "crimson/osd/osd_operation.h"
+#include "crimson/common/type_helpers.h"
+
+class MOSDRepOp;
+
+namespace ceph::osd {
+
+class OSD;
+class PG;
+
+class RepRequest final : public OperationT<RepRequest> {
+public:
+ class ConnectionPipeline {
+ OrderedPipelinePhase await_map = {
+ "RepRequest::ConnectionPipeline::await_map"
+ };
+ OrderedPipelinePhase get_pg = {
+ "RepRequest::ConnectionPipeline::get_pg"
+ };
+ friend RepRequest;
+ };
+ class PGPipeline {
+ OrderedPipelinePhase await_map = {
+ "RepRequest::PGPipeline::await_map"
+ };
+ OrderedPipelinePhase process = {
+ "RepRequest::PGPipeline::process"
+ };
+ friend RepRequest;
+ };
+ static constexpr OperationTypeCode type = OperationTypeCode::replicated_request;
+ RepRequest(OSD&, ceph::net::ConnectionRef&&, Ref<MOSDRepOp>&&);
+
+ void print(std::ostream &) const final;
+ void dump_detail(Formatter *f) const final;
+ seastar::future<> start();
+
+private:
+ ConnectionPipeline &cp();
+ PGPipeline &pp(PG &pg);
+
+ OSD &osd;
+ ceph::net::ConnectionRef conn;
+ Ref<MOSDRepOp> req;
+ OrderedPipelinePhase::Handle handle;
+};
+
+}
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGQuery.h"
+#include "messages/MOSDRepOp.h"
+#include "messages/MOSDRepOpReply.h"
#include "osd/OSDMap.h"
});
}
+seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
+{
+ ceph::os::Transaction txn;
+ auto encoded_txn = req->get_data().cbegin();
+ decode(txn, encoded_txn);
+ return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
+ .then([req, lcod=peering_state.get_info().last_complete, this] {
+ peering_state.update_last_complete_ondisk(lcod);
+ const auto map_epoch = get_osdmap_epoch();
+ auto reply = make_message<MOSDRepOpReply>(
+ req.get(), pg_whoami, 0,
+ map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
+ reply->set_last_complete_ondisk(lcod);
+ return shard_services.send_to_osd(req->from.osd, reply, map_epoch);
+ });
+}
+
void PG::handle_rep_op_reply(ceph::net::Connection* conn,
const MOSDRepOpReply& m)
{
#include "crimson/common/type_helpers.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/replicated_request.h"
#include "crimson/osd/shard_services.h"
#include "crimson/osd/osdmap_gate.h"
ClientRequest::PGPipeline client_request_pg_pipeline;
PeeringEvent::PGPipeline peering_request_pg_pipeline;
+ RepRequest::PGPipeline replicated_request_pg_pipeline;
spg_t pgid;
pg_shard_t pg_whoami;
void handle_initialize(PeeringCtx &rctx);
seastar::future<> handle_op(ceph::net::Connection* conn,
Ref<MOSDOp> m);
+ seastar::future<> handle_rep_op(Ref<MOSDRepOp> m);
void handle_rep_op_reply(ceph::net::Connection* conn,
const MOSDRepOpReply& m);
friend std::ostream& operator<<(std::ostream&, const PG& pg);
friend class ClientRequest;
- friend class PeeringEvent;
friend class PGAdvanceMap;
+ friend class PeeringEvent;
+ friend class RepRequest;
};
std::ostream& operator<<(std::ostream&, const PG& pg);