osd_operations/client_request.cc
osd_operations/peering_event.cc
osd_operations/compound_peering_request.cc
+ osd_operations/pg_advance_map.cc
osdmap_gate.cc
pg_map.cc
${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc
#include "osd/PeeringState.h"
#include "crimson/osd/osd_operations/compound_peering_request.h"
#include "crimson/osd/osd_operations/peering_event.h"
+#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/osd_operations/client_request.h"
namespace {
false,
rctx.transaction);
- pg->handle_initialize(rctx);
- pg->handle_activate_map(rctx);
-
- logger().info("{} new pg {}", __func__, *pg);
- pg_map.pg_created(info->pgid, pg);
-
- return seastar::when_all_succeed(
- advance_pg_to(pg, osdmap->get_epoch()),
- pg->get_need_up_thru() ? _send_alive() : seastar::now(),
- shard_services.dispatch_context(
- pg->get_collection_ref(),
- std::move(rctx)).then(
- [pg]() { return seastar::make_ready_future<Ref<PG>>(pg); }));
+ return shard_services.start_operation<PGAdvanceMap>(
+ *this, pg, pg->get_osdmap_epoch(),
+ osdmap->get_epoch(), std::move(rctx), true).second.then([pg] {
+ return seastar::make_ready_future<Ref<PG>>(pg);
+ });
});
});
}
// todo: m-to-n: broadcast this news to all shards
auto &pgs = pg_map.get_pgs();
return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
- return advance_pg_to(pg.second, epoch);
+ return shard_services.start_operation<PGAdvanceMap>(
+ *this, pg.second, pg.second->get_osdmap_epoch(), epoch).second;
}).then([epoch, this] {
osdmap_gate.got_map(epoch);
return seastar::make_ready_future();
return pg_map.get_pg(pgid).first;
}
-seastar::future<> OSD::advance_pg_to(Ref<PG> pg, epoch_t to)
-{
- auto from = pg->get_osdmap_epoch();
- // todo: merge/split support
- return seastar::do_with(
- PeeringCtx{},
- [this, pg, from, to](auto &rctx) {
- return seastar::do_for_each(
- boost::make_counting_iterator(from + 1),
- boost::make_counting_iterator(to + 1),
- [this, pg, &rctx](epoch_t next_epoch) {
- return get_map(next_epoch).then(
- [pg, this, &rctx] (cached_map_t&& next_map) {
- pg->handle_advance_map(next_map, rctx);
- });
- }).then([this, &rctx, pg] {
- pg->handle_activate_map(rctx);
- return seastar::when_all_succeed(
- pg->get_need_up_thru() ? _send_alive() : seastar::now(),
- shard_services.dispatch_context(
- pg->get_collection_ref(),
- std::move(rctx)));
- });
- });
-}
-
}
blocking_future<Ref<PG>> wait_for_pg(
spg_t pgid);
- seastar::future<> advance_pg_to(Ref<PG> pg, epoch_t to);
bool should_restart() const;
seastar::future<> restart();
seastar::future<> shutdown();
seastar::future<> send_beacon();
void update_heartbeat_peers();
+ friend class PGAdvanceMap;
};
}
enum class OperationTypeCode {
  client_request = 0,
-  peering_event,
-  compound_peering_request,
-  last_op
+  // values are spelled out explicitly so each enumerator is a stable
+  // index into OP_NAMES below as new operation types get added
+  peering_event = 1,
+  compound_peering_request = 2,
+  pg_advance_map = 3,
+  pg_creation = 4,
+  last_op = 5
};
static constexpr const char* const OP_NAMES[] = {
-  "client_write",
+  // must stay in 1:1 positional correspondence with OperationTypeCode
+  "client_request",
  "peering_event",
-  "compound_peering_request"
+  "compound_peering_request",
+  "pg_advance_map",
+  "pg_creation",
};
// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
logger().debug("{}: start", *this);
IRef ref = this;
- with_blocking_future(handle.enter(cp().await_map))
+ return with_blocking_future(handle.enter(cp().await_map))
.then([this]() {
return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_map_epoch()));
}).then([this](epoch_t epoch) {
});
});
});
- return seastar::make_ready_future();
}
}
q->second.first);
} else {
auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
- state,
- osd,
- conn,
- osd.get_shard_services(),
- pg_shard_t(),
- pgid,
- m->epoch,
- m->epoch,
- NullEvt(),
- true,
- new PGCreateInfo(
+ state,
+ osd,
+ conn,
+ osd.get_shard_services(),
+ pg_shard_t(),
pgid,
m->epoch,
- q->second.first,
- q->second.second,
- true));
+ m->epoch,
+ NullEvt(),
+ true,
+ new PGCreateInfo(
+ pgid,
+ m->epoch,
+ q->second.first,
+ q->second.second,
+ true)).first;
+ ret.push_back(op);
}
}
return ret;
pg_notify.query_epoch,
notify,
true, // requires_pg
- create_info);
- op->start();
+ create_info).first;
ret.push_back(op);
}
return ret;
pgid,
pg_notify.epoch_sent,
pg_notify.query_epoch,
- std::move(info));
+ std::move(info)).first;
ret.push_back(op);
}
return ret;
pg_query, pg_query.epoch_sent};
logger().debug("handle_pg_query on {} from {}", pgid, from);
auto op = osd.get_shard_services().start_operation<QuerySubEvent>(
- state,
- osd,
- conn,
- osd.get_shard_services(),
- pg_shard_t(from, pg_query.from),
- pgid,
- pg_query.epoch_sent,
- pg_query.epoch_sent,
- std::move(query));
+ state,
+ osd,
+ conn,
+ osd.get_shard_services(),
+ pg_shard_t(from, pg_query.from),
+ pgid,
+ pg_query.epoch_sent,
+ pg_query.epoch_sent,
+ std::move(query)).first;
ret.push_back(op);
}
return ret;
add_blocker(blocker.get());
IRef ref = this;
logger().info("{}: about to fork future", *this);
- state->promise.get_future().then(
+ return state->promise.get_future().then(
[this, blocker=std::move(blocker)](auto &&ctx) {
clear_blocker(blocker.get());
logger().info("{}: sub events complete", *this);
}).then([this, ref=std::move(ref)] {
logger().info("{}: complete", *this);
});
-
- logger().info("{}: forked, returning", *this);
- return seastar::now();
}
} // namespace ceph::osd
logger().debug("{}: start", *this);
IRef ref = this;
- get_pg().then([this](Ref<PG> pg) {
+ return get_pg().then([this](Ref<PG> pg) {
if (!pg) {
logger().debug("{}: pg absent, did not create", *this);
on_pg_absent();
}).then([this, ref=std::move(ref)] {
logger().debug("{}: complete", *this);
});
- return seastar::make_ready_future();
}
void PeeringEvent::on_pg_absent()
"PeeringEvent::PGPipeline::process"
};
friend class PeeringEvent;
+ friend class PGAdvanceMap;
};
protected:
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <seastar/core/future.hh>
+
+#include <boost/smart_ptr/local_shared_ptr.hpp>
+#include "include/types.h"
+#include "crimson/osd/osd_operations/pg_advance_map.h"
+#include "crimson/osd/pg.h"
+#include "crimson/osd/osd.h"
+#include "common/Formatter.h"
+
+namespace {
+  // File-local shortcut to the OSD subsystem logger.
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace ceph::osd {
+
+// Plain map-advance: walk the PG through epochs (from, to].  rctx is
+// default-constructed and do_init is false, so no initial peering
+// events are delivered before the map walk.
+PGAdvanceMap::PGAdvanceMap(
+  OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to)
+  : osd(osd), pg(pg), from(from), to(to), do_init(false) {}
+
+// PG-creation variant: the caller hands in a PeeringCtx carrying
+// pending peering state, and do_init=true requests
+// handle_initialize()/handle_activate_map() before the map walk.
+PGAdvanceMap::PGAdvanceMap(
+  OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
+  PeeringCtx &&rctx, bool do_init)
+  : osd(osd), pg(pg), from(from), to(to),
+    rctx(std::move(rctx)), do_init(do_init) {}
+
+PGAdvanceMap::~PGAdvanceMap() {}
+
+// One-line human-readable description; used by the operation's
+// operator<< in log messages.
+void PGAdvanceMap::print(std::ostream &lhs) const
+{
+  lhs << "PGAdvanceMap("
+    << "pg=" << pg->get_pgid()
+    << " from=" << from
+    << " to=" << to;
+  if (do_init) {
+    lhs << " do_init";
+  }
+  lhs << ")";
+}
+
+// Structured dump of the operation's parameters via Formatter
+// (presumably surfaced through admin-socket introspection -- confirm).
+void PGAdvanceMap::dump_detail(Formatter *f) const
+{
+  f->open_object_section("PGAdvanceMap");
+  f->dump_stream("pgid") << pg->get_pgid();
+  f->dump_int("from", from);
+  f->dump_int("to", to);
+  f->dump_bool("do_init", do_init);
+  f->close_section();
+}
+
+// Drive the PG through every osdmap epoch in (from, to]: deliver each
+// map via handle_advance_map(), then activate the final map, then
+// flush the peering side effects accumulated in rctx.  Entering the
+// PG's peering pipeline phase serializes map advances for this PG.
+seastar::future<> PGAdvanceMap::start()
+{
+  using cached_map_t = boost::local_shared_ptr<const OSDMap>;
+
+  logger().debug("{}: start", *this);
+
+  // Self-reference keeps this operation alive until the whole future
+  // chain completes.
+  IRef ref = this;
+  return with_blocking_future(
+    handle.enter(pg->peering_request_pg_pipeline.process))
+    .then([this] {
+      if (do_init) {
+        // Creation path: feed the initial peering events into rctx
+        // before walking the maps.
+        pg->handle_initialize(rctx);
+        pg->handle_activate_map(rctx);
+      }
+      // Fetch and apply each epoch in (from, to], one map at a time.
+      return seastar::do_for_each(
+        boost::make_counting_iterator(from + 1),
+        boost::make_counting_iterator(to + 1),
+        [this](epoch_t next_epoch) {
+          return osd.get_map(next_epoch).then(
+            [this] (cached_map_t&& next_map) {
+              pg->handle_advance_map(next_map, rctx);
+            });
+        }).then([this] {
+          pg->handle_activate_map(rctx);
+          // Leave the pipeline before dispatching side effects so a
+          // subsequent PGAdvanceMap for this PG can begin.
+          handle.exit();
+          if (do_init) {
+            // Register the newly created PG so lookups can find it.
+            osd.pg_map.pg_created(pg->get_pgid(), pg);
+            // NOTE(review): __func__ inside this lambda expands to
+            // "operator()", not "start" -- consider a literal tag.
+            logger().info("{} new pg {}", __func__, *pg);
+          }
+          // Notify the monitor (up_thru) if peering needs it, and
+          // flush rctx's messages/transaction in parallel.
+          return seastar::when_all_succeed(
+            pg->get_need_up_thru() ? osd._send_alive() : seastar::now(),
+            osd.shard_services.dispatch_context(
+              pg->get_collection_ref(),
+              std::move(rctx)));
+        });
+    }).then([this, ref=std::move(ref)] {
+      logger().debug("{}: complete", *this);
+    });
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <iostream>
+#include <seastar/core/future.hh>
+
+#include "crimson/osd/osd_operation.h"
+#include "osd/osd_types.h"
+#include "crimson/common/type_helpers.h"
+#include "osd/PeeringState.h"
+
+namespace ceph::osd {
+
+class OSD;
+class PG;
+
+/**
+ * PGAdvanceMap
+ *
+ * Operation that advances a single PG through osdmap epochs
+ * (from, to], optionally (do_init) delivering the initial peering
+ * events first on the PG-creation path.
+ */
+class PGAdvanceMap : public OperationT<PGAdvanceMap> {
+public:
+  static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map;
+
+protected:
+  // Serializes map advances per PG via the peering pipeline phase.
+  OrderedPipelinePhase::Handle handle;
+
+  OSD &osd;
+  Ref<PG> pg;
+
+  // Epoch range: maps in (from, to] are applied in order.
+  epoch_t from;
+  epoch_t to;
+
+  // Accumulates peering messages/transactions produced while
+  // advancing; dispatched at the end of start().
+  PeeringCtx rctx;
+  // True on the PG-creation path: deliver handle_initialize()/
+  // handle_activate_map() before walking the maps.
+  const bool do_init;
+
+public:
+  PGAdvanceMap(
+    OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to);
+  PGAdvanceMap(
+    OSD &osd, Ref<PG> pg, epoch_t from, epoch_t to,
+    PeeringCtx &&rctx, bool do_init);
+  ~PGAdvanceMap();
+
+  void print(std::ostream &) const final;
+  void dump_detail(Formatter *f) const final;
+  seastar::future<> start();
+};
+
+}
~PG();
+  /// Identity of this OSD within the PG (pg_shard_t: osd id + shard).
+  const pg_shard_t &get_pg_whoami() const {
+    return pg_whoami;
+  }
+
+  /// Spg id of this PG.
+  // fixed: missing space after '&' ("spg_t&get_pgid"), inconsistent
+  // with the sibling getter above
+  const spg_t &get_pgid() const {
+    return pgid;
+  }
+
+
// EpochSource
epoch_t get_osdmap_epoch() const final {
return peering_state.get_osdmap_epoch();
friend std::ostream& operator<<(std::ostream&, const PG& pg);
friend class ClientRequest;
friend class PeeringEvent;
+ friend class PGAdvanceMap;
};
std::ostream& operator<<(std::ostream&, const PG& pg);
#include "include/types.h"
#include "crimson/common/type_helpers.h"
#include "crimson/osd/osd_operation.h"
+#include "crimson/osd/pg.h"
#include "osd/osd_types.h"
namespace ceph::osd {
OperationRegistry registry;
template <typename T, typename... Args>
-  typename T::IRef start_operation(Args&&... args) {
+  // Create and immediately start an operation.  Returns a pair of
+  // {operation ref, future from start()} so callers can both track
+  // the op and wait for its completion (the old signature returned
+  // only the ref and discarded the start() future).
+  auto start_operation(Args&&... args) {
    auto op = registry.create_operation<T>(std::forward<Args>(args)...);
-    op->start();
-    return op;
+    return std::make_pair(op, op->start());
  }
// Loggers