${PROJECT_SOURCE_DIR}/src/common/HTMLFormatter.cc
${PROJECT_SOURCE_DIR}/src/common/Formatter.cc
${PROJECT_SOURCE_DIR}/src/common/Graylog.cc
+ ${PROJECT_SOURCE_DIR}/src/common/ostream_temp.cc
${PROJECT_SOURCE_DIR}/src/common/LogEntry.cc
${PROJECT_SOURCE_DIR}/src/common/Mutex.cc
${PROJECT_SOURCE_DIR}/src/common/SubProcess.cc
"CEPH_PKGLIBDIR=\"${CEPH_INSTALL_FULL_PKGLIBDIR}\""
"CEPH_DATADIR=\"${CEPH_INSTALL_DATADIR}\"")
+# Boost link dependencies for crimson-common, collected in a variable so
+# additional libraries can be appended conditionally before linking.
+set(crimson_common_deps
+  Boost::iostreams
+  Boost::random)
+
+if(NOT WITH_SYSTEM_BOOST)
+  # NOTE(review): bundled (non-system) boost appears to require zlib to be
+  # linked explicitly — confirm against the boost build options used here.
+  list(APPEND crimson_common_deps ${ZLIB_LIBRARIES})
+endif()
+
target_link_libraries(crimson-common
  PUBLIC
    json_spirit
  PRIVATE
    crc32
    crimson::cflags
-    Boost::iostreams
-    Boost::random
+    ${crimson_common_deps}
    ${NSS_LIBRARIES} ${NSPR_LIBRARIES} OpenSSL::Crypto)
set(crimson_auth_srcs
bool sub_want_increment(const std::string& what, version_t start, unsigned flags);
seastar::future<> renew_subs();
- MonMap &get_monmap_ref() {
- return monmap;
- }
-
private:
// AuthServer methods
std::pair<std::vector<uint32_t>, std::vector<uint32_t>>
pg.cc
pg_backend.cc
pg_meta.cc
- recovery_machine.cc
- recovery_state.cc
- recovery_states.cc
- replicated_backend.cc)
+ replicated_backend.cc
+ shard_services.cc
+ ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc
+ ${PROJECT_SOURCE_DIR}/src/osd/PGPeeringEvent.cc
+ ${PROJECT_SOURCE_DIR}/src/osd/PGStateUtils.cc
+ ${PROJECT_SOURCE_DIR}/src/osd/MissingLoc.cc
+ ${PROJECT_SOURCE_DIR}/src/osd/PGLog.cc
+ ${PROJECT_SOURCE_DIR}/src/osd/osd_perf_counters.cc
+ )
target_link_libraries(crimson-osd
crimson-common crimson-os crimson fmt::fmt)
#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGQuery.h"
#include "messages/MPGStats.h"
+#include "messages/MOSDPGCreate2.h"
#include "crimson/mon/MonClient.h"
#include "crimson/net/Connection.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/pg_meta.h"
-
#include "osd/PGPeeringEvent.h"
+#include "osd/PeeringState.h"
namespace {
seastar::logger& logger() {
heartbeat{new Heartbeat{*this, *monc, hb_front_msgr, hb_back_msgr}},
heartbeat_timer{[this] { update_heartbeat_peers(); }},
store{std::make_unique<ceph::os::CyanStore>(
- local_conf().get_val<std::string>("osd_data"))}
+ local_conf().get_val<std::string>("osd_data"))},
+ shard_services{cluster_msgr, public_msgr, *monc, *mgrc, *store}
{
osdmaps[0] = boost::make_local_shared<OSDMap>();
for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
superblock = std::move(sb);
return get_map(superblock.current_epoch);
}).then([this](cached_map_t&& map) {
+ shard_services.update_map(osdmap);
osdmap = std::move(map);
return load_pgs();
}).then([this] {
return monc->send_message(m);
}
-seastar::future<> OSD::_send_alive(epoch_t want)
+seastar::future<> OSD::_send_alive()
{
+ auto want = osdmap->get_epoch();
+ logger().info(
+ "{} want {} up_thru_wanted {}",
+ __func__,
+ want,
+ up_thru_wanted);
if (!osdmap->exists(whoami)) {
return seastar::now();
} else if (want <= up_thru_wanted){
});
}
-seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
+// Construct a PG instance for pgid using pool metadata from create_map;
+// if the pool has since been deleted, the final pg_pool_t is read back off
+// disk instead.  The pg is neither registered in `pgs` nor has its on-disk
+// state loaded — see load_pg() and handle_pg_create_info() for that.
+seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map, spg_t pgid)
{
  using ec_profile_t = map<string,string>;
-  return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) {
-    return get_map(e);
-  }).then([pgid, this] (auto&& create_map) {
+  // immediately-invoked lambda: lets both branches below return a future
+  return ([&]() {
    if (create_map->have_pg_pool(pgid.pool())) {
      pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
      string name = create_map->get_pool_name(pgid.pool());
      ec_profile_t ec_profile;
      if (pi.is_erasure()) {
-       ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
+	ec_profile = create_map->get_erasure_code_profile(pi.erasure_code_profile);
      }
-      return seastar::make_ready_future<pg_pool_t,
-					string,
-					ec_profile_t>(std::move(pi),
-						      std::move(name),
-						      std::move(ec_profile));
+      return seastar::make_ready_future<pg_pool_t, string, ec_profile_t>(
+	std::move(pi),
+	std::move(name),
+	std::move(ec_profile));
    } else {
      // pool was deleted; grab final pg_pool_t off disk.
      return meta_coll->load_final_pool_info(pgid.pool());
    }
-  }).then([pgid, this](pg_pool_t&& pool,
+  })().then([pgid, this, create_map](pg_pool_t&& pool,
		       string&& name,
		       ec_profile_t&& ec_profile) {
-    auto backend = PGBackend::create(pgid, pool, store.get(), ec_profile);
-    Ref<PG> pg{new PG{pgid,
-		      pg_shard_t{whoami, pgid.shard},
-		      std::move(pool),
-		      std::move(name),
-		      std::move(backend),
-		      osdmap,
-		      cluster_msgr}};
+    return seastar::make_ready_future<Ref<PG>>(Ref<PG>{new PG{pgid,
+		      pg_shard_t{whoami, pgid.shard},
+		      std::move(pool),
+		      std::move(name),
+		      create_map,
+		      shard_services,
+		      ec_profile}});
+  });
+}
+
+// Load an existing pg from disk: look up the epoch it was written at,
+// build the PG from the matching map, then read its persisted state.
+// Any exception during read_state aborts the process (pg load failure is
+// treated as fatal here).
+seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
+{
+  return PGMeta{store.get(), pgid}.get_epoch().then([this](epoch_t e) {
+    return get_map(e);
+  }).then([pgid, this] (auto&& create_map) {
+    return make_pg(std::move(create_map), pgid);
+  }).then([this, pgid](Ref<PG> pg) {
    return pg->read_state(store.get()).then([pg] {
      return seastar::make_ready_future<Ref<PG>>(std::move(pg));
+    }).handle_exception([pgid](auto ep) {
+      logger().info("pg {} saw exception on load {}", pgid, ep);
+      ceph_abort("Could not load pg" == 0);
+      return seastar::make_exception_future<Ref<PG>>(ep);
    });
  });
}
return handle_pg_query(conn, boost::static_pointer_cast<MOSDPGQuery>(m));
case MSG_OSD_PG_LOG:
return handle_pg_log(conn, boost::static_pointer_cast<MOSDPGLog>(m));
+ case MSG_OSD_PG_CREATE2:
+ return handle_pg_create(conn, boost::static_pointer_cast<MOSDPGCreate2>(m));
default:
+ logger().info("{} unhandled message {}", __func__, *m);
return seastar::now();
}
}
}
}
+// Return true iff the connection's peer is a monitor.  Used to reject
+// mon-only messages (e.g. pg creates) arriving from other peers: on
+// mismatch the peer and message are logged, false is returned, and the
+// caller is expected to drop the message.
+bool OSD::require_mon_peer(ceph::net::Connection *conn, Ref<Message> m)
+{
+  if (!conn->peer_is_mon()) {
+    logger().info("{} received from non-mon {}, {}",
+		  __func__,
+		  conn->get_peer_addr(),
+		  *m);
+    return false;
+  }
+  return true;
+}
+
+// Instantiate, initialize, and register a new PG described by `info`
+// (originating either from a mon pg-create or a peer's notify).  Resolves
+// to a null Ref<PG> when the create should be dropped: the pool no longer
+// exists, or a mon-initiated create arrived after the pool's CREATING flag
+// was cleared.
+seastar::future<Ref<PG>> OSD::handle_pg_create_info(
+  std::unique_ptr<PGCreateInfo> info) {
+  return seastar::do_with(
+    std::move(info),
+    [this](auto &info) -> seastar::future<Ref<PG>> {
+      return get_map(info->epoch).then(
+	[&info, this](cached_map_t startmap) ->
+	seastar::future<Ref<PG>, cached_map_t> {
+	  const spg_t &pgid = info->pgid;
+	  if (info->by_mon) {
+	    int64_t pool_id = pgid.pgid.pool();
+	    const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
+	    if (!pool) {
+	      logger().debug(
+		"{} ignoring pgid {}, pool dne",
+		__func__,
+		pgid);
+	      return seastar::make_ready_future<Ref<PG>, cached_map_t>(
+		Ref<PG>(),
+		startmap);
+	    }
+	    ceph_assert(osdmap->require_osd_release >= ceph_release_t::nautilus);
+	    if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
+	      // this ensures we do not process old creating messages after the
+	      // pool's initial pgs have been created (and pg are subsequently
+	      // allowed to split or merge).
+	      logger().debug(
+		"{} dropping {} create, pool does not have CREATING flag set",
+		__func__,
+		pgid);
+	      return seastar::make_ready_future<Ref<PG>, cached_map_t>(
+		Ref<PG>(),
+		startmap);
+	    }
+	  }
+	  // make_pg() receives startmap by value; the move into the capture
+	  // below is sequenced after make_pg() has returned, so this is safe.
+	  return make_pg(startmap, pgid).then(
+	    [this, startmap=std::move(startmap)](auto pg) mutable {
+	      return seastar::make_ready_future<Ref<PG>, cached_map_t>(
+		std::move(pg),
+		std::move(startmap));
+	    });
+	}).then(
+	  [this, &info](auto pg, auto startmap) -> seastar::future<Ref<PG>> {
+	    if (!pg)
+	      return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
+	    PeeringCtx rctx;
+	    const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());
+
+	    int up_primary, acting_primary;
+	    vector<int> up, acting;
+	    startmap->pg_to_up_acting_osds(
+	      info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);
+
+	    // non-replicated (ec) pools: the role is only valid if it
+	    // matches this pg's shard
+	    int role = startmap->calc_pg_role(whoami, acting, acting.size());
+	    if (!pp->is_replicated() && role != info->pgid.shard) {
+	      role = -1;
+	    }
+
+
+	    // stage the new collection and on-disk pg metadata in
+	    // rctx.transaction; nothing hits the store until dispatch below
+	    auto coll = store->create_new_collection(coll_t(info->pgid));
+	    create_pg_collection(
+	      rctx.transaction,
+	      info->pgid,
+	      info->pgid.get_split_bits(pp->get_pg_num()));
+	    init_pg_ondisk(
+	      rctx.transaction,
+	      info->pgid,
+	      pp);
+
+	    pg->init(
+	      coll,
+	      role,
+	      up,
+	      up_primary,
+	      acting,
+	      acting_primary,
+	      info->history,
+	      info->past_intervals,
+	      false,
+	      rctx.transaction);
+
+	    pg->handle_initialize(rctx);
+	    pg->handle_activate_map(rctx);
+
+	    logger().info("{} new pg {}", __func__, *pg);
+	    pgs.emplace(info->pgid, pg);
+	    // flush the staged side effects; also notify the mon if peering
+	    // decided we need an up_thru update
+	    return seastar::when_all_succeed(
+	      pg->get_need_up_thru() ? _send_alive() : seastar::now(),
+	      shard_services.dispatch_context(
+		pg->get_collection_ref(),
+		std::move(rctx)).then(
+		  [pg]() { return seastar::make_ready_future<Ref<PG>>(pg); }));
+	  });
+    });
+}
+
+// Handle MOSDPGCreate2 from a monitor: create the initial pgs of a pool.
+// Each entry in m->pgs is turned into a NullEvt peering event carrying a
+// PGCreateInfo (by_mon=true) and fed through handle_batch_pg_message,
+// which instantiates missing pgs via handle_pg_create_info.
+seastar::future<> OSD::handle_pg_create(
+  ceph::net::Connection* conn,
+  Ref<MOSDPGCreate2> m)
+{
+  logger().info("{}: {} from {}", __func__, *m, m->get_source());
+  // pg creates may only come from a monitor
+  if (!require_mon_peer(conn, m)) {
+    return seastar::now();
+  }
+  return handle_batch_pg_message(
+    m->pgs,
+    [this, conn, m](auto p)
+    -> std::optional<std::tuple<spg_t, std::unique_ptr<PGPeeringEvent>>> {
+      const spg_t &pgid = p.first;
+      const auto &[created, created_stamp] = p.second;
+
+      // pg_extra carries (history, past_intervals) for each pgid
+      auto q = m->pg_extra.find(pgid);
+      ceph_assert(q != m->pg_extra.end());
+      logger().debug(
+	"{} {} e{} @{} history {} pi {}",
+	__func__,
+	pgid,
+	created,
+	created_stamp,
+	q->second.first,
+	q->second.second);
+      if (!q->second.second.empty() &&
+	  m->epoch < q->second.second.get_bounds().second) {
+	// four arguments need four placeholders: the original format string
+	// omitted the past_intervals slot, so the history argument was
+	// silently dropped and past_intervals printed in its place
+	logger().error(
+	  "got pg_create on {} epoch {} unmatched past_intervals {} (history {})",
+	  pgid,
+	  m->epoch,
+	  q->second.second,
+	  q->second.first);
+	return std::nullopt;
+      } else {
+	return std::make_optional(
+	  std::make_tuple(
+	    pgid,
+	    std::make_unique<PGPeeringEvent>(
+	      m->epoch,
+	      m->epoch,
+	      NullEvt(),
+	      true,
+	      new PGCreateInfo(
+		pgid,
+		m->epoch,
+		q->second.first,
+		q->second.second,
+		true))));
+      }
+    });
+}
+
seastar::future<> OSD::handle_osd_map(ceph::net::Connection* conn,
Ref<MOSDMap> m)
{
[this](epoch_t cur) {
return get_map(cur).then([this](cached_map_t&& o) {
osdmap = std::move(o);
+ shard_services.update_map(osdmap);
if (up_epoch != 0 &&
osdmap->is_up(whoami) &&
osdmap->get_addrs(whoami) == public_msgr.get_myaddrs()) {
heartbeat->update_peers(whoami);
}
-seastar::future<> OSD::handle_pg_notify(ceph::net::Connection* conn,
- Ref<MOSDPGNotify> m)
+seastar::future<> OSD::handle_pg_notify(
+ ceph::net::Connection* conn,
+ Ref<MOSDPGNotify> m)
{
// assuming all pgs reside in a single shard
// see OSD::dequeue_peering_evt()
const int from = m->get_source().num();
- return seastar::parallel_for_each(m->get_pg_list(),
+ return handle_batch_pg_message(
+ m->get_pg_list(),
[from, this](pair<pg_notify_t, PastIntervals> p) {
auto& [pg_notify, past_intervals] = p;
spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
pg_notify,
0, // the features is not used
past_intervals};
- auto create_info = new PGCreateInfo{pgid,
- pg_notify.query_epoch,
- pg_notify.info.history,
- past_intervals,
- false};
- auto evt = std::make_unique<PGPeeringEvent>(pg_notify.epoch_sent,
- pg_notify.query_epoch,
- notify,
- true, // requires_pg
- create_info);
- return do_peering_event(pgid, std::move(evt));
+ logger().debug("handle_pg_notify on {} from {}", pgid.pgid, from);
+ auto create_info = new PGCreateInfo{
+ pgid,
+ pg_notify.query_epoch,
+ pg_notify.info.history,
+ past_intervals,
+ false};
+ return std::make_optional(
+ std::make_tuple(
+ pgid,
+ std::make_unique<PGPeeringEvent>(
+ pg_notify.epoch_sent,
+ pg_notify.query_epoch,
+ notify,
+ true, // requires_pg
+ create_info)));
});
}
-seastar::future<> OSD::handle_pg_info(ceph::net::Connection* conn,
- Ref<MOSDPGInfo> m)
+seastar::future<> OSD::handle_pg_info(
+ ceph::net::Connection* conn,
+ Ref<MOSDPGInfo> m)
{
// assuming all pgs reside in a single shard
// see OSD::dequeue_peering_evt()
const int from = m->get_source().num();
- return seastar::parallel_for_each(m->pg_list,
+ return handle_batch_pg_message(
+ m->pg_list,
[from, this](pair<pg_notify_t, PastIntervals> p) {
auto& pg_notify = p.first;
spg_t pgid{pg_notify.info.pgid.pgid, pg_notify.to};
+ logger().debug("handle_pg_info on {} from {}", pgid.pgid, from);
MInfoRec info{pg_shard_t{from, pg_notify.from},
pg_notify.info,
pg_notify.epoch_sent};
- auto evt = std::make_unique<PGPeeringEvent>(pg_notify.epoch_sent,
- pg_notify.query_epoch,
- std::move(info));
- return do_peering_event(pgid, std::move(evt));
- });
+ return std::make_optional(
+ std::tuple(
+ pgid,
+ std::make_unique<PGPeeringEvent>(
+ pg_notify.epoch_sent,
+ pg_notify.query_epoch,
+ std::move(info))));
+ });
}
seastar::future<> OSD::handle_pg_query(ceph::net::Connection* conn,
Ref<MOSDPGQuery> m)
{
+ // assuming all pgs reside in a single shard
+ // see OSD::dequeue_peering_evt()
const int from = m->get_source().num();
- return seastar::parallel_for_each(m->pg_list,
+ // TODO: handle missing pg -- handle_batch_pg_message ignores pgs
+ // that don't exist
+ return handle_batch_pg_message_with_missing_handler(
+ m->pg_list,
[from, this](pair<spg_t, pg_query_t> p) {
auto& [pgid, pg_query] = p;
MQuery query{pgid, pg_shard_t{from, pg_query.from},
- pg_query, pg_query.epoch_sent};
- auto evt = std::make_unique<PGPeeringEvent>(pg_query.epoch_sent,
- pg_query.epoch_sent,
- std::move(query));
- return do_peering_event(pgid, std::move(evt));
- });
+ pg_query, pg_query.epoch_sent};
+ logger().debug("handle_pg_query on {} from {}", pgid, from);
+ return std::make_optional(
+ std::make_tuple(
+ pgid,
+ std::make_unique<PGPeeringEvent>(
+ pg_query.epoch_sent,
+ pg_query.epoch_sent,
+ std::move(query))));
+ },
+ [this, from](pair<spg_t, pg_query_t> p, PeeringCtx &ctx) {
+ auto &[pgid, query] = p;
+ logger().debug("handle_pg_query on absent pg {} from {}", pgid, from);
+ pg_info_t empty(spg_t(pgid.pgid, query.to));
+ ceph_assert(query.type == pg_query_t::INFO);
+ ctx.notify_list[from].emplace_back(
+ pg_notify_t(
+ query.from, query.to,
+ query.epoch_sent,
+ osdmap->get_epoch(),
+ empty),
+ PastIntervals());
+ });
}
-seastar::future<> OSD::handle_pg_log(ceph::net::Connection* conn,
- Ref<MOSDPGLog> m)
+seastar::future<> OSD::handle_pg_log(
+ ceph::net::Connection* conn,
+ Ref<MOSDPGLog> m)
{
const int from = m->get_source().num();
- MLogRec log{pg_shard_t{from, m->from}, m.get()};
- auto create_info = new PGCreateInfo{m->get_spg(),
- m->get_query_epoch(),
- m->info.history,
- m->past_intervals,
- false};
- auto evt = std::make_unique<PGPeeringEvent>(m->get_epoch(),
- m->get_query_epoch(),
- std::move(log),
- true,
- create_info);
- return do_peering_event(m->get_spg(), std::move(evt));
+ logger().debug("handle_pg_log on {} from {}", m->get_spg(), from);
+ return do_peering_event_and_dispatch(
+ m->get_spg(),
+ PGPeeringEventURef(m->get_event()));
}
void OSD::check_osdmap_features()
return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
return advance_pg_to(pg.second, epoch);
}).then([epoch, this] {
- auto first = waiting_peering.lower_bound(epoch);
- auto last = waiting_peering.end();
+ auto first = waiting_peering.begin();
+ auto last = waiting_peering.upper_bound(epoch);
std::for_each(first, last, [epoch, this](auto& blocked_requests) {
blocked_requests.second.set_value(epoch);
});
});
}
+
+// Look up pgid, creating it from `info` when absent.  Waits until the
+// local osdmap reaches `epoch` and advances the pg to that epoch before
+// handing it back.  Resolves to a null Ref<PG> when the pg does not exist
+// and no create info was supplied.  Concurrent requests to create the
+// same pgid share a single in-flight future via pgs_creating.
+seastar::future<Ref<PG>>
+OSD::get_pg(
+  spg_t pgid,
+  epoch_t epoch,
+  std::unique_ptr<PGCreateInfo> info)
+{
+  return wait_for_map(epoch).then([this, pgid, epoch, info=std::move(info)](epoch_t) mutable {
+    if (auto pg = pgs.find(pgid); pg != pgs.end()) {
+      return advance_pg_to(pg->second, epoch).then([pg=pg->second]() {
+	return seastar::make_ready_future<Ref<PG>>(pg);
+      });
+    } else if (!info) {
+      return seastar::make_ready_future<Ref<PG>>();
+    } else {
+      auto creating = pgs_creating.find(pgid);
+      if (creating == pgs_creating.end()) {
+	// NOTE(review): the pgs_creating entry is only erased in the success
+	// continuation; a failed handle_pg_create_info would leave a stale
+	// failed shared_future here — confirm creation failure is fatal.
+	creating = pgs_creating.emplace(
+	  pgid,
+	  seastar::shared_future<Ref<PG>>(handle_pg_create_info(std::move(info)).then([this, pgid](auto pg) {
+	    pgs_creating.erase(pgid);
+	    return seastar::make_ready_future<Ref<PG>>(pg);
+	  }))).first;
+      }
+      return creating->second.get_future().then([this, epoch](auto pg) {
+	return advance_pg_to(pg, epoch).then([pg]() {
+	  return seastar::make_ready_future<Ref<PG>>(pg);
+	});
+      });
+    }
+  });
+}
+
+// Deliver a peering event to the pg identified by pgid, first creating the
+// pg if evt carries create_info.  The event's side effects accumulate in
+// the caller-owned rctx; resolves to the pg, or null when the pg does not
+// exist and no create info was attached (the event is then dropped).
+seastar::future<Ref<PG>>
+OSD::do_peering_event(
+  spg_t pgid,
+  PGPeeringEventURef evt,
+  PeeringCtx &rctx)
+{
+  // evt->create_info is moved out as a get_pg() argument; the init-capture
+  // of evt below is evaluated only after get_pg() has returned, so evt is
+  // still valid when captured.
+  return get_pg(pgid, evt->get_epoch_sent(), std::move(evt->create_info))
+    .then([this, evt=std::move(evt), &rctx](Ref<PG> pg) mutable {
+      if (pg) {
+	pg->do_peering_event(std::move(evt), rctx);
+      }
+      return seastar::make_ready_future<Ref<PG>>(pg);
+    });
+}
+
+// Like do_peering_event(), but additionally submits rctx's transaction for
+// the pg's collection (dispatch_context_transaction) and sends an alive
+// message when the pg now needs an up_thru update.  Resolves to true iff
+// the pg was found (or created); false means the event was dropped and the
+// caller may want to run its missing-pg handling.
+seastar::future<bool>
+OSD::do_peering_event_and_dispatch_transaction(
+  spg_t pgid,
+  std::unique_ptr<PGPeeringEvent> evt,
+  PeeringCtx &rctx)
+{
+  return do_peering_event(pgid, std::move(evt), rctx).then(
+    [this, pgid, &rctx](Ref<PG> pg) mutable {
+      if (pg) {
+	return seastar::when_all_succeed(
+	  pg->get_need_up_thru() ? _send_alive() : seastar::now(),
+	  shard_services.dispatch_context_transaction(
+	    pg->get_collection_ref(), rctx)).then([] { return true; });
+      } else {
+	return seastar::make_ready_future<bool>(false);
+      }
+    });
+}
+
seastar::future<>
-OSD::do_peering_event(spg_t pgid,
-                      std::unique_ptr<PGPeeringEvent> evt)
-{
-  if (auto pg = pgs.find(pgid); pg != pgs.end()) {
-    return wait_for_map(evt->get_epoch_sent()).then(
-      [pg=pg->second, this](epoch_t epoch) {
-      return advance_pg_to(pg, epoch);
-    }).then([pg, evt=std::move(evt)]() mutable {
-      return pg->second->do_peering_event(std::move(evt));
-    }).then([pg=pg->second, this] {
-      return _send_alive(pg->get_need_up_thru());
+OSD::do_peering_event_and_dispatch(
+  spg_t pgid,
+  std::unique_ptr<PGPeeringEvent> evt)
+{
+  // run the event against a locally-scoped PeeringCtx and, on success,
+  // dispatch all accumulated side effects (transaction + messages);
+  // exceptions are logged and re-raised
+  return seastar::do_with(
+    PeeringCtx{},
+    [this, pgid, evt=std::move(evt)](auto &rctx) mutable {
+      return do_peering_event(pgid, std::move(evt), rctx).then(
+	[this, pgid, &rctx](Ref<PG> pg) mutable {
+	  if (pg) {
+	    return seastar::when_all_succeed(
+	      pg->get_need_up_thru() ? _send_alive() : seastar::now(),
+	      shard_services.dispatch_context(
+		pg->get_collection_ref(), std::move(rctx)));
+	  } else {
+	    return seastar::now();
+	  }
+	});
+    }).handle_exception([](auto ep) {
+      logger().error("do_peering_event_and_dispatch saw {}", ep);
+      return seastar::make_exception_future<>(ep);
    });
-  } else {
-    logger().warn("pg not found: {}", pgid);
-    // todo: handle_pg_query_nopg()
-    return seastar::now();
-  }
}
seastar::future<epoch_t> OSD::wait_for_map(epoch_t epoch)
return seastar::make_ready_future<epoch_t>(mine);
} else {
logger().info("evt epoch is {}, i have {}, will wait", epoch, mine);
- return waiting_peering[epoch].get_shared_future();
+ auto fut = waiting_peering[epoch].get_shared_future();
+ return osdmap_subscribe(osdmap->get_epoch(), true).then(
+ [fut=std::move(fut)]() mutable {
+ return std::move(fut);
+ });
}
}
{
auto from = pg->get_osdmap_epoch();
// todo: merge/split support
- return seastar::do_for_each(boost::make_counting_iterator(from + 1),
- boost::make_counting_iterator(to + 1),
- [pg, this](epoch_t next_epoch) {
- return get_map(next_epoch).then([pg, this] (cached_map_t&& next_map) {
- return pg->handle_advance_map(next_map);
- }).then([pg, this] {
- return pg->handle_activate_map();
- });
+ return seastar::do_with(
+ PeeringCtx{},
+ [this, pg, from, to](auto &rctx) {
+ return seastar::do_for_each(
+ boost::make_counting_iterator(from + 1),
+ boost::make_counting_iterator(to + 1),
+ [this, pg, &rctx](epoch_t next_epoch) {
+ return get_map(next_epoch).then(
+ [pg, this, &rctx] (cached_map_t&& next_map) {
+ return pg->handle_advance_map(next_map, rctx);
+ });
+ }).then([this, &rctx, pg] {
+ pg->handle_activate_map(rctx);
+ return seastar::when_all_succeed(
+ pg->get_need_up_thru() ? _send_alive() : seastar::now(),
+ shard_services.dispatch_context(
+ pg->get_collection_ref(),
+ std::move(rctx)));
+ });
});
}
#pragma once
#include <map>
+#include <tuple>
+#include <optional>
#include <seastar/core/future.hh>
+#include <seastar/core/shared_future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/shared_future.hh>
#include "crimson/osd/chained_dispatchers.h"
#include "crimson/osd/osdmap_service.h"
#include "crimson/osd/state.h"
+#include "crimson/osd/shard_services.h"
+#include "osd/PeeringState.h"
#include "osd/osd_types.h"
+#include "osd/osd_perf_counters.h"
+#include "osd/PGPeeringEvent.h"
class MOSDMap;
class MOSDOp;
class OSDMap;
class OSDMeta;
class PG;
-class PGPeeringEvent;
class Heartbeat;
namespace ceph::mon {
std::unique_ptr<ceph::os::CyanStore> store;
std::unique_ptr<OSDMeta> meta_coll;
- std::unordered_map<spg_t, Ref<PG>> pgs;
OSDState state;
/// _first_ epoch we were marked up (after this process started)
uint64_t global_id,
const AuthCapsInfo& caps) final;
+ ceph::osd::ShardServices shard_services;
+ std::unordered_map<spg_t, Ref<PG>> pgs;
+
public:
OSD(int id, uint32_t nonce,
ceph::net::Messenger& cluster_msgr,
seastar::future<> _preboot(version_t oldest_osdmap, version_t newest_osdmap);
seastar::future<> _send_boot();
+ seastar::future<Ref<PG>> make_pg(cached_map_t create_map, spg_t pgid);
seastar::future<Ref<PG>> load_pg(spg_t pgid);
seastar::future<> load_pgs();
epoch_t up_thru_wanted = 0;
- seastar::future<> _send_alive(epoch_t want);
+ seastar::future<> _send_alive();
// OSDMapService methods
seastar::future<cached_map_t> get_map(epoch_t e) override;
void write_superblock(ceph::os::Transaction& t);
seastar::future<> read_superblock();
+ bool require_mon_peer(ceph::net::Connection *conn, Ref<Message> m);
+
+ seastar::future<Ref<PG>> handle_pg_create_info(
+ std::unique_ptr<PGCreateInfo> info);
+
+  // Apply `f` to each element of container `c`; `f` returns an optional
+  // (pgid, peering event) which, when present, is delivered via
+  // do_peering_event_and_dispatch_transaction sharing one PeeringCtx for
+  // the whole batch.  `on_missing_pg(elem, rctx)` is invoked for elements
+  // whose pg was not found (and not created); the shared context is
+  // dispatched once after all elements complete.
+  // NOTE(review): `c` is captured by reference across the do_with chain —
+  // the caller must keep the container (e.g. the message holding it) alive
+  // until the returned future resolves.
+  template <typename C, typename F, typename G>
+  seastar::future<> handle_batch_pg_message_with_missing_handler(
+    const C &c,
+    F &&f,
+    G &&on_missing_pg) {
+    using mapped_type = const typename C::value_type &;
+    using event_type = std::optional<std::tuple<
+      spg_t,
+      std::unique_ptr<PGPeeringEvent>>>;
+    return seastar::do_with(
+      PeeringCtx{},
+      std::move(f),
+      std::move(on_missing_pg),
+      [this, &c] (auto &rctx, auto &f, auto &on_missing_pg) {
+	return seastar::parallel_for_each(
+	  c,
+	  [this, &rctx, &f, &on_missing_pg](mapped_type m) {
+	    event_type result = f(m);
+	    if (result) {
+	      auto [pgid, event] = std::move(*result);
+	      return do_peering_event_and_dispatch_transaction(
+		pgid,
+		std::move(event),
+		rctx).then([m, &on_missing_pg, &rctx] (bool found) {
+		  if (!found) {
+		    on_missing_pg(m, rctx);
+		  }
+		  return seastar::now();
+		});
+	    } else {
+	      return seastar::now();
+	    }
+	  }).then([this, &rctx] {
+	    return shard_services.dispatch_context(std::move(rctx));
+	  });
+      });
+  }
+
+  // Convenience wrapper around handle_batch_pg_message_with_missing_handler
+  // with a no-op missing-pg handler (events for absent pgs are dropped).
+  template <typename C, typename F>
+  seastar::future<> handle_batch_pg_message(
+    const C &c,
+    F &&f) {
+    return handle_batch_pg_message_with_missing_handler(
+      c,
+      std::move(f),
+      [](const typename C::value_type &, PeeringCtx &){});
+  }
+
+ seastar::future<> handle_pg_create(ceph::net::Connection *conn,
+ Ref<MOSDPGCreate2> m);
seastar::future<> handle_osd_map(ceph::net::Connection* conn,
Ref<MOSDMap> m);
seastar::future<> handle_osd_op(ceph::net::Connection* conn,
// wait for an osdmap whose epoch is greater or equal to given epoch
seastar::future<epoch_t> wait_for_map(epoch_t epoch);
seastar::future<> consume_map(epoch_t epoch);
- seastar::future<> do_peering_event(spg_t pgid,
- std::unique_ptr<PGPeeringEvent> evt);
- seastar::future<> advance_pg_to(Ref<PG> pg, epoch_t to);
+ std::map<spg_t, seastar::shared_future<Ref<PG>>> pgs_creating;
+ seastar::future<Ref<PG>> get_pg(
+ spg_t pgid,
+ epoch_t epoch,
+ std::unique_ptr<PGCreateInfo> info);
+
+ seastar::future<Ref<PG>> do_peering_event(
+ spg_t pgid,
+ std::unique_ptr<PGPeeringEvent> evt,
+ PeeringCtx &rctx);
+ seastar::future<> do_peering_event_and_dispatch(
+ spg_t pgid,
+ std::unique_ptr<PGPeeringEvent> evt);
+ seastar::future<bool> do_peering_event_and_dispatch_transaction(
+ spg_t pgid,
+ std::unique_ptr<PGPeeringEvent> evt,
+ PeeringCtx &rctx);
+
+ seastar::future<> advance_pg_to(Ref<PG> pg, epoch_t to);
bool should_restart() const;
seastar::future<> restart();
seastar::future<> shutdown();
class OSDMapService {
public:
- using cached_map_t = boost::local_shared_ptr<OSDMap>;
+ using cached_map_t = boost::local_shared_ptr<const OSDMap>;
virtual ~OSDMapService() = default;
virtual seastar::future<cached_map_t> get_map(epoch_t e) = 0;
/// get the latest map
#include "crimson/osd/pg_meta.h"
#include "pg_backend.h"
-#include "recovery_events.h"
-#include "recovery_state.h"
namespace {
seastar::logger& logger() {
}
using ceph::common::local_conf;
-using recovery::AdvMap;
-using recovery::ActMap;
-using recovery::Initialize;
-PG::PG(spg_t pgid,
- pg_shard_t pg_shard,
- pg_pool_t&& pool,
- std::string&& name,
- std::unique_ptr<PGBackend> backend,
- cached_map_t osdmap,
- ceph::net::Messenger& msgr)
+// Backend predicate handed to PeeringState: an object is considered
+// recoverable as long as at least one shard holds a copy.
+class RecoverablePredicate : public IsPGRecoverablePredicate {
+public:
+  bool operator()(const set<pg_shard_t> &have) const override {
+    return !have.empty();
+  }
+};
+
+// Backend predicate handed to PeeringState: an object is readable on this
+// osd only if this osd's own shard is among the holders.
+class ReadablePredicate: public IsPGReadablePredicate {
+  pg_shard_t whoami;
+public:
+  explicit ReadablePredicate(pg_shard_t whoami) : whoami(whoami) {}
+  bool operator()(const set<pg_shard_t> &have) const override {
+    return have.count(whoami);
+  }
+};
+
+PG::PG(
+ spg_t pgid,
+ pg_shard_t pg_shard,
+ pg_pool_t&& pool,
+ std::string&& name,
+ cached_map_t osdmap,
+ ceph::osd::ShardServices &shard_services,
+ ec_profile_t profile)
: pgid{pgid},
- whoami{pg_shard},
- pool{std::move(pool)},
- recovery_state{*this},
- info{pgid},
- backend{std::move(backend)},
+ pg_whoami{pg_shard},
+ coll_ref(shard_services.get_store().open_collection(coll)),
+ pgmeta_oid{pgid.make_pgmeta_oid()},
+ shard_services{shard_services},
osdmap{osdmap},
- msgr{msgr}
-{
- // TODO
-}
-
-seastar::future<> PG::read_state(ceph::os::CyanStore* store)
-{
- return PGMeta{store, pgid}.load().then(
- [this](pg_info_t pg_info_, PastIntervals past_intervals_) {
- info = std::move(pg_info_);
- last_written_info = info;
- past_intervals = std::move(past_intervals_);
- // initialize current mapping
- {
- vector<int> new_up, new_acting;
- int new_up_primary, new_acting_primary;
- osdmap->pg_to_up_acting_osds(pgid.pgid,
- &new_up, &new_up_primary,
- &new_acting, &new_acting_primary);
- update_primary_state(new_up, new_up_primary,
- new_acting, new_acting_primary);
- }
- info.stats.up = up;
- info.stats.up_primary = up_primary.osd;
- info.stats.acting = acting;
- info.stats.acting_primary = primary.osd;
- info.stats.mapping_epoch = info.history.same_interval_since;
- recovery_state.handle_event(Initialize{});
- // note: we don't activate here because we know the OSD will advance maps
- // during boot.
- return seastar::now();
- });
-}
-
-void
-PG::update_primary_state(const std::vector<int>& new_up,
- int new_up_primary,
- const std::vector<int>& new_acting,
- int new_acting_primary)
-{
- auto collect_pg_shards =
- [is_erasure=pool.is_erasure()](const std::vector<int>& osds,
- int osd_primary) {
- int8_t index = 0;
- pg_shard_set_t collected;
- pg_shard_t pg_primary;
- for (auto osd : osds) {
- if (osd != CRUSH_ITEM_NONE) {
- pg_shard_t pg_shard{
- osd, is_erasure ? shard_id_t{index} : shard_id_t::NO_SHARD};
- if (osd == osd_primary) {
- pg_primary = pg_shard;
- }
- collected.insert(pg_shard);
- }
- index++;
- }
- return std::make_pair(collected, pg_primary);
- };
- acting = new_acting;
- std::tie(actingset, primary) = collect_pg_shards(acting, new_acting_primary);
- ceph_assert(primary.osd == new_acting_primary);
- up = new_up;
- std::tie(upset, up_primary) = collect_pg_shards(up, new_up_primary);
- ceph_assert(up_primary.osd == new_up_primary);
-}
-
-epoch_t PG::get_osdmap_epoch() const
-{
- return osdmap->get_epoch();
-}
-
-pg_shard_t PG::get_whoami() const
-{
- return whoami;
-}
-
-const pg_info_t& PG::get_info() const
-{
- return info;
-}
-
-const pg_stat_t& PG::get_stats() const
-{
- return info.stats;
-}
-
-void PG::clear_state(uint64_t mask)
-{
- if (!test_state(mask))
- return;
- info.stats.state &= ~mask;
- const auto now = utime_t{coarse_real_clock::now()};
- info.stats.last_change = now;
- if (mask & PG_STATE_ACTIVE) {
- info.stats.last_active = now;
- }
-}
-
-bool PG::test_state(uint64_t mask) const
-{
- return info.stats.state & mask;
-}
-
-void PG::set_state(uint64_t mask)
-{
- if (test_state(mask)) {
- return;
- }
- info.stats.state |= mask;
- const auto now = utime_t{coarse_real_clock::now()};
- info.stats.last_change = now;
- if (mask & PG_STATE_ACTIVE) {
- info.stats.last_became_active = now;
- if (active_promise) {
- std::move(active_promise)->set_value();
- active_promise.reset();
- }
- }
- if (mask & (PG_STATE_ACTIVE | PG_STATE_PEERED) &&
- test_state(PG_STATE_ACTIVE | PG_STATE_PEERED)) {
- info.stats.last_became_peered = now;
- }
- if (mask & PG_STATE_CLEAN) {
- info.stats.last_epoch_clean = get_osdmap_epoch();
- }
-}
-
-const PastIntervals& PG::get_past_intervals() const
-{
- return past_intervals;
-}
-
-pg_shard_t PG::get_primary() const
-{
- return primary;
-}
-
-bool PG::is_primary() const
-{
- return whoami == primary;
-}
-
-
-namespace {
- bool has_shard(bool ec, const vector<int>& osds, pg_shard_t pg_shard)
- {
- if (ec) {
- return (osds.size() > static_cast<unsigned>(pg_shard.shard) &&
- osds[pg_shard.shard] == pg_shard.osd);
- } else {
- return std::find(osds.begin(), osds.end(), pg_shard.osd) != osds.end();
- }
- }
-}
-
-bool PG::is_acting(pg_shard_t pg_shard) const
-{
- return has_shard(pool.is_erasure(), acting, pg_shard);
-}
-
-bool PG::is_up(pg_shard_t pg_shard) const
-{
- return has_shard(pool.is_erasure(), up, pg_shard);
-}
-
-epoch_t PG::get_last_peering_reset() const
-{
- return last_peering_reset;
-}
-
-void PG::update_last_peering_reset()
-{
- last_peering_reset = get_osdmap_epoch();
-}
-
-epoch_t PG::get_need_up_thru() const
-{
- return need_up_thru;
-}
-
-void PG::update_need_up_thru(const OSDMap* o)
-{
- if (!o) {
- o = osdmap.get();
- }
- if (auto up_thru = o->get_up_thru(whoami.osd);
- up_thru < info.history.same_interval_since) {
- logger().info("up_thru {} < same_since {}, must notify monitor",
- up_thru, info.history.same_interval_since);
- need_up_thru = info.history.same_interval_since;
- } else {
- logger().info("up_thru {} >= same_since {}, all is well",
- up_thru, info.history.same_interval_since);
- need_up_thru = 0;
- }
-}
-
-std::vector<int>
-PG::calc_acting(pg_shard_t auth_shard,
- const vector<int>& acting,
- const map<pg_shard_t, pg_info_t>& all_info) const
-{
- // select primary
- auto auth_log_shard = all_info.find(auth_shard);
- auto primary = all_info.find(up_primary);
- if (up.empty() ||
- primary->second.is_incomplete() ||
- primary->second.last_update < auth_log_shard->second.log_tail) {
- ceph_assert(!auth_log_shard->second.is_incomplete());
- logger().info("up[0] needs backfill, osd.{} selected as primary instead",
- auth_shard);
- primary = auth_log_shard;
- }
- auto& [primary_shard_id, primary_info] = *primary;
- logger().info("primary is osd.{} with {}",
- primary_shard_id.osd, primary_info);
-
- vector<int> want{primary_shard_id.osd};
- // We include auth_log_shard->second.log_tail because in GetLog,
- // we will request logs back to the min last_update over our
- // acting_backfill set, which will result in our log being extended
- // as far backwards as necessary to pick up any peers which can
- // be log recovered by auth_log_shard's log
- auto oldest_auth_log_entry =
- std::min(primary_info.log_tail, auth_log_shard->second.log_tail);
- // select replicas that have log contiguity with primary.
- // prefer up, then acting, then any peer_info osds
- auto get_shard = [](int osd) {
- return pg_shard_t{osd, shard_id_t::NO_SHARD}; };
- auto get_info = [&](int osd) -> const pg_info_t& {
- return all_info.find(get_shard(osd))->second; };
- auto is_good = [&, oldest_auth_log_entry](int osd) {
- auto& info = get_info(osd);
- return (!info.is_incomplete() &&
- info.last_update >= oldest_auth_log_entry);
- };
- auto is_enough = [size=pool.get_size(), &want](int) {
- return want.size() >= size;
- };
- std::vector<std::reference_wrapper<const vector<int>>> covered;
- auto has_covered = [primary=primary_shard_id.osd, &covered](int osd) {
- if (osd == primary)
- return true;
- for (auto& c : covered) {
- if (std::find(c.get().begin(), c.get().end(), osd) != c.get().end()) {
- return true;
- }
- }
- return false;
- };
- boost::copy((up |
- boost::adaptors::filtered(std::not_fn(is_enough)) |
- boost::adaptors::filtered(std::not_fn(has_covered)) |
- boost::adaptors::filtered(is_good)),
- std::back_inserter(want));
- if (is_enough(0))
- return want;
- covered.push_back(std::cref(up));
- // let's select from acting. the later "last_update" is, the better
- // sort by last_update, in descending order.
- using cands_sorted_by_eversion_t = std::map<eversion_t,
- pg_shard_t,
- std::greater<eversion_t>>;
- auto shard_to_osd = [](const pg_shard_t& shard) { return shard.osd; };
- {
- // This no longer has backfill OSDs, as they are covered above.
- auto cands = boost::accumulate(
- (acting |
- boost::adaptors::filtered(std::not_fn(is_enough)) |
- boost::adaptors::filtered(std::not_fn(has_covered)) |
- boost::adaptors::filtered(is_good)),
- cands_sorted_by_eversion_t{},
- [&](cands_sorted_by_eversion_t& cands, int osd) {
- cands.emplace(get_info(osd).last_update, get_shard(osd));
- return std::move(cands);
+ backend(
+ PGBackend::create(
+ pgid,
+ pool,
+ coll_ref,
+ &shard_services.get_store(),
+ profile)),
+ peering_state(
+ shard_services.get_cct(),
+ pg_shard,
+ pgid,
+ PGPool(
+ shard_services.get_cct(),
+ osdmap,
+ pgid.pool(),
+ pool,
+ osdmap->get_pool_name(pgid.pool())),
+ osdmap,
+ this,
+ this)
+{
+ peering_state.set_backend_predicates(
+ new ReadablePredicate(pg_whoami),
+ new RecoverablePredicate());
+}
+
+bool PG::try_flush_or_schedule_async() {
+// FIXME once there's a good way to schedule an "async" peering event
+#if 0
+ shard_services.get_store().do_transaction(
+ coll_ref,
+ ObjectStore::Transaction()).then(
+ [this, epoch=peering_state.get_osdmap()->get_epoch()](){
+ if (!peering_state.pg_has_reset_since(epoch)) {
+ PeeringCtx rctx;
+ auto evt = PeeringState::IntervalFlush();
+ do_peering_event(evt, rctx);
+ return shard_services.dispatch_context(std::move(rctx));
+ } else {
+ return seastar::now();
+ }
});
- boost::copy(cands |
- boost::adaptors::map_values |
- boost::adaptors::transformed(shard_to_osd),
- std::back_inserter(want));
- if (is_enough(0)) {
- return want;
- }
- covered.push_back(std::cref(acting));
- }
- // continue to search stray for more suitable peers
- {
- auto pi_to_osd = [](const peer_info_t::value_type& pi) {
- return pi.first.osd; };
- auto cands = boost::accumulate(
- (all_info |
- boost::adaptors::transformed(pi_to_osd) |
- boost::adaptors::filtered(std::not_fn(is_enough)) |
- boost::adaptors::filtered(std::not_fn(has_covered)) |
- boost::adaptors::filtered(is_good)),
- cands_sorted_by_eversion_t{},
- [&](cands_sorted_by_eversion_t& cands, int osd) {
- cands.emplace(get_info(osd).last_update, get_shard(osd));
- return cands;
- });
- boost::copy(cands |
- boost::adaptors::map_values |
- boost::adaptors::transformed(shard_to_osd),
- std::back_inserter(want));
- }
- return want;
-}
-
-bool PG::proc_replica_info(pg_shard_t from,
- const pg_info_t& pg_info,
- epoch_t send_epoch)
-{
-
- if (auto found = peer_info.find(from);
- found != peer_info.end() &&
- found->second.last_update == pg_info.last_update) {
- logger().info("got info {} from osd.{}, identical to ours",
- info, from);
- return false;
- } else if (!osdmap->has_been_up_since(from.osd, send_epoch)) {
- logger().info("got info {} from down osd.{}. discarding",
- info, from);
- return false;
- } else {
- logger().info("got info {} from osd.{}", info, from);
- peer_info.emplace(from, pg_info);
- return true;
- }
-}
-
-void PG::proc_replica_log(pg_shard_t from,
- const pg_info_t& pg_info,
- const pg_log_t& pg_log,
- const pg_missing_t& pg_missing)
-{
-
- logger().info("{} for osd.{}: {} {} {}", from, pg_info, pg_log, pg_missing);
- peer_info.insert_or_assign(from, pg_info);
-}
-
-// Returns an iterator to the best info in infos sorted by:
-// 1) Prefer newer last_update
-// 2) Prefer longer tail if it brings another info into contiguity
-// 3) Prefer current primary
-pg_shard_t
-PG::find_best_info(const PG::peer_info_t& infos) const
-{
- // See doc/dev/osd_internals/last_epoch_started.rst before attempting
- // to make changes to this process. Also, make sure to update it
- // when you find bugs!
- auto min_last_update_acceptable = eversion_t::max();
- epoch_t max_les = 0;
- for ([[maybe_unused]] auto& [shard, info] : infos) {
- if (max_les < info.history.last_epoch_started) {
- max_les = info.history.last_epoch_started;
- }
- if (!info.is_incomplete() &&
- max_les < info.last_epoch_started) {
- max_les = info.last_epoch_started;
- }
- }
- for ([[maybe_unused]] auto& [shard, info] : infos) {
- if (max_les <= info.last_epoch_started &&
- min_last_update_acceptable > info.last_update) {
- min_last_update_acceptable = info.last_update;
- }
- }
- if (min_last_update_acceptable == eversion_t::max()) {
- return pg_shard_t{};
- }
- // find osd with newest last_update (oldest for ec_pool).
- // if there are multiples, prefer
- // - a longer tail, if it brings another peer into log contiguity
- // - the current primary
- struct is_good {
- // boost::max_element() copies the filter function, so make it copyable
- eversion_t min_last_update_acceptable;
- epoch_t max_les;
- const PG* thiz;
- is_good(eversion_t min_lua, epoch_t max_les, const PG* thiz)
- : min_last_update_acceptable{min_lua}, max_les{max_les}, thiz{thiz} {}
- is_good(const is_good& rhs) = default;
- is_good& operator=(const is_good& rhs) = default;
- bool operator()(const PG::peer_info_t::value_type& pi) const {
- auto& [shard, info] = pi;
- if (!thiz->is_up(shard) && !thiz->is_acting(shard)) {
- return false;
- // Only consider peers with last_update >= min_last_update_acceptable
- } else if (info.last_update < min_last_update_acceptable) {
- return false;
- // Disqualify anyone with a too old last_epoch_started
- } else if (info.last_epoch_started < max_les) {
- return false;
- // Disqualify anyone who is incomplete (not fully backfilled)
- } else if (info.is_incomplete()) {
- return false;
- } else {
- return true;
- }
- }
- };
- auto better = [require_rollback=pool.require_rollback(), this]
- (const PG::peer_info_t::value_type& lhs,
- const PG::peer_info_t::value_type& rhs) {
- if (require_rollback) {
- // prefer older last_update for ec_pool
- if (lhs.second.last_update > rhs.second.last_update) {
- return true;
- } else if (lhs.second.last_update < rhs.second.last_update) {
- return false;
- }
- } else {
- // prefer newer last_update for replica pool
- if (lhs.second.last_update > rhs.second.last_update) {
- return false;
- } else if (lhs.second.last_update < rhs.second.last_update) {
- return true;
- }
- }
- // Prefer longer tail
- if (lhs.second.log_tail > rhs.second.log_tail) {
- return true;
- } else if (lhs.second.log_tail < rhs.second.log_tail) {
- return false;
- }
- // prefer complete to missing
- if (lhs.second.has_missing() && !rhs.second.has_missing()) {
- return true;
- } else if (!lhs.second.has_missing() && rhs.second.has_missing()) {
- return false;
- }
- // prefer current primary (usually the caller), all things being equal
- if (rhs.first == whoami) {
- return true;
- } else if (lhs.first == whoami) {
- return false;
- }
- return false;
- };
- auto good_infos =
- (infos | boost::adaptors::filtered(is_good{min_last_update_acceptable,
- max_les, this}));
- if (good_infos.empty()) {
- return pg_shard_t{};
- } else {
- return boost::max_element(good_infos, better)->first;
- }
-}
-
-std::pair<PG::choose_acting_t, pg_shard_t> PG::choose_acting()
-{
- auto all_info = peer_info;
- all_info.emplace(whoami, info);
-
- auto auth_log_shard = find_best_info(all_info);
- if (auth_log_shard.is_undefined()) {
- if (up != acting) {
- logger().info("{} no suitable info found (incomplete backfills?), "
- "reverting to up", __func__);
- want_acting = up;
- // todo: reset pg_temp
- return {choose_acting_t::should_change, auth_log_shard};
- } else {
- logger().info("{} failed ", __func__);
- ceph_assert(want_acting.empty());
- return {choose_acting_t::pg_incomplete, auth_log_shard};
- }
- }
-
- auto want = calc_acting(auth_log_shard, acting, all_info);
- if (want != acting) {
- logger().info("{} want {} != acting {}, requesting pg_temp change",
- __func__, want, acting);
- want_acting = std::move(want);
- // todo: update pg temp
- return {choose_acting_t::should_change, auth_log_shard};
- } else {
- logger().info("{} want={}", __func__, want);
- want_acting.clear();
- acting_recovery_backfill.clear();
- std::transform(want.begin(), want.end(),
- std::inserter(acting_recovery_backfill, acting_recovery_backfill.end()),
- [](int osd) { return pg_shard_t{osd, shard_id_t::NO_SHARD}; });
- return {choose_acting_t::dont_change, auth_log_shard};
- }
-}
-
-bool PG::should_send_notify() const
-{
- return should_notify_primary && primary.osd >= 0;
-}
-
-pg_notify_t PG::get_notify(epoch_t query_epoch) const
-{
- return pg_notify_t{primary.shard,
- whoami.shard,
- query_epoch,
- get_osdmap_epoch(),
- info};
-}
-
-bool PG::is_last_activated_peer(pg_shard_t peer)
-{
- if (!acting_recovery_backfill.count(peer))
- return false;
- if (!peer_activated.insert(peer).second)
- return false;
- logger().info("peer osd.{} activated and committed", peer);
- return peer_activated.size() == acting_recovery_backfill.size();
-}
-
-
-void PG::clear_primary_state()
-{
- peer_info.clear();
- want_acting.clear();
- need_up_thru = 0;
- peer_activated.clear();
+ return false;
+#endif
+ return true;
}
-bool PG::should_restart_peering(int new_up_primary,
- int new_acting_primary,
- const std::vector<int>& new_up,
- const std::vector<int>& new_acting,
- cached_map_t last_map,
- cached_map_t osd_map) const
-{
- auto pgid = info.pgid.pgid;
- auto pool = last_map->get_pg_pool(pgid.pool());
- if (!pool) {
- return false;
- }
- auto new_pool = osd_map->get_pg_pool(pgid.pool());
- if (!new_pool) {
- return true;
- }
- if (PastIntervals::is_new_interval(
- primary.osd,
- new_acting_primary,
- acting,
- new_acting,
- up_primary.osd,
- new_up_primary,
- up,
- new_up,
- pool->size,
- new_pool->size,
- pool->min_size,
- new_pool->min_size,
- pool->get_pg_num(),
- new_pool->get_pg_num(),
- pool->get_pg_num_pending(),
- new_pool->get_pg_num_pending(),
- last_map->test_flag(CEPH_OSDMAP_SORTBITWISE),
- osd_map->test_flag(CEPH_OSDMAP_SORTBITWISE),
- last_map->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
- osd_map->test_flag(CEPH_OSDMAP_RECOVERY_DELETES),
- pgid)) {
- logger().info("new interval new_up {} new_acting {}",
- new_up, new_acting);
- return true;
- }
- if (!last_map->is_up(whoami.osd) && osd_map->is_up(whoami.osd)) {
- logger().info(" osd transitioned from down -> up");
- return true;
- }
- return false;
+void PG::log_state_enter(const char *state) {
+ logger().info("Entering state: {}", state);
}
-template<class T>
-bool compare_n_set(T& v, const T& new_v)
-{
- if (v != new_v) {
- v = new_v;
- return true;
- } else {
- return false;
- }
+void PG::log_state_exit(
+ const char *state_name, utime_t enter_time,
+ uint64_t events, utime_t event_dur) {
+ logger().info(
+ "Exiting state: {}, entered at {}, {} spent on {} events",
+ state_name,
+ enter_time,
+ event_dur,
+ events);
}
-void PG::start_peering_interval(int new_up_primary,
- int new_acting_primary,
- const std::vector<int>& new_up,
- const std::vector<int>& new_acting,
- cached_map_t last_map)
+void PG::init(
+ ceph::os::CollectionRef coll,
+ int role,
+ const vector<int>& newup, int new_up_primary,
+ const vector<int>& newacting, int new_acting_primary,
+ const pg_history_t& history,
+ const PastIntervals& pi,
+ bool backfill,
+ ObjectStore::Transaction &t)
{
- // todo
- update_last_peering_reset();
-
- auto old_acting_primary = primary;
- auto old_acting = std::move(acting);
- auto old_up_primary = up_primary;
- auto old_up = std::move(up);
-
- update_primary_state(new_up, new_up_primary,
- new_acting, new_acting_primary);
- if (compare_n_set(info.stats.up, up) +
- compare_n_set(info.stats.up_primary, up_primary.osd) +
- compare_n_set(info.stats.acting, acting) +
- compare_n_set(info.stats.acting_primary, primary.osd)) {
- info.stats.mapping_epoch = osdmap->get_epoch();
- }
- if (old_up_primary != up_primary || old_up != up) {
- info.history.same_up_since = osdmap->get_epoch();
- }
- // this comparison includes primary rank via pg_shard_t
- if (old_acting_primary != get_primary()) {
- info.history.same_primary_since = osdmap->get_epoch();
- }
- // todo: always start a new interval
- info.history.same_interval_since = osdmap->get_epoch();
- // This will now be remapped during a backfill in cases
- // that it would not have been before.
- if (up != acting) {
- set_state(PG_STATE_REMAPPED);
- } else {
- clear_state(PG_STATE_REMAPPED);
- }
- // deactivate.
- clear_state(PG_STATE_ACTIVE);
- clear_state(PG_STATE_PEERED);
- clear_state(PG_STATE_DOWN);
-
- acting_recovery_backfill.clear();
- // should we tell the primary we are here?
- should_notify_primary = !is_primary();
+ coll_ref = coll;
+ peering_state.init(
+ role, newup, new_up_primary, newacting,
+ new_acting_primary, history, pi, backfill, t);
}
-void PG::activate(epoch_t activation_epoch)
+seastar::future<> PG::read_state(ceph::os::CyanStore* store)
{
- clear_state(PG_STATE_DOWN);
+ coll_ref = store->open_collection(coll_t(pgid));
+ return PGMeta{store, pgid}.load().then(
+ [this, store](pg_info_t pg_info, PastIntervals past_intervals) {
+ return peering_state.init_from_disk_state(
+ std::move(pg_info),
+ std::move(past_intervals),
+ [this, store, &pg_info] (PGLog &pglog) {
+ return pglog.read_log_and_missing_crimson(
+ *store,
+ coll_ref,
+ peering_state.get_info(),
+ pgmeta_oid);
+ });
+ }).then([this, store]() {
+ int primary, up_primary;
+ vector<int> acting, up;
+ peering_state.get_osdmap()->pg_to_up_acting_osds(
+ pgid.pgid, &up, &up_primary, &acting, &primary);
+ peering_state.init_primary_up_acting(
+ up,
+ acting,
+ up_primary,
+ primary);
+ int rr = OSDMap::calc_pg_role(pg_whoami.osd, acting);
+ if (peering_state.get_pool().info.is_replicated() || rr == pg_whoami.shard)
+ peering_state.set_role(rr);
+ else
+ peering_state.set_role(-1);
+
+ PeeringCtx rctx;
+ PeeringState::Initialize evt;
+ peering_state.handle_event(evt, &rctx);
+ peering_state.write_if_dirty(rctx.transaction);
+ store->do_transaction(
+ coll_ref,
+ std::move(rctx.transaction));
- if (is_primary()) {
- // only update primary last_epoch_started if we will go active
- if (acting.size() >= pool.min_size) {
- info.last_epoch_started = activation_epoch;
- info.last_interval_started = info.history.same_interval_since;
- }
- } else if (is_acting(whoami)) {
- // update last_epoch_started on acting replica to whatever the primary sent
- // unless it's smaller (could happen if we are going peered rather than
- // active, see doc/dev/osd_internals/last_epoch_started.rst)
- if (info.last_epoch_started < activation_epoch) {
- info.last_epoch_started = activation_epoch;
- info.last_interval_started = info.history.same_interval_since;
- }
- }
- if (is_primary()) {
- // start up replicas
- seastar::do_for_each(
- acting_recovery_backfill.begin(),
- acting_recovery_backfill.end(),
- [this](pg_shard_t peer) { return activate_peer(peer); });
- set_state(PG_STATE_ACTIVATING);
- } else {
- // todo: write/commit pg log, activate myself, and then tell primary
- on_activated();
- pg_notify_t notify{get_primary().shard,
- whoami.shard,
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- info};
- auto m = make_message<MOSDPGInfo>(
- get_osdmap_epoch(),
- MOSDPGInfo::pg_list_t{make_pair(std::move(notify), PastIntervals{})});
- send_to_osd(get_primary().osd, std::move(m), get_osdmap_epoch());
- }
- // todo:
- info.last_complete = info.last_update;
- update_need_up_thru();
+ return seastar::now();
+ });
}
-void PG::on_activated()
+void PG::do_peering_event(
+ const boost::statechart::event_base &evt,
+ PeeringCtx &rctx)
{
- if (acting.size() >= pool.min_size) {
- set_state(PG_STATE_ACTIVE);
- } else {
- set_state(PG_STATE_PEERED);
- }
+ peering_state.handle_event(
+ evt,
+ &rctx);
}
-seastar::future<> PG::activate_peer(pg_shard_t peer)
+void PG::do_peering_event(
+ PGPeeringEvent& evt, PeeringCtx &rctx)
{
- if (peer == whoami) {
- // todo: write/commit pg log
- peer_activated.insert(whoami);
- return seastar::now();
- }
- auto& pi = peer_info[peer];
- MOSDPGLog* m = nullptr;
- if (pi.last_update == info.last_update) {
- // empty log
- logger().info("activate peer osd.{} is up to date, "
- "but sending pg_log anyway", peer);
- m = new MOSDPGLog{peer.shard,
- whoami.shard,
- get_osdmap_epoch(),
- get_info(),
- get_last_peering_reset()};
- } else if (pi.last_backfill.is_min()) {
- logger().info("starting backfill to osd.{} from ({},{}] {} to {}", peer,
- pi.log_tail, pi.last_update,
- pi.last_backfill, info.last_update);
- // backfill
- pi.last_update = info.last_update;
- pi.last_complete = info.last_update;
- pi.last_epoch_started = info.last_epoch_started;
- pi.last_interval_started = info.last_interval_started;
- pi.history = info.history;
- pi.hit_set = info.hit_set;
- pi.stats.stats.clear();
- pi.purged_snaps = info.purged_snaps;
- m = new MOSDPGLog{peer.shard,
- whoami.shard,
- get_osdmap_epoch(),
- pi,
- get_last_peering_reset()};
+ if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) {
+ logger().debug("{} handling {}", __func__, evt.get_desc());
+ return do_peering_event(evt.get_event(), rctx);
} else {
- // catch up
- logger().info("send missing log to osd.{}", peer);
- m = new MOSDPGLog{peer.shard,
- whoami.shard,
- get_osdmap_epoch(),
- get_info(),
- get_last_peering_reset()};
- // todo. send pg_log
- pi.last_update = info.last_update;
- }
- return send_to_osd(peer.osd, Ref<Message>{m, false}, get_osdmap_epoch());
-}
-
-void PG::maybe_mark_clean()
-{
- if (actingset.size() == osdmap->get_pg_size(pgid.pgid)) {
- set_state(PG_STATE_CLEAN);
- info.history.last_epoch_clean = get_osdmap_epoch();
- info.history.last_interval_clean = info.history.same_interval_since;
+ logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
}
}
-seastar::future<> PG::do_peering_event(std::unique_ptr<PGPeeringEvent> evt)
+void PG::handle_advance_map(
+ cached_map_t next_map, PeeringCtx &rctx)
{
- return dispatch_context(recovery_state.handle_event(evt->get_event()));
+ vector<int> newup, newacting;
+ int up_primary, acting_primary;
+ next_map->pg_to_up_acting_osds(
+ pgid.pgid,
+ &newup, &up_primary,
+ &newacting, &acting_primary);
+ peering_state.advance_map(
+ next_map,
+ peering_state.get_osdmap(),
+ newup,
+ up_primary,
+ newacting,
+ acting_primary,
+ rctx);
}
-seastar::future<> PG::dispatch_context(recovery::Context&& ctx)
+void PG::handle_activate_map(PeeringCtx &rctx)
{
- return seastar::do_with(recovery::Context{ctx}, [this](auto& todo) {
- return seastar::when_all_succeed(
- seastar::parallel_for_each(std::move(todo.notifies),
- [this](auto& osd_notifies) {
- auto& [peer, notifies] = osd_notifies;
- auto m = make_message<MOSDPGNotify>(get_osdmap_epoch(),
- std::move(notifies));
- return send_to_osd(peer, m, get_osdmap_epoch());
- }),
- seastar::parallel_for_each(std::move(todo.queries),
- [this](auto& osd_queries) {
- auto& [peer, queries] = osd_queries;
- auto m = make_message<MOSDPGQuery>(get_osdmap_epoch(),
- std::move(queries));
- return send_to_osd(peer, m, get_osdmap_epoch());
- }),
- seastar::parallel_for_each(std::move(todo.infos),
- [this](auto& osd_infos) {
- auto& [peer, infos] = osd_infos;
- auto m = make_message<MOSDPGInfo>(get_osdmap_epoch(),
- std::move(infos));
- return send_to_osd(peer, m, get_osdmap_epoch());
- })
- );
- });
+ peering_state.activate_map(rctx);
}
-seastar::future<> PG::handle_advance_map(cached_map_t next_map)
+void PG::handle_initialize(PeeringCtx &rctx)
{
- auto last_map = std::move(osdmap);
- osdmap = std::move(next_map);
- vector<int> new_up, new_acting;
- int up_primary, acting_primary;
- osdmap->pg_to_up_acting_osds(pgid.pgid,
- &new_up,
- &up_primary,
- &new_acting,
- &acting_primary);
- logger().info("handle_advance_map {}/{} -- {}/{}",
- new_up, new_acting, up_primary, acting_primary);
- recovery_state.handle_event(AdvMap{osdmap, last_map,
- std::move(new_up), up_primary,
- std::move(new_acting), acting_primary});
- return seastar::now();
+ PeeringState::Initialize evt;
+ peering_state.handle_event(evt, &rctx);
}
-seastar::future<> PG::handle_activate_map()
-{
- recovery_state.handle_event(ActMap{});
- return seastar::now();
-}
void PG::print(ostream& out) const
{
- out << "pg[" << info
- << " " << up;
- if (acting != up)
- out << "/" << acting;
- out << " lpr=" << last_peering_reset
- << " " << pg_state_string(info.stats.state)
- << "]";
+ out << peering_state << " ";
}
std::ostream& operator<<(std::ostream& os, const PG& pg)
{
+ os << " pg_epoch " << pg.get_osdmap_epoch() << " ";
pg.print(os);
return os;
}
-void PG::reply_pg_query(const MQuery& query, recovery::Context* ctx)
-{
- switch (query.query.type) {
- case pg_query_t::INFO:
- return reply_pg_query_for_info(query, ctx);
- case pg_query_t::LOG:
- return reply_pg_query_for_log(query, false);
- case pg_query_t::FULLLOG:
- return reply_pg_query_for_log(query, true);
- }
-}
-
-void PG::reply_pg_query_for_info(const MQuery& query, recovery::Context* ctx)
-{
- recovery::Context::notify_t notify{pg_notify_t{query.from.shard,
- whoami.shard,
- query.query_epoch,
- get_osdmap_epoch(),
- info},
- past_intervals};
- ctx->notifies[query.from.osd].push_back(std::move(notify));
-}
-
-void PG::reply_pg_query_for_log(const MQuery& query, bool full)
-{
- auto m = make_message<MOSDPGLog>(query.from.shard,
- whoami.shard,
- get_osdmap_epoch(),
- info,
- query.query_epoch);
- // todo:
- // m->missing = pg_log.get_missing();
- if (full) {
- // m->log = pg_log.get_log();
- } else {
- // maybe partial
- }
- send_to_osd(query.from.osd, m, get_osdmap_epoch());
-}
-
-seastar::future<> PG::send_to_osd(int peer, Ref<Message> m, epoch_t from_epoch)
-{
- if (osdmap->is_down(peer) || osdmap->get_info(peer).up_from > from_epoch) {
- return seastar::now();
- } else {
- return msgr.connect(osdmap->get_cluster_addrs(peer).front(),
- CEPH_ENTITY_TYPE_OSD)
- .then([m, this] (auto xconn) {
- return (*xconn)->send(m);
- });
- }
-}
-
-seastar::future<> PG::share_pg_info()
-{
- return seastar::do_for_each(
- acting_recovery_backfill.begin(),
- acting_recovery_backfill.end(),
- [this](pg_shard_t peer) {
- if (peer == whoami) return seastar::now();
- if (auto pi = peer_info.find(peer); pi != peer_info.end()) {
- pi->second.last_epoch_started = info.last_epoch_started;
- pi->second.last_interval_started = info.last_interval_started;
- pi->second.history.merge(info.history);
- }
- pg_notify_t notify{peer.shard,
- whoami.shard,
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- info};
- auto m = make_message<MOSDPGInfo>(
- get_osdmap_epoch(),
- MOSDPGInfo::pg_list_t{make_pair(std::move(notify),
- past_intervals)});
- return send_to_osd(peer.osd, m, get_osdmap_epoch());
- });
-}
-
seastar::future<> PG::wait_for_active()
{
- logger().debug("wait_for_active: {}", pg_state_string(info.stats.state));
+ logger().debug("wait_for_active: {}", peering_state.get_pg_state_string());
if (local_conf()->crimson_debug_pg_always_active) {
return seastar::now();
}
- if (test_state(PG_STATE_ACTIVE)) {
+
+ if (peering_state.is_active()) {
return seastar::now();
} else {
- if (!active_promise) {
- active_promise = seastar::shared_promise<>();
- }
- return active_promise->get_shared_future();
+ return active_promise.get_shared_future();
}
}
throw std::invalid_argument("unable to decode PGNLS handle");
}
const auto pg_start = pgid.pgid.get_hobj_start();
- const auto pg_end = pgid.pgid.get_hobj_end(pool.get_pg_num());
+ const auto pg_end = pgid.pgid.get_hobj_end(peering_state.get_pool().info.get_pg_num());
if (!(lower_bound.is_min() ||
lower_bound.is_max() ||
(lower_bound >= pg_start && lower_bound < pg_end))) {
backend->evict_object_state(oid);
auto reply = make_message<MOSDOpReply>(m.get(), -ENOENT, get_osdmap_epoch(),
0, false);
- reply->set_enoent_reply_versions(info.last_update,
- info.last_user_version);
+ reply->set_enoent_reply_versions(peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
});
});
#include <seastar/core/future.hh>
#include <seastar/core/shared_future.hh>
+#include "common/dout.h"
#include "crimson/net/Fwd.h"
#include "os/Transaction.h"
+#include "crimson/osd/shard_services.h"
#include "osd/osd_types.h"
#include "osd/osd_internal_types.h"
-#include "recovery_state.h"
+#include "osd/PeeringState.h"
template<typename T> using Ref = boost::intrusive_ptr<T>;
class OSDMap;
class PG : public boost::intrusive_ref_counter<
PG,
- boost::thread_unsafe_counter>
+ boost::thread_unsafe_counter>,
+ PeeringState::PeeringListener,
+ DoutPrefixProvider
{
using ec_profile_t = std::map<std::string,std::string>;
- using cached_map_t = boost::local_shared_ptr<OSDMap>;
+ using cached_map_t = boost::local_shared_ptr<const OSDMap>;
+ spg_t pgid;
+ pg_shard_t pg_whoami;
+ coll_t coll;
+ ceph::os::CollectionRef coll_ref;
+ ghobject_t pgmeta_oid;
public:
PG(spg_t pgid,
pg_shard_t pg_shard,
pg_pool_t&& pool,
std::string&& name,
- std::unique_ptr<PGBackend> backend,
cached_map_t osdmap,
- ceph::net::Messenger& msgr);
-
- epoch_t get_osdmap_epoch() const;
- const pg_info_t& get_info() const;
- const pg_stat_t& get_stats() const;
- void clear_state(uint64_t mask);
- bool test_state(uint64_t mask) const;
- void set_state(uint64_t mask);
- const PastIntervals& get_past_intervals() const;
- pg_shard_t get_primary() const;
- bool is_primary() const;
- bool is_acting(pg_shard_t pg_shard) const;
- bool is_up(pg_shard_t pg_shard) const;
- pg_shard_t get_whoami() const;
- epoch_t get_last_peering_reset() const;
- void update_last_peering_reset();
- epoch_t get_need_up_thru() const;
- void update_need_up_thru(const OSDMap* o = nullptr);
-
- bool proc_replica_info(pg_shard_t from,
- const pg_info_t& pg_info,
- epoch_t send_epoch);
- void proc_replica_log(pg_shard_t from,
- const pg_info_t& pg_info,
- const pg_log_t& pg_log,
- const pg_missing_t& pg_missing);
-
- using peer_info_t = std::map<pg_shard_t, pg_info_t>;
- pg_shard_t find_best_info(const PG::peer_info_t& infos) const;
- enum class choose_acting_t {
- dont_change,
- should_change,
- pg_incomplete,
+ ceph::osd::ShardServices &shard_services,
+ ec_profile_t profile);
+
+ // EpochSource
+ epoch_t get_osdmap_epoch() const final {
+ return peering_state.get_osdmap_epoch();
+ }
+
+ // DoutPrefixProvider
+ std::ostream& gen_prefix(std::ostream& out) const final {
+ return out << *this;
+ }
+ CephContext *get_cct() const final {
+ return shard_services.get_cct();
+ }
+ unsigned get_subsys() const final {
+ return ceph_subsys_osd;
+ }
+
+ ceph::os::CollectionRef get_collection_ref() {
+ return coll_ref;
+ }
+
+ // PeeringListener
+ void prepare_write(
+ pg_info_t &info,
+ pg_info_t &last_written_info,
+ PastIntervals &past_intervals,
+ PGLog &pglog,
+ bool dirty_info,
+ bool dirty_big_info,
+ bool need_write_epoch,
+ ObjectStore::Transaction &t) final {
+ std::map<string,bufferlist> km;
+ if (dirty_big_info || dirty_info) {
+ int ret = prepare_info_keymap(
+ shard_services.get_cct(),
+ &km,
+ peering_state.get_osdmap()->get_epoch(),
+ info,
+ last_written_info,
+ past_intervals,
+ dirty_big_info,
+ need_write_epoch,
+ true,
+ nullptr,
+ this);
+ ceph_assert(ret == 0);
+ }
+ pglog.write_log_and_missing(
+ t, &km, coll, pgmeta_oid,
+ peering_state.get_pool().info.require_rollback());
+ if (!km.empty())
+ t.omap_setkeys(coll, pgmeta_oid, km);
+ }
+
+ void on_info_history_change() final {
+ // Not needed yet -- mainly for scrub scheduling
+ }
+
+ void scrub_requested(bool deep, bool repair) final {
+ ceph_assert(0 == "Not implemented");
+ }
+
+ uint64_t get_snap_trimq_size() const final {
+ return 0;
+ }
+
+ void send_cluster_message(
+ int osd, Message *m,
+ epoch_t epoch, bool share_map_update=false) final {
+ shard_services.send_to_osd(osd, m, epoch);
+ }
+
+ void send_pg_created(pg_t pgid) final {
+ shard_services.send_pg_created(pgid);
+ }
+
+ bool try_flush_or_schedule_async() final;
+
+ void start_flush_on_transaction(
+ ObjectStore::Transaction &t) final {
+ t.register_on_commit(
+ new LambdaContext([this](){
+ peering_state.complete_flush();
+ }));
+ }
+
+ void on_flushed() final {
+ // will be needed for unblocking IO operations/peering
+ }
+
+ void schedule_event_after(
+ PGPeeringEventRef event,
+ float delay) final {
+ ceph_assert(0 == "Not implemented yet");
+ }
+
+ void request_local_background_io_reservation(
+ unsigned priority,
+ PGPeeringEventRef on_grant,
+ PGPeeringEventRef on_preempt) final {
+ ceph_assert(0 == "Not implemented yet");
+ }
+
+ void update_local_background_io_priority(
+ unsigned priority) final {
+ ceph_assert(0 == "Not implemented yet");
+ }
+
+ void cancel_local_background_io_reservation() final {
+ // Not implemented yet, but gets called on exit() from some states
+ }
+
+ void request_remote_recovery_reservation(
+ unsigned priority,
+ PGPeeringEventRef on_grant,
+ PGPeeringEventRef on_preempt) final {
+ ceph_assert(0 == "Not implemented yet");
+ }
+
+ void cancel_remote_recovery_reservation() final {
+ // Not implemented yet, but gets called on exit() from some states
+ }
+
+ void schedule_event_on_commit(
+ ObjectStore::Transaction &t,
+ PGPeeringEventRef on_commit) final {
+ t.register_on_commit(
+ new LambdaContext([this, on_commit](){
+ PeeringCtx rctx;
+ do_peering_event(on_commit, rctx);
+ shard_services.dispatch_context(std::move(rctx));
+ }));
+ }
+
+ void update_heartbeat_peers(set<int> peers) final {
+ // Not needed yet
+ }
+ void set_probe_targets(const set<pg_shard_t> &probe_set) final {
+ // Not needed yet
+ }
+ void clear_probe_targets() final {
+ // Not needed yet
+ }
+ void queue_want_pg_temp(const std::vector<int> &wanted) final {
+ shard_services.queue_want_pg_temp(pgid.pgid, wanted);
+ }
+ void clear_want_pg_temp() final {
+ shard_services.remove_want_pg_temp(pgid.pgid);
+ }
+ void publish_stats_to_osd() final {
+ // Not needed yet
+ }
+ void clear_publish_stats() final {
+ // Not needed yet
+ }
+ void check_recovery_sources(const OSDMapRef& newmap) final {
+ // Not needed yet
+ }
+ void check_blacklisted_watchers() final {
+ // Not needed yet
+ }
+ void clear_primary_state() final {
+ // Not needed yet
+ }
+
+
+ void on_pool_change() final {
+ // Not needed yet
+ }
+ void on_role_change() final {
+ // Not needed yet
+ }
+ void on_change(ObjectStore::Transaction &t) final {
+ // Not needed yet
+ }
+ void on_activate(interval_set<snapid_t> to_trim) final {
+ // Not needed yet (will be needed for IO unblocking)
+ }
+ void on_activate_complete() final {
+ active_promise.set_value();
+ active_promise = {};
+ }
+ void on_new_interval() final {
+ // Not needed yet
+ }
+ Context *on_clean() final {
+ // Not needed yet (will be needed for IO unblocking)
+ return nullptr;
+ }
+ void on_activate_committed() final {
+ // Not needed yet (will be needed for IO unblocking)
+ }
+ void on_active_exit() final {
+ // Not needed yet
+ }
+
+ void on_removal(ObjectStore::Transaction &t) final {
+ // TODO
+ }
+ void do_delete_work(ObjectStore::Transaction &t) final {
+ // TODO
+ }
+
+ // merge/split not ready
+ void clear_ready_to_merge() final {}
+ void set_not_ready_to_merge_target(pg_t pgid, pg_t src) final {}
+ void set_not_ready_to_merge_source(pg_t pgid) final {}
+ void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) final {}
+ void set_ready_to_merge_source(eversion_t lu) final {}
+
+ void on_active_actmap() final {
+ // Not needed yet
+ }
+ void on_active_advmap(const OSDMapRef &osdmap) final {
+ // Not needed yet
+ }
+ epoch_t oldest_stored_osdmap() final {
+ // TODO
+ return 0;
+ }
+
+
+ void on_backfill_reserved() final {
+ ceph_assert(0 == "Not implemented");
+ }
+ void on_backfill_canceled() final {
+ ceph_assert(0 == "Not implemented");
+ }
+ void on_recovery_reserved() final {
+ ceph_assert(0 == "Not implemented");
+ }
+
+
+ bool try_reserve_recovery_space(
+ int64_t primary_num_bytes, int64_t local_num_bytes) final {
+ return true;
+ }
+ void unreserve_recovery_space() final {}
+
+ struct PGLogEntryHandler : public PGLog::LogEntryHandler {
+ PG *pg;
+ ObjectStore::Transaction *t;
+ PGLogEntryHandler(PG *pg, ObjectStore::Transaction *t) : pg(pg), t(t) {}
+
+ // LogEntryHandler
+ void remove(const hobject_t &hoid) override {
+ // TODO
+ }
+ void try_stash(const hobject_t &hoid, version_t v) override {
+ // TODO
+ }
+ void rollback(const pg_log_entry_t &entry) override {
+ // TODO
+ }
+ void rollforward(const pg_log_entry_t &entry) override {
+ // TODO
+ }
+ void trim(const pg_log_entry_t &entry) override {
+ // TODO
+ }
};
- std::vector<int>
- calc_acting(pg_shard_t auth_shard,
- const vector<int>& acting,
- const map<pg_shard_t, pg_info_t>& all_info) const;
- std::pair<choose_acting_t, pg_shard_t> choose_acting();
+ PGLog::LogEntryHandlerRef get_log_handler(
+ ObjectStore::Transaction &t) final {
+ return std::make_unique<PG::PGLogEntryHandler>(this, &t);
+ }
+
+ void rebuild_missing_set_with_deletes(PGLog &pglog) final {
+ ceph_assert(0 == "Impossible for crimson");
+ }
+
+ PerfCounters &get_peering_perf() final {
+ return shard_services.get_recoverystate_perf_logger();
+ }
+ PerfCounters &get_perf_logger() final {
+ return shard_services.get_perf_logger();
+ }
+
+ void log_state_enter(const char *state) final;
+ void log_state_exit(
+ const char *state_name, utime_t enter_time,
+ uint64_t events, utime_t event_dur) final;
+
+ void dump_recovery_info(Formatter *f) const final {
+ }
+
+ OstreamTemp get_clog_info() final {
+ // not needed yet: replace with not a stub (needs to be wired up to monc)
+ return OstreamTemp(CLOG_INFO, nullptr);
+ }
+ OstreamTemp get_clog_debug() final {
+ // not needed yet: replace with not a stub (needs to be wired up to monc)
+ return OstreamTemp(CLOG_DEBUG, nullptr);
+ }
+ OstreamTemp get_clog_error() final {
+ // not needed yet: replace with not a stub (needs to be wired up to monc)
+ return OstreamTemp(CLOG_ERROR, nullptr);
+ }
+
+ // Utility
+ bool is_primary() const {
+ return peering_state.is_primary();
+ }
+ pg_stat_t get_stats() {
+ auto stats = peering_state.prepare_stats_for_publish(
+ false,
+ pg_stat_t(),
+ object_stat_collection_t());
+ ceph_assert(stats);
+ return *stats;
+ }
+ bool get_need_up_thru() const {
+ return peering_state.get_need_up_thru();
+ }
+
+ /// initialize created PG
+ void init(
+ ceph::os::CollectionRef coll_ref,
+ int role,
+ const std::vector<int>& up,
+ int up_primary,
+ const std::vector<int>& acting,
+ int acting_primary,
+ const pg_history_t& history,
+ const PastIntervals& pim,
+ bool backfill,
+ ObjectStore::Transaction &t);
+
seastar::future<> read_state(ceph::os::CyanStore* store);
- // peering/recovery
- bool should_send_notify() const;
- pg_notify_t get_notify(epoch_t query_epoch) const;
- bool is_last_activated_peer(pg_shard_t peer);
- void clear_primary_state();
-
- bool should_restart_peering(int new_up_primary,
- int new_acting_primary,
- const std::vector<int>& new_up,
- const std::vector<int>& new_acting,
- cached_map_t last_map,
- cached_map_t osd_map) const;
- void start_peering_interval(int new_up_primary,
- int new_acting_primary,
- const std::vector<int>& new_up,
- const std::vector<int>& new_acting,
- cached_map_t last_map);
- void activate(epoch_t activation_epoch);
- void on_activated();
- void maybe_mark_clean();
-
- seastar::future<> do_peering_event(std::unique_ptr<PGPeeringEvent> evt);
- seastar::future<> dispatch_context(recovery::Context&& ctx);
- seastar::future<> handle_advance_map(cached_map_t next_map);
- seastar::future<> handle_activate_map();
- seastar::future<> share_pg_info();
- void reply_pg_query(const MQuery& query, recovery::Context* ctx);
+ void do_peering_event(
+ const boost::statechart::event_base &evt,
+ PeeringCtx &rctx);
+ void do_peering_event(
+ PGPeeringEvent& evt, PeeringCtx &rctx);
+ void do_peering_event(
+ std::unique_ptr<PGPeeringEvent> evt,
+ PeeringCtx &rctx) {
+ return do_peering_event(*evt, rctx);
+ }
+ void do_peering_event(
+ PGPeeringEventRef evt,
+ PeeringCtx &rctx) {
+ return do_peering_event(*evt, rctx);
+ }
+
+ void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
+ void handle_activate_map(PeeringCtx &rctx);
+ void handle_initialize(PeeringCtx &rctx);
seastar::future<> handle_op(ceph::net::Connection* conn,
Ref<MOSDOp> m);
void print(ostream& os) const;
private:
- seastar::future<> activate_peer(pg_shard_t peer);
- void reply_pg_query_for_info(const MQuery& query, recovery::Context* ctx);
- void reply_pg_query_for_log(const MQuery& query, bool full);
- seastar::future<> send_to_osd(int peer, Ref<Message> m, epoch_t from_epoch);
-
- void update_primary_state(const std::vector<int>& new_up,
- int new_up_primary,
- const std::vector<int>& new_acting,
- int new_acting_primary);
seastar::future<Ref<MOSDOpReply>> do_osd_ops(Ref<MOSDOp> m);
seastar::future<> do_osd_op(
ObjectState& os,
uint64_t limit);
private:
- const spg_t pgid;
- pg_shard_t whoami;
- pg_pool_t pool;
-
- epoch_t last_peering_reset = 0;
- epoch_t need_up_thru = 0;
- recovery::State recovery_state;
-
- bool should_notify_primary = false;
-
- using pg_shard_set_t = std::set<pg_shard_t>;
- // peer_info -- projected (updates _before_ replicas ack)
- peer_info_t peer_info; //< info from peers (stray or prior)
- pg_shard_set_t peer_activated;
-
- //< pg state
- pg_info_t info;
- //< last written info, for fast info persistence
- pg_info_t last_written_info;
- PastIntervals past_intervals;
- // primary state
- pg_shard_t primary, up_primary;
- std::vector<int> acting, up;
- pg_shard_set_t actingset, upset;
- pg_shard_set_t acting_recovery_backfill;
- std::vector<int> want_acting;
+ ceph::osd::ShardServices &shard_services;
- seastar::future<> wait_for_active();
- std::optional<seastar::shared_promise<>> active_promise;
+ cached_map_t osdmap;
std::unique_ptr<PGBackend> backend;
- cached_map_t osdmap;
- ceph::net::Messenger& msgr;
+ PeeringState peering_state;
+
+ seastar::shared_promise<> active_promise;
+ seastar::future<> wait_for_active();
+
+ friend std::ostream& operator<<(std::ostream&, const PG& pg);
};
std::ostream& operator<<(std::ostream&, const PG& pg);
std::unique_ptr<PGBackend> PGBackend::create(const spg_t pgid,
const pg_pool_t& pool,
+ ceph::os::CollectionRef coll,
ceph::os::CyanStore* store,
const ec_profile_t& ec_profile)
{
- auto coll = store->open_collection(coll_t{pgid});
switch (pool.type) {
case pg_pool_t::TYPE_REPLICATED:
return std::make_unique<ReplicatedBackend>(pgid.shard, coll, store);
#include <string>
#include <boost/smart_ptr/local_shared_ptr.hpp>
+#include "crimson/os/cyan_store.h"
#include "crimson/common/shared_lru.h"
#include "os/Transaction.h"
#include "osd/osd_types.h"
#include "osd/osd_internal_types.h"
struct hobject_t;
-namespace ceph::os {
- class Collection;
- class CyanStore;
-}
class PGBackend
{
virtual ~PGBackend() = default;
static std::unique_ptr<PGBackend> create(const spg_t pgid,
const pg_pool_t& pool,
+ ceph::os::CollectionRef coll,
ceph::os::CyanStore* store,
const ec_profile_t& ec_profile);
using cached_os_t = boost::local_shared_ptr<ObjectState>;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <vector>
-
-#include <boost/smart_ptr/local_shared_ptr.hpp>
-#include <boost/statechart/custom_reaction.hpp>
-#include <boost/statechart/event.hpp>
-#include <boost/statechart/simple_state.hpp>
-#include <boost/statechart/state.hpp>
-#include <boost/statechart/state_machine.hpp>
-#include <boost/statechart/transition.hpp>
-#include <boost/statechart/event_base.hpp>
-
-#include "osd/PGPeeringEvent.h"
-
-namespace recovery {
-
-struct AdvMap : boost::statechart::event< AdvMap > {
- // TODO: use foreign_ptr<> once the osdmap cache needs to be shared across
- // cores
- using OSDMapRef = boost::local_shared_ptr<OSDMap>;
- OSDMapRef osdmap;
- OSDMapRef last_map;
- std::vector<int> new_up, new_acting;
- int up_primary, acting_primary;
- AdvMap(OSDMapRef osdmap, OSDMapRef last_map,
- const std::vector<int>& new_up, int up_primary,
- const std::vector<int>& new_acting, int acting_primary):
- osdmap(osdmap),
- last_map(last_map),
- new_up(new_up),
- new_acting(new_acting),
- up_primary(up_primary),
- acting_primary(acting_primary) {}
- void print(std::ostream *out) const {
- *out << "AdvMap";
- }
-};
-
-struct ActMap : boost::statechart::event< ActMap > {
- ActMap() : boost::statechart::event< ActMap >() {}
- void print(std::ostream *out) const {
- *out << "ActMap";
- }
-};
-
-struct Activate : boost::statechart::event< Activate > {
- epoch_t activation_epoch;
- explicit Activate(epoch_t q) : boost::statechart::event< Activate >(),
- activation_epoch(q) {}
- void print(std::ostream *out) const {
- *out << "Activate from " << activation_epoch;
- }
-};
-
-struct Initialize : boost::statechart::event<Initialize> {};
-
-}
+++ /dev/null
-#include "recovery_machine.h"
-#include "recovery_state.h"
-#include "pg.h"
-
-namespace recovery
-{
-void Machine::send_notify(pg_shard_t to,
- const pg_notify_t& info,
- const PastIntervals& pi)
-{
- state.context.notifies[to.osd].emplace_back(info, pi);
-}
-
-void Machine::send_query(pg_shard_t to,
- const pg_query_t& query)
-{
- spg_t pgid{pg.get_info().pgid.pgid, to.shard};
- state.context.queries[to.osd].emplace(pgid, query);
-}
-
-recovery::Context* Machine::get_context()
-{
- return &state.context;
-}
-
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <boost/statechart/state_machine.hpp>
-
-#include "osd/osd_types.h"
-
-class PG;
-
-namespace recovery {
-
-struct Initial;
-class State;
-class Context;
-
-struct Machine : public boost::statechart::state_machine<Machine,
- Initial> {
- Machine(State& state, PG& pg)
- : state{state},
- pg{pg}
- {}
- void send_notify(pg_shard_t to,
- const pg_notify_t& info,
- const PastIntervals& pi);
- void send_query(pg_shard_t to,
- const pg_query_t& query);
- recovery::Context* get_context();
- State& state;
- PG& pg;
-};
-}
+++ /dev/null
-#include "recovery_state.h"
-#include "recovery_states.h"
-#include "messages/MOSDPGLog.h" // MLogRec needs this
-
-namespace recovery {
-
-State::State(PG& pg)
- : machine{*this, pg}
-{
- machine.initiate();
-}
-
-Context State::handle_event(const boost::statechart::event_base& evt)
-{
- machine.process_event(evt);
- Context pending;
- std::swap(pending, context);
- return pending;
-}
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <vector>
-#include <map>
-
-#include "recovery_machine.h"
-
-class PG;
-
-namespace recovery {
-
-// PeeringMachine::handle_event() could send multiple notifications to a
-// certain peer OSD before it reaches the last state. for better performance,
-// we send them in batch. the pending messages are collected in RecoveryCtx
-// before being dispatched upon returning of handle_event().
-struct Context
-{
- using osd_id_t = int;
-
- using notify_t = std::pair<pg_notify_t, PastIntervals>;
- std::map<osd_id_t, std::vector<notify_t>> notifies;
-
- using queries_t = std::map<spg_t, pg_query_t>;
- std::map<osd_id_t, queries_t> queries;
-
- using infos_t = std::vector<pair<pg_notify_t, PastIntervals>>;
- std::map<osd_id_t, infos_t> infos;
-};
-
-/// Encapsulates PG recovery process,
-class State {
-public:
- explicit State(PG& pg);
- Context handle_event(const boost::statechart::event_base& evt);
-private:
- friend class Machine;
- Machine machine;
- Context context;
-};
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "recovery_states.h"
-
-#include "crimson/common/log.h"
-#include "messages/MOSDPGLog.h"
-#include "osd/OSDMap.h"
-#include "pg.h"
-
-namespace {
- seastar::logger& logger() {
- return ceph::get_logger(ceph_subsys_osd);
- }
-}
-
-namespace recovery {
-
-/*------Crashed-------*/
-Crashed::Crashed(my_context ctx)
- : my_base(ctx)
-{
- ceph_abort_msg("we got a bad state machine event");
-}
-
-/*------Initial-------*/
-
-Initial::Initial(my_context ctx)
- : my_base(ctx)
-{
- logger().info("Initial");
-}
-
-boost::statechart::result Initial::react(const MNotifyRec& notify)
-{
- logger().info("Active <MNotifyRec>");
- // todo: process info from replica
- auto& pg = context<Machine>().pg;
- pg.update_last_peering_reset();
- return transit<Primary>();
-}
-
-boost::statechart::result Initial::react(const MInfoRec& i)
-{
- // todo
- logger().info("<MInfoRec> transitioning from Initial to Stray");
- post_event(i);
- return transit<Stray>();
-}
-
-boost::statechart::result Initial::react(const MLogRec& i)
-{
- // todo
- logger().info("<MLogRec> transitioning from Initial to Stray");
- post_event(i);
- return transit<Stray>();
-}
-
-void Initial::exit()
-{}
-
-/*--------Reset---------*/
-Reset::Reset(my_context ctx)
- : my_base(ctx)
-{
- logger().info("Entering Reset");
- auto& pg = context<Machine>().pg;
- pg.update_last_peering_reset();
-}
-
-boost::statechart::result Reset::react(const AdvMap& advmap)
-{
- logger().info("Reset advmap");
- auto& pg = context<Machine>().pg;
- if (pg.should_restart_peering(advmap.up_primary,
- advmap.acting_primary,
- advmap.new_up,
- advmap.new_acting,
- advmap.last_map,
- advmap.osdmap)) {
- pg.start_peering_interval(advmap.up_primary,
- advmap.acting_primary,
- advmap.new_up,
- advmap.new_acting,
- advmap.last_map);
- }
- return discard_event();
-}
-
-boost::statechart::result Reset::react(const ActMap&)
-{
- auto& pg = context<Machine>().pg;
- if (pg.should_send_notify()) {
- context<Machine>().send_notify(pg.get_primary(),
- pg.get_notify(pg.get_osdmap_epoch()),
- pg.get_past_intervals());
- }
- return transit<Started>();
-}
-
-void Reset::exit()
-{
- logger().info("Leaving Reset");
-}
-
-/*------Started-------*/
-Started::Started(my_context ctx)
- : my_base(ctx)
-{
- logger().info("Entering Started");
-}
-
-boost::statechart::result Started::react(const AdvMap& advmap)
-{
- auto& pg = context<Machine>().pg;
- logger().info("Started <AdvMap>");
- if (pg.should_restart_peering(advmap.up_primary,
- advmap.acting_primary,
- advmap.new_up,
- advmap.new_acting,
- advmap.last_map,
- advmap.osdmap)) {
- logger().info("should_restart_peering, transitioning to Reset");
- post_event(advmap);
- return transit<Reset>();
- }
- return discard_event();
-}
-
-void Started::exit()
-{
- logger().info("Leaving Started");
-}
-
-/*-------Start---------*/
-Start::Start(my_context ctx)
- : my_base(ctx)
-{
- auto& pg = context<Machine>().pg;
- if (pg.is_primary()) {
- logger().info("Start transitioning to Primary");
- post_event(MakePrimary{});
- } else { // is_stray
- logger().info("Start transitioning to Stray");
- post_event(MakeStray{});
- }
-}
-
-void Start::exit()
-{
- logger().info("Leaving Start");
-}
-
-/*---------Primary--------*/
-Primary::Primary(my_context ctx)
- : my_base(ctx)
-{
- logger().info("Entering Primary");
-}
-
-boost::statechart::result Primary::react(const MNotifyRec& notevt)
-{
- // todo
- auto& pg = context<Machine>().pg;
- pg.proc_replica_info(notevt.from,
- notevt.notify.info,
- notevt.notify.epoch_sent);
- logger().info("Primary <MNotifyRec> {}", pg);
- return discard_event();
-}
-
-boost::statechart::result Primary::react(const ActMap&)
-{
- // todo
- auto& pg = context<Machine>().pg;
- logger().info("Primary <ActMap> {}", pg);
- return discard_event();
-}
-
-void Primary::exit()
-{
- auto& pg = context<Machine>().pg;
- pg.clear_primary_state();
- pg.clear_state(PG_STATE_CREATING);
- logger().info("Leaving Primary: {}", pg);
-}
-
-/*---------Peering--------*/
-Peering::Peering(my_context ctx)
- : my_base(ctx)
-{
- auto& pg = context<Machine>().pg;
- pg.set_state(PG_STATE_PEERING);
- logger().info("Entering Peering");
-}
-
-boost::statechart::result Peering::react(const AdvMap& advmap)
-{
- logger().info("Peering <AdvMap>");
- context<Machine>().pg.update_need_up_thru(advmap.osdmap.get());
- return forward_event();
-}
-
-void Peering::exit()
-{
- auto& pg = context<Machine>().pg;
- logger().info("Leaving Peering: {}", pg);
- pg.clear_state(PG_STATE_PEERING);
-}
-
-/*--------GetInfo---------*/
-GetInfo::GetInfo(my_context ctx)
- : my_base(ctx)
-{
- logger().info("Entering GetInfo");
- context<Machine>().pg.update_need_up_thru();
- post_event(GotInfo{});
-}
-
-boost::statechart::result GetInfo::react(const MNotifyRec& infoevt)
-{
- logger().info("GetInfo <MNotifyRec>");
- // todo: depends on get_infos()
- post_event(GotInfo());
- return discard_event();
-}
-
-void GetInfo::exit()
-{
- logger().info("Leaving GetInfo");
- // todo
-}
-
-/*------GetLog------------*/
-GetLog::GetLog(my_context ctx)
- : my_base(ctx)
-{
- logger().info("Entering GetLog");
- auto& pg = context<Machine>().pg;
-
- PG::choose_acting_t adjust_acting;
- tie(adjust_acting, auth_log_shard) = pg.choose_acting();
- switch (adjust_acting) {
- case PG::choose_acting_t::dont_change:
- break;
- case PG::choose_acting_t::should_change:
- // post_event(NeedActingChange());
- return;
- case PG::choose_acting_t::pg_incomplete:
- // post_event(IsIncomplete());
- return;
- }
- // am i the best?
- if (auth_log_shard == pg.get_whoami()) {
- post_event(GotLog{});
- return;
- } else {
- // todo: request log from peer
- return;
- }
-}
-
-boost::statechart::result GetLog::react(const AdvMap& advmap)
-{
- // make sure our log source didn't go down. we need to check
- // explicitly because it may not be part of the prior set, which
- // means the Peering state check won't catch it going down.
- if (!advmap.osdmap->is_up(auth_log_shard.osd)) {
- logger().info("GetLog: auth_log_shard osd.{} went down",
- auth_log_shard.osd);
- post_event(advmap);
- return transit<Reset>();
- }
-
- // let the Peering state do its checks.
- return forward_event();
-}
-
-boost::statechart::result GetLog::react(const MLogRec& logevt)
-{
- assert(!msg);
- if (logevt.from != auth_log_shard) {
- logger().info("GetLog: discarding log from non-auth_log_shard osd.{}",
- logevt.from);
- return discard_event();
- }
- logger().info("GetLog: received master log from osd.{}",
- logevt.from);
- msg = logevt.msg;
- post_event(GotLog{});
- return discard_event();
-}
-
-boost::statechart::result GetLog::react(const GotLog&)
-{
- logger().info("leaving GetLog");
- return transit<GetMissing>();
-}
-
-void GetLog::exit()
-{}
-
-Recovered::Recovered(my_context ctx)
- : my_base(ctx)
-{
- logger().info("Entering Recovered");
- if (context<Active>().all_replicas_activated) {
- logger().info("all_replicas_activated");
- post_event(GoClean{});
- }
-}
-
-boost::statechart::result Recovered::react(const AllReplicasActivated&)
-{
- logger().info("Recovered <AllReplicasActivated>");
- post_event(GoClean{});
- return forward_event();
-}
-
-void Recovered::exit()
-{
- logger().info("Leaving Recovered");
-}
-
-Clean::Clean(my_context ctx)
- : my_base(ctx)
-{
- logger().info("Entering Clean");
- auto& pg = context<Machine>().pg;
- pg.maybe_mark_clean();
-}
-
-void Clean::exit()
-{
- logger().info("Leaving Clean");
- auto& pg = context<Machine>().pg;
- pg.clear_state(PG_STATE_CLEAN);
-}
-
-/*---------Active---------*/
-Active::Active(my_context ctx)
- : my_base(ctx)
-{
- logger().info("Entering Activate");
- auto& pg = context<Machine>().pg;
- assert(pg.is_primary());
- pg.activate(pg.get_osdmap_epoch());
- logger().info("Activate Finished");
-}
-
-boost::statechart::result Active::react(const AdvMap& advmap)
-{
- auto& pg = context<Machine>().pg;
- if (pg.should_restart_peering(advmap.up_primary,
- advmap.acting_primary,
- advmap.new_up,
- advmap.new_acting,
- advmap.last_map,
- advmap.osdmap)) {
- logger().info("Active advmap interval change, fast return");
- return forward_event();
- }
- logger().info("Active advmap");
- return forward_event();
-}
-
-boost::statechart::result Active::react(const ActMap&)
-{
- return forward_event();
-}
-
-boost::statechart::result Active::react(const MNotifyRec& notevt)
-{
- logger().info("Active <MNotifyRec>");
- auto& pg = context<Machine>().pg;
- pg.proc_replica_info(notevt.from,
- notevt.notify.info,
- notevt.notify.epoch_sent);
- return discard_event();
-}
-
-boost::statechart::result Active::react(const MInfoRec& infoevt)
-{
- logger().info("Active <MInfoRec>");
- auto& pg = context<Machine>().pg;
- assert(pg.is_primary());
-
- if (pg.is_last_activated_peer(infoevt.from)) {
- post_event(AllReplicasActivated{});
- }
- return discard_event();
-}
-
-boost::statechart::result Active::react(const MLogRec& logevt)
-{
- logger().info("Active <MLogRec>");
- auto& pg = context<Machine>().pg;
- pg.proc_replica_log(logevt.from,
- logevt.msg->info,
- logevt.msg->log,
- logevt.msg->missing);
- // todo
- return discard_event();
-}
-
-boost::statechart::result Active::react(const AllReplicasActivated &evt)
-{
- logger().info("Active <AllReplicasActivated>");
- all_replicas_activated = true;
- auto& pg = context<Machine>().pg;
- pg.clear_state(PG_STATE_ACTIVATING);
- pg.clear_state(PG_STATE_CREATING);
- pg.on_activated();
- pg.share_pg_info();
- post_event(AllReplicasRecovered{});
- return discard_event();
-}
-
-void Active::exit()
-{
- auto& pg = context<Machine>().pg;
- pg.clear_state(PG_STATE_ACTIVATING);
- pg.clear_state(PG_STATE_DEGRADED);
- pg.clear_state(PG_STATE_UNDERSIZED);
-
- logger().info("Leaving Active");
-}
-
-/*------Activating--------*/
-Activating::Activating(my_context ctx)
- : my_base(ctx)
-{
- logger().info("Entering Activating");
-}
-
-void Activating::exit()
-{
- logger().info("Leaving Activating");
-}
-
-/*------ReplicaActive-----*/
-ReplicaActive::ReplicaActive(my_context ctx)
- : my_base(ctx)
-{
- logger().info("Entering ReplicaActive");
-}
-
-
-boost::statechart::result ReplicaActive::react(const Activate& actevt)
-{
- auto& pg = context<Machine>().pg;
- logger().info("In ReplicaActive, about to call activate");
- pg.activate(actevt.activation_epoch);
- logger().info("Activate Finished");
- return discard_event();
-}
-
-boost::statechart::result ReplicaActive::react(const MInfoRec& infoevt)
-{
- return discard_event();
-}
-
-boost::statechart::result ReplicaActive::react(const MLogRec& logevt)
-{
- return discard_event();
-}
-
-boost::statechart::result ReplicaActive::react(const ActMap&)
-{
- auto& pg = context<Machine>().pg;
- if (pg.should_send_notify()) {
- context<Machine>().send_notify(pg.get_primary(),
- pg.get_notify(pg.get_osdmap_epoch()),
- pg.get_past_intervals());
- };
- return discard_event();
-}
-
-boost::statechart::result ReplicaActive::react(const MQuery& query)
-{
- auto& pg = context<Machine>().pg;
- context<Machine>().send_notify(query.from,
- pg.get_notify(query.query_epoch),
- pg.get_past_intervals());
- return discard_event();
-}
-
-void ReplicaActive::exit()
-{}
-
-/*---RepNotRecovering----*/
-RepNotRecovering::RepNotRecovering(my_context ctx)
- : my_base(ctx)
-{}
-
-void RepNotRecovering::exit()
-{}
-
-/*-------Stray---*/
-Stray::Stray(my_context ctx)
- : my_base(ctx)
-{
- auto& pg = context<Machine>().pg;
- assert(!pg.is_primary());
- logger().info("{}, Entering Stray", pg);
-}
-
-boost::statechart::result Stray::react(const MLogRec& logevt)
-{
- auto& pg = context<Machine>().pg;
- logger().info("{} Stray <MLogRec>", pg);
- MOSDPGLog* msg = logevt.msg.get();
- logger().info("got info+log from osd.{} {} {}",
- logevt.from, msg->info, msg->log);
- if (msg->info.last_backfill == hobject_t()) {
- // todo: restart backfill
- } else {
- // todo: merge log
- }
- post_event(Activate{logevt.msg->info.last_epoch_started});
- return transit<ReplicaActive>();
-}
-
-boost::statechart::result Stray::react(const MInfoRec& infoevt)
-{
- auto& pg = context<Machine>().pg;
- logger().info("{} Stray <MInfoRec>", pg);
- logger().info("got info from osd.{} {}",
- infoevt.from, infoevt.info);
- // todo
- post_event(Activate{infoevt.info.last_epoch_started});
- return transit<ReplicaActive>();
-}
-
-boost::statechart::result Stray::react(const MQuery& query)
-{
- auto& pg = context<Machine>().pg;
- logger().info("{} Stray <MQuery>", pg);
- pg.reply_pg_query(query, context<Machine>().get_context());
- return discard_event();
-}
-
-boost::statechart::result Stray::react(const ActMap&)
-{
- auto& pg = context<Machine>().pg;
- logger().info("{} Stray <ActMap>", pg);
- if (pg.should_send_notify()) {
- context<Machine>().send_notify(pg.get_primary(),
- pg.get_notify(pg.get_osdmap_epoch()),
- pg.get_past_intervals());
- }
- return discard_event();
-}
-
-void Stray::exit()
-{
- logger().info("Leaving Stray");
-}
-
-/*------Down--------*/
-Down::Down(my_context ctx)
- : my_base(ctx)
-{}
-
-void Down::exit()
-{
-}
-
-boost::statechart::result Down::react(const MNotifyRec& infoevt)
-{
- // todo
- return discard_event();
-}
-
-/*------GetMissing--------*/
-GetMissing::GetMissing(my_context ctx)
- : my_base(ctx)
-{
- logger().info("Entering GetMissing");
- auto& pg = context<Machine>().pg;
- if (pg.get_need_up_thru() != 0) {
- logger().info("still need up_thru update before going active");
- post_event(NeedUpThru{});
- } else {
- // all good!
- post_event(Activate{pg.get_osdmap_epoch()});
- }
-}
-
-boost::statechart::result GetMissing::react(const MLogRec& logevt)
-{
- logger().info("GetMissing <MLogRec>");
- auto& pg = context<Machine>().pg;
- pg.proc_replica_log(logevt.from,
- logevt.msg->info,
- logevt.msg->log,
- logevt.msg->missing);
- if (pg.get_need_up_thru() != 0) {
- logger().info(" still need up_thru update before going active");
- post_event(NeedUpThru{});
- } else {
- logger().info("Got last missing, don't need missing posting Activate");
- post_event(Activate{pg.get_osdmap_epoch()});
- }
- return discard_event();
-}
-
-void GetMissing::exit()
-{
- logger().info("Leaving GetMissing");
-}
-
-/*------WaitUpThru--------*/
-WaitUpThru::WaitUpThru(my_context ctx)
- : my_base(ctx)
-{
- logger().info("Entering WaitUpThru");
-}
-
-boost::statechart::result WaitUpThru::react(const ActMap& am)
-{
- logger().info("WaitUpThru <ActMap>");
- auto& pg = context<Machine>().pg;
- if (!pg.get_need_up_thru()) {
- logger().info("WaitUpThru: no need up thru!");
- post_event(Activate{pg.get_osdmap_epoch()});
- }
- return forward_event();
-}
-
-boost::statechart::result WaitUpThru::react(const MLogRec& logevt)
-{
- logger().info("WaitUpThru: <MLogRec>");
- auto& pg = context<Machine>().pg;
- pg.proc_replica_log(logevt.from,
- logevt.msg->info,
- logevt.msg->log,
- logevt.msg->missing);
- return discard_event();
-}
-
-void WaitUpThru::exit()
-{
- logger().info("Leaving WaitUpThru");
-}
-
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <boost/statechart/custom_reaction.hpp>
-#include <boost/statechart/event.hpp>
-#include <boost/statechart/event_base.hpp>
-#include <boost/statechart/simple_state.hpp>
-#include <boost/statechart/state.hpp>
-#include <boost/statechart/transition.hpp>
-
-#include "recovery_machine.h"
-#include "recovery_events.h"
-
-// Initial
-// Reset
-// Start
-// Started
-// Primary
-// WaitActingChange
-// Peering
-// GetInfo
-// GetLog
-// GetMissing
-// WaitUpThru
-// Incomplete
-// Active
-// Activating
-// Clean
-// Recovered
-// Backfilling
-// WaitRemoteBackfillReserved
-// WaitLocalBackfillReserved
-// NotBackfilling
-// NotRecovering
-// Recovering
-// WaitRemoteRecoveryReserved
-// WaitLocalRecoveryReserved
-// ReplicaActive
-// RepNotRecovering
-// RepRecovering
-// RepWaitBackfillReserved
-// RepWaitRecoveryReserved
-// Stray
-// ToDelete
-// WaitDeleteReserved
-// Deleting
-// Crashed
-
-namespace recovery {
-
-struct Crashed : boost::statechart::state<Crashed, Machine> {
- explicit Crashed(my_context ctx);
-};
-
-struct Reset;
-
-struct Initial : boost::statechart::state<Initial, Machine> {
- explicit Initial(my_context ctx);
- void exit();
-
- using reactions = boost::mpl::list <
- boost::statechart::transition<Initialize, Reset>,
- boost::statechart::custom_reaction<NullEvt>,
- boost::statechart::transition<boost::statechart::event_base, Crashed>
- >;
-
- boost::statechart::result react(const MNotifyRec&);
- boost::statechart::result react(const MInfoRec&);
- boost::statechart::result react(const MLogRec&);
- boost::statechart::result react(const boost::statechart::event_base&) {
- return discard_event();
- }
-};
-
-struct Reset : boost::statechart::state<Reset, Machine> {
- explicit Reset(my_context ctx);
- void exit();
-
- using reactions = boost::mpl::list <
- boost::statechart::custom_reaction<AdvMap>,
- boost::statechart::custom_reaction<ActMap>,
- boost::statechart::custom_reaction<NullEvt>,
- boost::statechart::transition<boost::statechart::event_base, Crashed>
- >;
- boost::statechart::result react(const AdvMap&);
- boost::statechart::result react(const ActMap&);
- boost::statechart::result react(const boost::statechart::event_base&) {
- return discard_event();
- }
-};
-
-struct Start;
-
-struct Started : boost::statechart::state<Started, Machine, Start> {
- explicit Started(my_context ctx);
- void exit();
-
- using reactions = boost::mpl::list <
- boost::statechart::custom_reaction<AdvMap>,
- // ignored
- boost::statechart::custom_reaction<NullEvt>,
- // crash
- boost::statechart::transition<boost::statechart::event_base, Crashed>
- >;
- boost::statechart::result react(const AdvMap&);
- boost::statechart::result react(const boost::statechart::event_base&) {
- return discard_event();
- }
-};
-
-struct Primary;
-struct Stray;
-
-struct MakePrimary : boost::statechart::event<MakePrimary> {};
-struct MakeStray : boost::statechart::event<MakeStray> {};
-
-struct Start : boost::statechart::state<Start, Started> {
- explicit Start(my_context ctx);
- void exit();
-
- using reactions = boost::mpl::list <
- boost::statechart::transition<MakePrimary, Primary>,
- boost::statechart::transition<MakeStray, Stray>
- >;
-};
-
-struct Peering;
-struct WaitActingChange;
-
-struct Primary : boost::statechart::state<Primary, Started, Peering> {
- explicit Primary(my_context ctx);
- void exit();
-
- using reactions = boost::mpl::list <
- boost::statechart::custom_reaction<ActMap>,
- boost::statechart::custom_reaction<MNotifyRec>
- >;
- boost::statechart::result react(const ActMap&);
- boost::statechart::result react(const MNotifyRec&);
-};
-
-struct GetInfo;
-struct Active;
-
-struct Peering : boost::statechart::state<Peering, Primary, GetInfo> {
- PastIntervals::PriorSet prior_set;
- /// need osd_find_best_info_ignore_history_les
- bool history_les_bound = false;
-
- explicit Peering(my_context ctx);
- void exit();
-
- using reactions = boost::mpl::list <
- boost::statechart::transition<Activate, Active>,
- boost::statechart::custom_reaction<AdvMap>
- >;
- boost::statechart::result react(const AdvMap &advmap);
-};
-
-struct Activating;
-
-struct AllReplicasActivated : boost::statechart::event<AllReplicasActivated> {};
-
-struct Active : boost::statechart::state<Active, Primary, Activating> {
- explicit Active(my_context ctx);
- void exit();
-
- bool all_replicas_activated = false;
-
- using reactions = boost::mpl::list <
- boost::statechart::custom_reaction< ActMap >,
- boost::statechart::custom_reaction< AdvMap >,
- boost::statechart::custom_reaction< MInfoRec >,
- boost::statechart::custom_reaction< MNotifyRec >,
- boost::statechart::custom_reaction< MLogRec >,
- boost::statechart::custom_reaction< AllReplicasActivated >
- >;
- boost::statechart::result react(const ActMap&);
- boost::statechart::result react(const AdvMap&);
- boost::statechart::result react(const MInfoRec& infoevt);
- boost::statechart::result react(const MNotifyRec& notevt);
- boost::statechart::result react(const MLogRec& logevt);
- boost::statechart::result react(const AllReplicasActivated&);
-};
-
-struct GoClean : boost::statechart::event<GoClean> {};
-
-struct Clean : boost::statechart::state<Clean, Active> {
- explicit Clean(my_context ctx);
- void exit();
- boost::statechart::result react(const boost::statechart::event_base&) {
- return discard_event();
- }
-};
-
-struct Recovered : boost::statechart::state<Recovered, Active> {
- using reactions = boost::mpl::list<
- boost::statechart::transition<GoClean, Clean>,
- boost::statechart::custom_reaction<AllReplicasActivated>
- >;
- explicit Recovered(my_context ctx);
- void exit();
- boost::statechart::result react(const AllReplicasActivated&);
-};
-
-struct AllReplicasRecovered : boost::statechart::event<AllReplicasRecovered>
-{};
-
-struct Activating : boost::statechart::state<Activating, Active> {
- using reactions = boost::mpl::list <
- boost::statechart::transition< AllReplicasRecovered, Recovered >
- >;
- explicit Activating(my_context ctx);
- void exit();
-};
-
-struct RepNotRecovering;
-
-struct ReplicaActive : boost::statechart::state<ReplicaActive, Started, RepNotRecovering> {
- explicit ReplicaActive(my_context ctx);
- void exit();
-
- using reactions = boost::mpl::list <
- boost::statechart::custom_reaction<ActMap>,
- boost::statechart::custom_reaction<MQuery>,
- boost::statechart::custom_reaction<MInfoRec>,
- boost::statechart::custom_reaction<MLogRec>,
- boost::statechart::custom_reaction<Activate>
- >;
- boost::statechart::result react(const MInfoRec& infoevt);
- boost::statechart::result react(const MLogRec& logevt);
- boost::statechart::result react(const ActMap&);
- boost::statechart::result react(const MQuery&);
- boost::statechart::result react(const Activate&);
-};
-
-struct RepNotRecovering : boost::statechart::state<RepNotRecovering, ReplicaActive> {
- explicit RepNotRecovering(my_context ctx);
- void exit();
-};
-
-struct Stray : boost::statechart::state<Stray, Started> {
- explicit Stray(my_context ctx);
- void exit();
-
- using reactions = boost::mpl::list <
- boost::statechart::custom_reaction<MQuery>,
- boost::statechart::custom_reaction<MLogRec>,
- boost::statechart::custom_reaction<MInfoRec>,
- boost::statechart::custom_reaction<ActMap>
- >;
- boost::statechart::result react(const MQuery& query);
- boost::statechart::result react(const MLogRec& logevt);
- boost::statechart::result react(const MInfoRec& infoevt);
- boost::statechart::result react(const ActMap&);
-};
-
-struct GetLog;
-struct Down;
-
-struct GotInfo : boost::statechart::event<GotInfo> {};
-struct IsDown : boost::statechart::event<IsDown> {};
-
-struct GetInfo : boost::statechart::state<GetInfo, Peering> {
- std::set<pg_shard_t> peer_info_requested;
-
- explicit GetInfo(my_context ctx);
- void exit();
- void get_infos();
-
- using reactions = boost::mpl::list <
- boost::statechart::transition<GotInfo, GetLog>,
- boost::statechart::custom_reaction<MNotifyRec>,
- boost::statechart::transition<IsDown, Down>
- >;
- boost::statechart::result react(const MNotifyRec& infoevt);
-};
-
-struct GotLog : boost::statechart::event<GotLog> {};
-
-struct GetLog : boost::statechart::state<GetLog, Peering> {
- pg_shard_t auth_log_shard;
- boost::intrusive_ptr<MOSDPGLog> msg;
-
- explicit GetLog(my_context ctx);
- void exit();
-
- using reactions = boost::mpl::list <
- boost::statechart::custom_reaction<MLogRec>,
- boost::statechart::custom_reaction<GotLog>,
- boost::statechart::custom_reaction<AdvMap>
- >;
- boost::statechart::result react(const AdvMap&);
- boost::statechart::result react(const MLogRec& logevt);
- boost::statechart::result react(const GotLog&);
-};
-
-struct NeedUpThru : boost::statechart::event<NeedUpThru> {};
-struct WaitUpThru;
-
-struct GetMissing : boost::statechart::state<GetMissing, Peering> {
- explicit GetMissing(my_context ctx);
- void exit();
-
- using reactions = boost::mpl::list <
- boost::statechart::custom_reaction<MLogRec>,
- boost::statechart::transition<NeedUpThru, WaitUpThru>
- >;
- boost::statechart::result react(const MLogRec& logevt);
-};
-
-struct WaitUpThru : boost::statechart::state<WaitUpThru, Peering> {
- explicit WaitUpThru(my_context ctx);
- void exit();
-
- using reactions = boost::mpl::list <
- boost::statechart::custom_reaction<ActMap>,
- boost::statechart::custom_reaction<MLogRec>
- >;
- boost::statechart::result react(const ActMap& am);
- boost::statechart::result react(const MLogRec& logrec);
-};
-
-struct Down : boost::statechart::state<Down, Peering> {
- explicit Down(my_context ctx);
- using reactions = boost::mpl::list <
- boost::statechart::custom_reaction<MNotifyRec>
- >;
- boost::statechart::result react(const MNotifyRec& infoevt);
- void exit();
-};
-
-}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/osd/shard_services.h"
+
+#include "osd/osd_perf_counters.h"
+#include "osd/PeeringState.h"
+#include "crimson/osd/osdmap_service.h"
+#include "crimson/os/cyan_store.h"
+#include "crimson/mgr/client.h"
+#include "crimson/mon/MonClient.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/net/Connection.h"
+#include "crimson/os/cyan_store.h"
+#include "messages/MOSDPGTemp.h"
+#include "messages/MOSDPGCreated.h"
+#include "messages/MOSDPGNotify.h"
+#include "messages/MOSDPGInfo.h"
+#include "messages/MOSDPGQuery.h"
+
+namespace {
+ seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace ceph::osd {
+
+ShardServices::ShardServices(
+ ceph::net::Messenger &cluster_msgr,
+ ceph::net::Messenger &public_msgr,
+ ceph::mon::Client &monc,
+ ceph::mgr::Client &mgrc,
+ ceph::os::CyanStore &store)
+ : cluster_msgr(cluster_msgr),
+ public_msgr(public_msgr),
+ monc(monc),
+ mgrc(mgrc),
+ store(store) {
+ perf = build_osd_logger(&cct);
+ cct.get_perfcounters_collection()->add(perf);
+
+ recoverystate_perf = build_recoverystate_perf(&cct);
+ cct.get_perfcounters_collection()->add(recoverystate_perf);
+}
+
+seastar::future<> ShardServices::send_to_osd(
+ int peer, Ref<Message> m, epoch_t from_epoch) {
+ if (osdmap->is_down(peer) || osdmap->get_info(peer).up_from > from_epoch) {
+ return seastar::now();
+ } else {
+ return cluster_msgr.connect(osdmap->get_cluster_addrs(peer).front(),
+ CEPH_ENTITY_TYPE_OSD)
+ .then([m, this] (auto xconn) {
+ return (*xconn)->send(m);
+ });
+ }
+}
+
+seastar::future<> ShardServices::dispatch_context_transaction(
+ ceph::os::CollectionRef col, PeeringCtx &ctx) {
+ auto ret = store.do_transaction(
+ col,
+ std::move(ctx.transaction));
+ ctx.reset_transaction();
+ return ret;
+}
+
+seastar::future<> ShardServices::dispatch_context_messages(
+ PeeringCtx &ctx)
+{
+ auto ret = seastar::when_all_succeed(
+ seastar::parallel_for_each(std::move(ctx.notify_list),
+ [this](auto& osd_notifies) {
+ auto& [peer, notifies] = osd_notifies;
+ auto m = make_message<MOSDPGNotify>(osdmap->get_epoch(),
+ std::move(notifies));
+ logger().debug("dispatch_context_messages sending notify to {}", peer);
+ return send_to_osd(peer, m, osdmap->get_epoch());
+ }),
+ seastar::parallel_for_each(std::move(ctx.query_map),
+ [this](auto& osd_queries) {
+ auto& [peer, queries] = osd_queries;
+ auto m = make_message<MOSDPGQuery>(osdmap->get_epoch(),
+ std::move(queries));
+ logger().debug("dispatch_context_messages sending query to {}", peer);
+ return send_to_osd(peer, m, osdmap->get_epoch());
+ }),
+ seastar::parallel_for_each(std::move(ctx.info_map),
+ [this](auto& osd_infos) {
+ auto& [peer, infos] = osd_infos;
+ auto m = make_message<MOSDPGInfo>(osdmap->get_epoch(),
+ std::move(infos));
+ logger().debug("dispatch_context_messages sending info to {}", peer);
+ return send_to_osd(peer, m, osdmap->get_epoch());
+ }));
+ ctx.notify_list.clear();
+ ctx.query_map.clear();
+ ctx.info_map.clear();
+ return ret;
+}
+
+seastar::future<> ShardServices::dispatch_context(
+ ceph::os::CollectionRef col,
+ PeeringCtx &&ctx)
+{
+ ceph_assert(col || ctx.transaction.empty());
+ return seastar::do_with(
+ std::move(ctx),
+ [this, col](auto& todo) {
+ return seastar::when_all_succeed(
+ dispatch_context_messages(todo),
+ col ? dispatch_context_transaction(col, todo) : seastar::now());
+ });
+}
+
+void ShardServices::queue_want_pg_temp(pg_t pgid,
+ const vector<int>& want,
+ bool forced)
+{
+ auto p = pg_temp_pending.find(pgid);
+ if (p == pg_temp_pending.end() ||
+ p->second.acting != want ||
+ forced) {
+ pg_temp_wanted[pgid] = {want, forced};
+ }
+}
+
+void ShardServices::remove_want_pg_temp(pg_t pgid)
+{
+ pg_temp_wanted.erase(pgid);
+ pg_temp_pending.erase(pgid);
+}
+
+void ShardServices::_sent_pg_temp()
+{
+#ifdef HAVE_STDLIB_MAP_SPLICING
+ pg_temp_pending.merge(pg_temp_wanted);
+#else
+ pg_temp_pending.insert(make_move_iterator(begin(pg_temp_wanted)),
+ make_move_iterator(end(pg_temp_wanted)));
+#endif
+ pg_temp_wanted.clear();
+}
+
+void ShardServices::requeue_pg_temp()
+{
+ unsigned old_wanted = pg_temp_wanted.size();
+ unsigned old_pending = pg_temp_pending.size();
+ _sent_pg_temp();
+ pg_temp_wanted.swap(pg_temp_pending);
+ logger().debug(
+ "{}: {} + {} -> {}",
+ __func__,
+ old_wanted,
+ old_pending,
+ pg_temp_wanted.size());
+}
+
+std::ostream& operator<<(
+ std::ostream& out,
+ const ShardServices::pg_temp_t& pg_temp)
+{
+ out << pg_temp.acting;
+ if (pg_temp.forced) {
+ out << " (forced)";
+ }
+ return out;
+}
+
+void ShardServices::send_pg_temp()
+{
+ if (pg_temp_wanted.empty())
+ return;
+ logger().debug("{}: {}", __func__, pg_temp_wanted);
+ boost::intrusive_ptr<MOSDPGTemp> ms[2] = {nullptr, nullptr};
+ for (auto& [pgid, pg_temp] : pg_temp_wanted) {
+ auto& m = ms[pg_temp.forced];
+ if (!m) {
+ m = make_message<MOSDPGTemp>(osdmap->get_epoch());
+ m->forced = pg_temp.forced;
+ }
+ m->pg_temp.emplace(pgid, pg_temp.acting);
+ }
+ for (auto &m : ms) {
+ if (m) {
+ monc.send_message(m);
+ }
+ }
+ _sent_pg_temp();
+}
+
+seastar::future<> ShardServices::send_pg_created(pg_t pgid)
+{
+ logger().debug(__func__);
+ auto o = get_osdmap();
+ ceph_assert(o->require_osd_release >= ceph_release_t::luminous);
+ pg_created.insert(pgid);
+ return monc.send_message(new MOSDPGCreated(pgid));
+}
+
+seastar::future<> ShardServices::send_pg_created()
+{
+ logger().debug(__func__);
+ auto o = get_osdmap();
+ ceph_assert(o->require_osd_release >= ceph_release_t::luminous);
+ return seastar::parallel_for_each(pg_created,
+ [this](auto &pgid) {
+ return monc.send_message(new MOSDPGCreated(pgid));
+ });
+}
+
+void ShardServices::prune_pg_created()
+{
+ logger().debug(__func__);
+ auto o = get_osdmap();
+ auto i = pg_created.begin();
+ while (i != pg_created.end()) {
+ auto p = o->get_pg_pool(i->pool());
+ if (!p || !p->has_flag(pg_pool_t::FLAG_CREATING)) {
+ logger().debug("{} pruning {}", __func__, *i);
+ i = pg_created.erase(i);
+ } else {
+ logger().debug("{} keeping {}", __func__, *i);
+ ++i;
+ }
+ }
+}
+
+} // namespace ceph::osd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <seastar/core/future.hh>
+
+#include "msg/MessageRef.h"
+#include "crimson/os/cyan_collection.h"
+
+namespace ceph::net {
+ class Messenger;
+}
+
+namespace ceph::mgr {
+ class Client;
+}
+
+namespace ceph::mon {
+ class Client;
+}
+
+namespace ceph::os {
+ class CyanStore;
+}
+
+class PerfCounters;
+class OSDMap;
+class PeeringCtx;
+
+namespace ceph::osd {
+
+/**
+ * Represents services available to each PG
+ */
+class ShardServices {
+ using cached_map_t = boost::local_shared_ptr<const OSDMap>;
+ ceph::net::Messenger &cluster_msgr;
+ ceph::net::Messenger &public_msgr;
+ ceph::mon::Client &monc;
+ ceph::mgr::Client &mgrc;
+ ceph::os::CyanStore &store;
+
+ CephContext cct;
+
+ PerfCounters *perf = nullptr;
+ PerfCounters *recoverystate_perf = nullptr;
+
+public:
+ ShardServices(
+ ceph::net::Messenger &cluster_msgr,
+ ceph::net::Messenger &public_msgr,
+ ceph::mon::Client &monc,
+ ceph::mgr::Client &mgrc,
+ ceph::os::CyanStore &store);
+
+ seastar::future<> send_to_osd(
+ int peer,
+ MessageRef m,
+ epoch_t from_epoch);
+
+ ceph::os::CyanStore &get_store() {
+ return store;
+ }
+
+ CephContext *get_cct() {
+ return &cct;
+ }
+
+ // Loggers
+ PerfCounters &get_recoverystate_perf_logger() {
+ return *recoverystate_perf;
+ }
+ PerfCounters &get_perf_logger() {
+ return *perf;
+ }
+
+ /// Dispatch and reset ctx transaction
+ seastar::future<> dispatch_context_transaction(
+ ceph::os::CollectionRef col, PeeringCtx &ctx);
+
+ /// Dispatch and reset ctx messages
+ seastar::future<> dispatch_context_messages(
+ PeeringCtx &ctx);
+
+ /// Dispatch ctx and dispose of context
+ seastar::future<> dispatch_context(
+ ceph::os::CollectionRef col,
+ PeeringCtx &&ctx);
+
+ /// Dispatch ctx and dispose of ctx, transaction must be empty
+ seastar::future<> dispatch_context(
+ PeeringCtx &&ctx) {
+ return dispatch_context({}, std::move(ctx));
+ }
+
+ // PG Temp State
+private:
+ // TODO: hook into map processing and some kind of heartbeat/peering
+ // message processing
+ struct pg_temp_t {
+ std::vector<int> acting;
+ bool forced = false;
+ };
+ map<pg_t, pg_temp_t> pg_temp_wanted;
+ map<pg_t, pg_temp_t> pg_temp_pending;
+ void _sent_pg_temp();
+ friend std::ostream& operator<<(std::ostream&, const pg_temp_t&);
+public:
+ void queue_want_pg_temp(pg_t pgid, const vector<int>& want,
+ bool forced = false);
+ void remove_want_pg_temp(pg_t pgid);
+ void requeue_pg_temp();
+ void send_pg_temp();
+
+ // Shard-local OSDMap
+private:
+ cached_map_t osdmap;
+public:
+ void update_map(cached_map_t new_osdmap) {
+ osdmap = std::move(new_osdmap);
+ }
+ cached_map_t &get_osdmap() {
+ return osdmap;
+ }
+
+ // PG Created State
+private:
+ set<pg_t> pg_created;
+public:
+ seastar::future<> send_pg_created(pg_t pgid);
+ seastar::future<> send_pg_created();
+ void prune_pg_created();
+};
+
+
+}