#include <boost/iterator/counting_iterator.hpp>
#include <boost/range/join.hpp>
-#include <boost/smart_ptr/make_local_shared.hpp>
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <seastar/core/timer.hh>
mgrc{new crimson::mgr::Client{*public_msgr, *this}},
store{store},
pg_shard_manager{
- static_cast<OSDMapService&>(*this), whoami, *cluster_msgr,
+ whoami, *cluster_msgr,
*public_msgr, *monc, *mgrc, store},
shard_services{pg_shard_manager.get_shard_services()},
heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}},
log_client(cluster_msgr.get(), LogClient::NO_FLAGS),
clog(log_client.create_channel())
{
- osdmaps[0] = boost::make_local_shared<OSDMap>();
for (auto msgr : {std::ref(cluster_msgr), std::ref(public_msgr),
std::ref(hb_front_msgr), std::ref(hb_back_msgr)}) {
msgr.get()->set_auth_server(monc.get());
return store.open_collection(
coll_t::meta()
).then([this](auto ch) {
- meta_coll = make_unique<OSDMeta>(ch, store);
+ pg_shard_manager.init_meta_coll(ch, store);
return seastar::now();
});
}
return store.open_collection(coll_t::meta()).then([this] (auto ch) {
if (!ch) {
return store.create_new_collection(coll_t::meta()).then([this] (auto ch) {
- meta_coll = make_unique<OSDMeta>(ch, store);
+ pg_shard_manager.init_meta_coll(ch, store);
return seastar::now();
});
} else {
- meta_coll = make_unique<OSDMeta>(ch, store);
+ pg_shard_manager.init_meta_coll(ch, store);
return seastar::now();
}
});
seastar::future<> OSD::_write_superblock()
{
- return meta_coll->load_superblock().safe_then([this](OSDSuperblock&& sb) {
+ return pg_shard_manager.get_meta_coll().load_superblock(
+ ).safe_then([this](OSDSuperblock&& sb) {
if (sb.cluster_fsid != superblock.cluster_fsid) {
logger().error("provided cluster fsid {} != superblock's {}",
sb.cluster_fsid, superblock.cluster_fsid);
superblock.cluster_fsid,
superblock.osd_fsid);
ceph::os::Transaction t;
- meta_coll->create(t);
- meta_coll->store_superblock(t, superblock);
+ pg_shard_manager.get_meta_coll().create(t);
+ pg_shard_manager.get_meta_coll().store_superblock(t, superblock);
logger().debug("OSD::_write_superblock: do_transaction...");
- return store.do_transaction(meta_coll->collection(), std::move(t));
+ return store.do_transaction(
+ pg_shard_manager.get_meta_coll().collection(),
+ std::move(t));
}),
crimson::ct_error::assert_all("_write_superbock error")
);
}).then([this] {
return open_meta_coll();
}).then([this] {
- return meta_coll->load_superblock(
+ return pg_shard_manager.get_meta_coll().load_superblock(
).handle_error(
crimson::ct_error::assert_all("open_meta_coll error")
);
}).then([this](OSDSuperblock&& sb) {
superblock = std::move(sb);
- return get_map(superblock.current_epoch);
- }).then([this](cached_map_t&& map) {
+ return pg_shard_manager.get_map(superblock.current_epoch);
+ }).then([this](OSDMapService::cached_map_t&& map) {
pg_shard_manager.update_map(map);
pg_shard_manager.got_map(map->get_epoch());
osdmap = std::move(map);
});
}
-seastar::future<Ref<PG>> OSD::make_pg(cached_map_t create_map,
+seastar::future<Ref<PG>> OSD::make_pg(OSDMapService::cached_map_t create_map,
spg_t pgid,
bool do_create)
{
std::move(ec_profile)));
} else {
// pool was deleted; grab final pg_pool_t off disk.
- return meta_coll->load_final_pool_info(pgid.pool());
+ return pg_shard_manager.get_meta_coll().load_final_pool_info(pgid.pool());
}
};
auto get_collection = [pgid, do_create, this] {
return seastar::do_with(PGMeta(store, pgid), [](auto& pg_meta) {
return pg_meta.get_epoch();
}).then([this](epoch_t e) {
- return get_map(e);
+ return pg_shard_manager.get_map(e);
}).then([pgid, this] (auto&& create_map) {
return make_pg(std::move(create_map), pgid, false);
}).then([this](Ref<PG> pg) {
void OSD::update_stats()
{
osd_stat_seq++;
- osd_stat.up_from = get_up_epoch();
+ osd_stat.up_from = pg_shard_manager.get_up_epoch();
osd_stat.hb_peers = heartbeat->get_peers();
- osd_stat.seq = (static_cast<uint64_t>(get_up_epoch()) << 32) | osd_stat_seq;
+ osd_stat.seq = (
+ static_cast<uint64_t>(pg_shard_manager.get_up_epoch()) << 32
+ ) | osd_stat_seq;
gate.dispatch_in_background("statfs", *this, [this] {
(void) store.stat().then([this](store_statfs_t&& st) {
osd_stat.statfs = st;
return osd_stat.seq;
}
-OSD::cached_map_t OSD::get_map() const
-{
- return osdmap;
-}
-
-seastar::future<OSD::cached_map_t> OSD::get_map(epoch_t e)
-{
- // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
- if (auto found = osdmaps.find(e); found) {
- return seastar::make_ready_future<cached_map_t>(std::move(found));
- } else {
- return load_map(e).then([e, this](unique_ptr<OSDMap> osdmap) {
- return seastar::make_ready_future<cached_map_t>(
- osdmaps.insert(e, std::move(osdmap)));
- });
- }
-}
-
-void OSD::store_map_bl(ceph::os::Transaction& t,
- epoch_t e, bufferlist&& bl)
-{
- meta_coll->store_map(t, e, bl);
- map_bl_cache.insert(e, std::move(bl));
-}
-
-seastar::future<bufferlist> OSD::load_map_bl(epoch_t e)
-{
- if (std::optional<bufferlist> found = map_bl_cache.find(e); found) {
- return seastar::make_ready_future<bufferlist>(*found);
- } else {
- return meta_coll->load_map(e);
- }
-}
-
-seastar::future<std::map<epoch_t, bufferlist>> OSD::load_map_bls(
- epoch_t first,
- epoch_t last)
-{
- return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
- boost::make_counting_iterator<epoch_t>(last + 1),
- [this](epoch_t e) {
- return load_map_bl(e).then([e](auto&& bl) {
- return seastar::make_ready_future<pair<epoch_t, bufferlist>>(
- std::make_pair(e, std::move(bl)));
- });
- },
- std::map<epoch_t, bufferlist>{},
- [](auto&& bls, auto&& epoch_bl) {
- bls.emplace(std::move(epoch_bl));
- return std::move(bls);
- });
-}
-
-seastar::future<std::unique_ptr<OSDMap>> OSD::load_map(epoch_t e)
-{
- auto o = std::make_unique<OSDMap>();
- if (e > 0) {
- return load_map_bl(e).then([o=std::move(o)](bufferlist bl) mutable {
- o->decode(bl);
- return seastar::make_ready_future<unique_ptr<OSDMap>>(std::move(o));
- });
- } else {
- return seastar::make_ready_future<unique_ptr<OSDMap>>(std::move(o));
- }
-}
-
-seastar::future<> OSD::store_maps(ceph::os::Transaction& t,
- epoch_t start, Ref<MOSDMap> m)
-{
- return seastar::do_for_each(boost::make_counting_iterator(start),
- boost::make_counting_iterator(m->get_last() + 1),
- [&t, m, this](epoch_t e) {
- if (auto p = m->maps.find(e); p != m->maps.end()) {
- auto o = std::make_unique<OSDMap>();
- o->decode(p->second);
- logger().info("store_maps osdmap.{}", e);
- store_map_bl(t, e, std::move(std::move(p->second)));
- osdmaps.insert(e, std::move(o));
- return seastar::now();
- } else if (auto p = m->incremental_maps.find(e);
- p != m->incremental_maps.end()) {
- return load_map(e - 1).then([e, bl=p->second, &t, this](auto o) {
- OSDMap::Incremental inc;
- auto i = bl.cbegin();
- inc.decode(i);
- o->apply_incremental(inc);
- bufferlist fbl;
- o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
- store_map_bl(t, e, std::move(fbl));
- osdmaps.insert(e, std::move(o));
- return seastar::now();
- });
- } else {
- logger().error("MOSDMap lied about what maps it had?");
- return seastar::now();
- }
- });
-}
-
bool OSD::require_mon_peer(crimson::net::Connection *conn, Ref<Message> m)
{
if (!conn->peer_is_mon()) {
return seastar::do_with(
std::move(info),
[this](auto &info) -> seastar::future<Ref<PG>> {
- return get_map(info->epoch).then(
- [&info, this](cached_map_t startmap) ->
- seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
+ return pg_shard_manager.get_map(info->epoch).then(
+ [&info, this](OSDMapService::cached_map_t startmap) ->
+ seastar::future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>> {
const spg_t &pgid = info->pgid;
if (info->by_mon) {
int64_t pool_id = pgid.pgid.pool();
"{} ignoring pgid {}, pool dne",
__func__,
pgid);
- return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
+ return seastar::make_ready_future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>>(
std::make_tuple(Ref<PG>(), startmap));
}
ceph_assert(osdmap->require_osd_release >= ceph_release_t::octopus);
"{} dropping {} create, pool does not have CREATING flag set",
__func__,
pgid);
- return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
+ return seastar::make_ready_future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>>(
std::make_tuple(Ref<PG>(), startmap));
}
}
return make_pg(startmap, pgid, true).then(
[startmap=std::move(startmap)](auto pg) mutable {
- return seastar::make_ready_future<std::tuple<Ref<PG>, cached_map_t>>(
+ return seastar::make_ready_future<std::tuple<Ref<PG>, OSDMapService::cached_map_t>>(
std::make_tuple(std::move(pg), std::move(startmap)));
});
}).then([this, &info](auto&& ret) ->
return seastar::do_with(ceph::os::Transaction{},
[=](auto& t) {
- return store_maps(t, start, m).then([=, &t] {
+ return pg_shard_manager.store_maps(t, start, m).then([=, &t] {
// even if this map isn't from a mon, we may have satisfied our subscription
monc->sub_got("osdmap", last);
if (!superblock.oldest_map || skip_maps) {
superblock.mounted = boot_epoch;
superblock.clean_thru = last;
}
- meta_coll->store_superblock(t, superblock);
+ pg_shard_manager.get_meta_coll().store_superblock(t, superblock);
logger().debug("OSD::handle_osd_map: do_transaction...");
- return store.do_transaction(meta_coll->collection(), std::move(t));
+ return store.do_transaction(
+ pg_shard_manager.get_meta_coll().collection(),
+ std::move(t));
});
}).then([=] {
// TODO: write to superblock and commit the transaction
return seastar::do_for_each(boost::make_counting_iterator(first),
boost::make_counting_iterator(last + 1),
[this](epoch_t cur) {
- return get_map(cur).then([this](cached_map_t&& o) {
+ return pg_shard_manager.get_map(cur).then([this](OSDMapService::cached_map_t&& o) {
osdmap = std::move(o);
pg_shard_manager.update_map(osdmap);
- if (up_epoch == 0 &&
+ if (pg_shard_manager.get_up_epoch() == 0 &&
osdmap->is_up(whoami) &&
osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
- up_epoch = osdmap->get_epoch();
+ pg_shard_manager.set_up_epoch(osdmap->get_epoch());
if (!boot_epoch) {
boot_epoch = osdmap->get_epoch();
}
epoch_t first)
{
if (first >= superblock.oldest_map) {
- return load_map_bls(first, superblock.newest_map)
+ return pg_shard_manager.load_map_bls(first, superblock.newest_map)
.then([this, conn, first](auto&& bls) {
auto m = crimson::make_message<MOSDMap>(monc->get_fsid(),
osdmap->get_encoding_features());
return conn->send(std::move(m));
});
} else {
- return load_map_bl(osdmap->get_epoch())
+ return pg_shard_manager.load_map_bl(osdmap->get_epoch())
.then([this, conn](auto&& bl) mutable {
auto m = crimson::make_message<MOSDMap>(monc->get_fsid(),
osdmap->get_encoding_features());
{
beacon_timer.cancel();
tick_timer.cancel();
- up_epoch = 0;
+ pg_shard_manager.set_up_epoch(0);
bind_epoch = osdmap->get_epoch();
// TODO: promote to shutdown if being marked down for multiple times
// rebind messengers
#include "crimson/common/gated.h"
#include "crimson/admin/admin_socket.h"
#include "crimson/common/simple_lru.h"
-#include "crimson/common/shared_lru.h"
#include "crimson/mgr/client.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/osd/osdmap_service.h"
class PG;
class OSD final : public crimson::net::Dispatcher,
- private OSDMapService,
private crimson::common::AuthHandler,
private crimson::mgr::WithStats {
const int whoami;
std::unique_ptr<crimson::mon::Client> monc;
std::unique_ptr<crimson::mgr::Client> mgrc;
- SharedLRU<epoch_t, OSDMap> osdmaps;
- SimpleLRU<epoch_t, bufferlist, false> map_bl_cache;
- cached_map_t osdmap;
// TODO: use a wrapper for ObjectStore
+ OSDMapService::cached_map_t osdmap;
crimson::os::FuturizedStore& store;
- std::unique_ptr<OSDMeta> meta_coll;
/// _first_ epoch we were marked up (after this process started)
epoch_t boot_epoch = 0;
- /// _most_recent_ epoch we were marked up
- epoch_t up_epoch = 0;
//< epoch we last did a bind to new ip:ports
epoch_t bind_epoch = 0;
//< since when there is no more pending pg creates from mon
seastar::future<> _send_boot();
seastar::future<> _add_me_to_crush();
- seastar::future<Ref<PG>> make_pg(cached_map_t create_map,
+ seastar::future<Ref<PG>> make_pg(OSDMapService::cached_map_t create_map,
spg_t pgid,
bool do_create);
seastar::future<Ref<PG>> load_pg(spg_t pgid);
seastar::future<> load_pgs();
- // OSDMapService methods
- epoch_t get_up_epoch() const final {
- return up_epoch;
- }
- seastar::future<cached_map_t> get_map(epoch_t e) final;
- cached_map_t get_map() const final;
- seastar::future<std::unique_ptr<OSDMap>> load_map(epoch_t e);
- seastar::future<bufferlist> load_map_bl(epoch_t e);
- seastar::future<std::map<epoch_t, bufferlist>>
- load_map_bls(epoch_t first, epoch_t last);
- void store_map_bl(ceph::os::Transaction& t,
- epoch_t e, bufferlist&& bl);
- seastar::future<> store_maps(ceph::os::Transaction& t,
- epoch_t start, Ref<MOSDMap> m);
seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
void write_superblock(ceph::os::Transaction& t);
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+#include <boost/smart_ptr/make_local_shared.hpp>
+
#include "crimson/osd/shard_services.h"
#include "messages/MOSDAlive.h"
+#include "messages/MOSDMap.h"
#include "messages/MOSDPGCreated.h"
#include "messages/MOSDPGTemp.h"
CoreState::CoreState(
int whoami,
- OSDMapService &osdmap_service,
crimson::net::Messenger &cluster_msgr,
crimson::net::Messenger &public_msgr,
crimson::mon::Client &monc,
crimson::mgr::Client &mgrc,
crimson::os::FuturizedStore &store)
: whoami(whoami),
- osdmap_service(osdmap_service),
osdmap_gate("CoreState::osdmap_gate"),
cluster_msgr(cluster_msgr),
public_msgr(public_msgr),
crimson::common::local_conf()->osd_min_recovery_priority)
{
crimson::common::local_conf().add_observer(this);
+ osdmaps[0] = boost::make_local_shared<OSDMap>();
}
seastar::future<> CoreState::send_to_osd(
}
}
+CoreState::cached_map_t CoreState::get_map() const
+{
+ return osdmap; // hands back the osdmap member (the one update_map() assigns)
+}
+
+/// Look up the OSDMap for epoch e.
+///
+/// A hit in the osdmaps cache is returned as an already-resolved future.
+/// Otherwise the map is obtained through load_map(e) and inserted into
+/// osdmaps before being returned.
+seastar::future<CoreState::cached_map_t> CoreState::get_map(epoch_t e)
+{
+  // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
+  auto cached = osdmaps.find(e);
+  if (cached) {
+    return seastar::make_ready_future<cached_map_t>(std::move(cached));
+  }
+  // Not cached: load it, then let the cache own the freshly decoded map.
+  return load_map(e).then([e, this](std::unique_ptr<OSDMap> loaded) {
+    return seastar::make_ready_future<cached_map_t>(
+      osdmaps.insert(e, std::move(loaded)));
+  });
+}
+
+/// Hand the encoded map for epoch e to OSDMeta::store_map() together with
+/// transaction t, then keep the buffer in map_bl_cache.
+///
+/// @param t  transaction passed through to OSDMeta::store_map()
+/// @param e  epoch the buffer belongs to
+/// @param bl encoded map; moved into map_bl_cache, so the caller's buffer is
+///           left in a moved-from state
+void CoreState::store_map_bl(ceph::os::Transaction& t,
+                             epoch_t e, bufferlist&& bl)
+{
+  // Go through get_meta_coll() rather than dereferencing meta_coll directly:
+  // the accessor asserts that init_meta_coll() has already run.
+  // Order matters: bl has to be read here before it is moved-from below.
+  get_meta_coll().store_map(t, e, bl);
+  map_bl_cache.insert(e, std::move(bl));
+}
+
+/// Fetch the encoded map for epoch e.
+///
+/// A copy of the buffer is returned straight from map_bl_cache when it is
+/// present there; otherwise the request is forwarded to OSDMeta::load_map().
+/// A miss is not inserted into map_bl_cache by this function.
+seastar::future<bufferlist> CoreState::load_map_bl(epoch_t e)
+{
+  if (std::optional<bufferlist> found = map_bl_cache.find(e); found) {
+    return seastar::make_ready_future<bufferlist>(*found);
+  }
+  // Use the asserting accessor instead of a bare meta_coll-> dereference so
+  // that a call made before init_meta_coll() trips the assertion.
+  return get_meta_coll().load_map(e);
+}
+
+/// Collect the encoded maps for every epoch in [first, last] into a
+/// std::map keyed by epoch, fetching each one through load_map_bl().
+seastar::future<std::map<epoch_t, bufferlist>> CoreState::load_map_bls(
+  epoch_t first,
+  epoch_t last)
+{
+  using epoch_bl_t = std::pair<epoch_t, bufferlist>;
+
+  // Mapper: one epoch -> (epoch, encoded map).
+  auto fetch_one = [this](epoch_t e) {
+    return load_map_bl(e).then([e](auto&& bl) {
+      return seastar::make_ready_future<epoch_bl_t>(
+        std::make_pair(e, std::move(bl)));
+    });
+  };
+
+  // Reducer: fold each pair into the accumulated map and pass the map on.
+  auto collect = [](auto&& bls, auto&& epoch_bl) {
+    bls.emplace(std::move(epoch_bl));
+    return std::move(bls);
+  };
+
+  return seastar::map_reduce(
+    boost::make_counting_iterator<epoch_t>(first),
+    boost::make_counting_iterator<epoch_t>(last + 1),
+    std::move(fetch_one),
+    std::map<epoch_t, bufferlist>{},
+    std::move(collect));
+}
+
+/// Produce a standalone OSDMap for epoch e.
+///
+/// For e > 0 the encoded buffer from load_map_bl(e) is decoded into a new
+/// OSDMap; for any other e a default-constructed OSDMap is returned without
+/// touching the store or the caches.
+seastar::future<std::unique_ptr<OSDMap>> CoreState::load_map(epoch_t e)
+{
+  using map_ptr_t = std::unique_ptr<OSDMap>;
+
+  auto fresh = std::make_unique<OSDMap>();
+  if (e > 0) {
+    return load_map_bl(e).then(
+      [fresh=std::move(fresh)](bufferlist bl) mutable {
+        fresh->decode(bl);
+        return seastar::make_ready_future<map_ptr_t>(std::move(fresh));
+      });
+  }
+  return seastar::make_ready_future<map_ptr_t>(std::move(fresh));
+}
+
+/// Walk the epochs [start, m->get_last()] one at a time and, for every epoch
+/// the message carries, store its full encoding via store_map_bl() and insert
+/// the decoded map into the osdmaps cache.
+///
+/// @param t     transaction handed to store_map_bl(); it is captured by
+///              reference, so it must stay alive until the returned future
+///              resolves
+/// @param start first epoch to process
+/// @param m     message holding full encodings (maps) and/or incremental
+///              encodings (incremental_maps), keyed by epoch; the lambda
+///              holds its own Ref for the duration of the loop
+/// @return future that resolves once every epoch in the range was visited
+seastar::future<> CoreState::store_maps(ceph::os::Transaction& t,
+                                        epoch_t start, Ref<MOSDMap> m)
+{
+  return seastar::do_for_each(
+    boost::make_counting_iterator(start),
+    boost::make_counting_iterator(m->get_last() + 1),
+    [&t, m, this](epoch_t e) {
+      if (auto full = m->maps.find(e); full != m->maps.end()) {
+        // Full map: decode it for the cache first, then give the raw buffer
+        // away (a single std::move is enough).
+        auto o = std::make_unique<OSDMap>();
+        o->decode(full->second);
+        logger().info("store_maps osdmap.{}", e);
+        store_map_bl(t, e, std::move(full->second));
+        osdmaps.insert(e, std::move(o));
+        return seastar::now();
+      }
+      if (auto inc_it = m->incremental_maps.find(e);
+          inc_it != m->incremental_maps.end()) {
+        // Incremental: start from the map of e - 1, apply the delta, and
+        // store the re-encoded full map for e.
+        return load_map(e - 1).then(
+          [e, bl=inc_it->second, &t, this](auto o) {
+            OSDMap::Incremental inc;
+            auto i = bl.cbegin();
+            inc.decode(i);
+            o->apply_incremental(inc);
+            bufferlist fbl;
+            o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
+            store_map_bl(t, e, std::move(fbl));
+            osdmaps.insert(e, std::move(o));
+            return seastar::now();
+          });
+      }
+      // The message covers this epoch range but has neither form for e.
+      logger().error("MOSDMap lied about what maps it had?");
+      return seastar::now();
+    });
+}
+
seastar::future<> ShardServices::dispatch_context_transaction(
crimson::os::CollectionRef col, PeeringCtx &ctx) {
if (ctx.transaction.empty()) {
#pragma once
+#include <memory>
+
#include <boost/intrusive_ptr.hpp>
#include <seastar/core/future.hh>
#include "osd_operation.h"
#include "msg/MessageRef.h"
#include "crimson/common/exception.h"
+#include "crimson/common/shared_lru.h"
#include "crimson/os/futurized_collection.h"
#include "osd/PeeringState.h"
#include "crimson/osd/osdmap_service.h"
#include "crimson/osd/osdmap_gate.h"
+#include "crimson/osd/osd_meta.h"
#include "crimson/osd/object_context.h"
#include "crimson/osd/state.h"
#include "common/AsyncReserver.h"
* OSD-wide singleton holding instances that need to be accessible
* from all PGs.
*/
-class CoreState : public md_config_obs_t {
+class CoreState : public md_config_obs_t, public OSDMapService {
friend class ShardServices;
friend class PGShardManager;
CoreState(
int whoami,
- OSDMapService &osdmap_service,
crimson::net::Messenger &cluster_msgr,
crimson::net::Messenger &public_msgr,
crimson::mon::Client &monc,
OSDState osd_state;
- OSDMapService &osdmap_service;
- OSDMapService::cached_map_t osdmap;
- OSDMapService::cached_map_t &get_osdmap() { return osdmap; }
- void update_map(OSDMapService::cached_map_t new_osdmap) {
+ cached_map_t osdmap;
+ cached_map_t &get_osdmap() { return osdmap; }
+ void update_map(cached_map_t new_osdmap) {
osdmap = std::move(new_osdmap);
}
OSD_OSDMapGate osdmap_gate;
return (ceph_tid_t)next_tid++;
}
+ std::unique_ptr<OSDMeta> meta_coll; // set by init_meta_coll(); read through get_meta_coll()
+ template <typename... Args>
+ void init_meta_coll(Args&&... args) { // (re)creates meta_coll, forwarding args to the OSDMeta constructor
+ meta_coll = std::make_unique<OSDMeta>(std::forward<Args>(args)...);
+ }
+ OSDMeta &get_meta_coll() { // accessor that asserts meta_coll is non-null
+ assert(meta_coll);
+ return *meta_coll;
+ }
+
// global pg temp state
struct pg_temp_t {
std::vector<int> acting;
void handle_conf_change(
const ConfigProxy& conf,
const std::set <std::string> &changed) final;
+
+ // OSDMapService: up-epoch bookkeeping, map caches and map load/store helpers
+ epoch_t up_epoch = 0; // read by get_up_epoch(), written by set_up_epoch()
+ epoch_t get_up_epoch() const final {
+ return up_epoch;
+ }
+ void set_up_epoch(epoch_t e) {
+ up_epoch = e;
+ }
+
+ SharedLRU<epoch_t, OSDMap> osdmaps; // decoded maps keyed by epoch
+ SimpleLRU<epoch_t, bufferlist, false> map_bl_cache; // encoded maps keyed by epoch
+
+ seastar::future<cached_map_t> get_map(epoch_t e) final; // osdmaps lookup, else load_map(e) and insert
+ cached_map_t get_map() const final; // returns the osdmap member
+ seastar::future<std::unique_ptr<OSDMap>> load_map(epoch_t e); // decodes load_map_bl(e) when e > 0, else a default OSDMap
+ seastar::future<bufferlist> load_map_bl(epoch_t e); // map_bl_cache first, then meta_coll
+ seastar::future<std::map<epoch_t, bufferlist>>
+ load_map_bls(epoch_t first, epoch_t last); // load_map_bl() for every epoch in [first, last]
+ void store_map_bl(ceph::os::Transaction& t,
+ epoch_t e, bufferlist&& bl); // passes bl to meta_coll with t, then moves it into map_bl_cache
+ seastar::future<> store_maps(ceph::os::Transaction& t,
+ epoch_t start, Ref<MOSDMap> m); // stores and caches each epoch from start to m->get_last()
};
#define FORWARD_CONST(FROM_METHOD, TO_METHOD, TARGET) \
// OSDMapService
const OSDMapService &get_osdmap_service() const {
- return core_state.osdmap_service;
+ return core_state;
}
template <typename T, typename... Args>