#include "crimson/os/cyan_object.h"
#include "crimson/os/cyan_store.h"
#include "crimson/os/Transaction.h"
+#include "crimson/osd/osd_meta.h"
namespace {
seastar::logger& logger() {
superblock.whoami = whoami;
superblock.compat_features = get_osd_initial_compat_set();
- bufferlist bl;
- encode(superblock, bl);
-
- auto ch = store->create_new_collection(coll_t::meta());
+ meta_coll = make_unique<OSDMeta>(
+ store->create_new_collection(coll_t::meta()), store.get());
ceph::os::Transaction t;
- t.create_collection(coll_t::meta(), 0);
- t.write(coll_t::meta(), OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl);
- return store->do_transaction(ch, std::move(t));
+ meta_coll->create(t);
+ meta_coll->store_superblock(t, superblock);
+ return store->do_transaction(meta_coll->collection(), std::move(t));
}).then([cluster_fsid, this] {
store->write_meta("ceph_fsid", cluster_fsid.to_string());
store->write_meta("whoami", std::to_string(whoami));
const auto data_path = local_conf().get_val<std::string>("osd_data");
store = std::make_unique<ceph::os::CyanStore>(data_path);
return store->mount().then([this] {
- meta_coll = store->open_collection(coll_t::meta());
- return read_superblock();
- }).then([this] {
- osdmap = get_map(superblock.current_epoch);
+ meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()),
+ store.get());
+ return meta_coll->load_superblock();
+ }).then([this](OSDSuperblock&& sb) {
+ superblock = std::move(sb);
+ return get_map(superblock.current_epoch);
+ }).then([this](seastar::lw_shared_ptr<OSDMap> map) {
+ osdmap = std::move(map);
return client_msgr->start(&dispatchers);
}).then([this] {
return monc.start();
return seastar::now();
}
-seastar::lw_shared_ptr<OSDMap> OSD::get_map(epoch_t e)
+seastar::future<seastar::lw_shared_ptr<OSDMap>> OSD::get_map(epoch_t e)
{
// TODO: use LRU cache for managing osdmap, fallback to disk if we have to
- return osdmaps[e];
+ if (auto found = osdmaps.find(e); found != osdmaps.end()) {
+ return seastar::make_ready_future<seastar::lw_shared_ptr<OSDMap>>(
+ found->second);
+ } else {
+ return load_map_bl(e).then([e, this](bufferlist bl) {
+ auto osdmap = seastar::make_lw_shared<OSDMap>();
+ osdmap->decode(bl);
+ osdmaps.emplace(e, osdmap);
+ return seastar::make_ready_future<decltype(osdmap)>(std::move(osdmap));
+ });
+ }
+}
+
+void OSD::store_map_bl(ceph::os::Transaction& t,
+ epoch_t e, bufferlist&& bl)
+{
+ meta_coll->store_map(t, e, bl);
+ map_bl_cache[e] = std::move(bl);
+}
+
+seastar::future<bufferlist> OSD::load_map_bl(epoch_t e)
+{
+ if (auto found = map_bl_cache.find(e); found != map_bl_cache.end()) {
+ return seastar::make_ready_future<bufferlist>(found->second);
+ } else {
+ return meta_coll->load_map(e);
+ }
}
-void OSD::store_maps(epoch_t start, Ref<MOSDMap> m)
+seastar::future<> OSD::store_maps(ceph::os::Transaction& t,
+ epoch_t start, Ref<MOSDMap> m)
{
- for (epoch_t e = start; e <= m->get_last(); e++) {
- seastar::lw_shared_ptr<OSDMap> o;
+ return seastar::do_for_each(boost::counting_iterator<epoch_t>(start),
+ boost::counting_iterator<epoch_t>(m->get_last() + 1),
+ [&t, m, this](epoch_t e) {
if (auto p = m->maps.find(e); p != m->maps.end()) {
- o = seastar::make_lw_shared<OSDMap>();
+ auto o = seastar::make_lw_shared<OSDMap>();
o->decode(p->second);
+ logger().info("store_maps osdmap.{}", e);
+ store_map_bl(t, e, std::move(std::move(p->second)));
+ osdmaps.emplace(e, std::move(o));
+ return seastar::now();
} else if (auto p = m->incremental_maps.find(e);
p != m->incremental_maps.end()) {
- o = get_map(e - 1);
OSDMap::Incremental inc;
auto i = p->second.cbegin();
inc.decode(i);
- o->apply_incremental(inc);
+ return load_map_bl(e - 1)
+ .then([&t, e, inc=std::move(inc), this](bufferlist bl) {
+ auto o = seastar::make_lw_shared<OSDMap>();
+ o->decode(bl);
+ o->apply_incremental(inc);
+ bufferlist fbl;
+ o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
+ store_map_bl(t, e, std::move(fbl));
+ osdmaps.emplace(e, std::move(o));
+ return seastar::now();
+ });
} else {
logger().error("MOSDMap lied about what maps it had?");
+ return seastar::now();
}
- osdmaps[e] = std::move(o);
- }
+ });
}
seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request)
{
+ logger().info("{}({})", __func__, epoch);
if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
force_request) {
return monc.renew_subs();
}
}
-void OSD::write_superblock(ceph::os::Transaction& t)
-{
- bufferlist bl;
- encode(superblock, bl);
- t.write(meta_coll->cid, OSD_SUPERBLOCK_GOBJECT, 0, bl.length(), bl);
-}
-
-seastar::future<> OSD::read_superblock()
-{
- // just-enough superblock so mon can ack my MOSDBoot
- return store->read(meta_coll, OSD_SUPERBLOCK_GOBJECT, 0, 0)
- .then([this] (bufferlist&& bl) {
- auto p = bl.cbegin();
- decode(superblock, p);
- return seastar::now();
- });
-}
-
seastar::future<> OSD::handle_osd_map(ceph::net::ConnectionRef conn,
Ref<MOSDMap> m)
{
skip_maps = true;
start = first;
}
- // TODO: store new maps: queue for disk and put in the osdmap cache
- store_maps(start, m);
- // even if this map isn't from a mon, we may have satisfied our subscription
- monc.sub_got("osdmap", last);
- if (!superblock.oldest_map || skip_maps) {
- superblock.oldest_map = first;
- }
- superblock.newest_map = last;
- superblock.current_epoch = last;
+ return seastar::do_with(ceph::os::Transaction{},
+ [=](auto& t) {
+ return store_maps(t, start, m).then([=, &t] {
+ // even if this map isn't from a mon, we may have satisfied our subscription
+ monc.sub_got("osdmap", last);
+ if (!superblock.oldest_map || skip_maps) {
+ superblock.oldest_map = first;
+ }
+ superblock.newest_map = last;
+ superblock.current_epoch = last;
- // note in the superblock that we were clean thru the prior epoch
- if (boot_epoch && boot_epoch >= superblock.mounted) {
- superblock.mounted = boot_epoch;
- superblock.clean_thru = last;
- }
- // TODO: write to superblock and commit the transaction
- return committed_osd_maps(start, last, m);
+ // note in the superblock that we were clean thru the prior epoch
+ if (boot_epoch && boot_epoch >= superblock.mounted) {
+ superblock.mounted = boot_epoch;
+ superblock.clean_thru = last;
+ }
+ meta_coll->store_superblock(t, superblock);
+ return store->do_transaction(meta_coll->collection(), std::move(t));
+ });
+ }).then([=] {
+ // TODO: write to superblock and commit the transaction
+ return committed_osd_maps(start, last, m);
+ });
}
seastar::future<> OSD::committed_osd_maps(version_t first,
{
logger().info("osd.{}: committed_osd_maps({}, {})", whoami, first, last);
// advance through the new maps
- for (epoch_t cur = first; cur <= last; cur++) {
- osdmap = get_map(cur);
- if (up_epoch != 0 &&
- osdmap->is_up(whoami) &&
- osdmap->get_addrs(whoami) == client_msgr->get_myaddrs()) {
- up_epoch = osdmap->get_epoch();
- if (!boot_epoch) {
- boot_epoch = osdmap->get_epoch();
+ return seastar::parallel_for_each(boost::irange(first, last + 1),
+ [this](epoch_t cur) {
+ return get_map(cur).then([this](seastar::lw_shared_ptr<OSDMap> o) {
+ if (up_epoch != 0 &&
+ osdmap->is_up(whoami) &&
+ osdmap->get_addrs(whoami) == client_msgr->get_myaddrs()) {
+ up_epoch = osdmap->get_epoch();
+ if (!boot_epoch) {
+ boot_epoch = osdmap->get_epoch();
+ }
+ }
+ });
+ }).then([m, this] {
+ if (osdmap->is_up(whoami) &&
+ osdmap->get_addrs(whoami) == client_msgr->get_myaddrs() &&
+ bind_epoch < osdmap->get_up_from(whoami)) {
+ if (state.is_booting()) {
+ logger().info("osd.{}: activating...", whoami);
+ state.set_active();
+ beacon_timer.arm_periodic(
+ std::chrono::seconds(local_conf()->osd_beacon_report_interval));
}
}
- }
-
- if (osdmap->is_up(whoami) &&
- osdmap->get_addrs(whoami) == client_msgr->get_myaddrs() &&
- bind_epoch < osdmap->get_up_from(whoami)) {
- if (state.is_booting()) {
- logger().info("osd.{}: activating...", whoami);
- state.set_active();
- beacon_timer.arm_periodic(
- std::chrono::seconds(local_conf()->osd_beacon_report_interval));
- }
- }
- if (state.is_active()) {
- logger().info("osd.{}: now active", whoami);
- if (!osdmap->exists(whoami)) {
- return shutdown();
- }
- if (should_restart()) {
- return restart();
+ if (state.is_active()) {
+ logger().info("osd.{}: now active", whoami);
+ if (!osdmap->exists(whoami)) {
+ return shutdown();
+ }
+ if (should_restart()) {
+ return restart();
+ } else {
+ return seastar::now();
+ }
+ } else if (state.is_preboot()) {
+ logger().info("osd.{}: now preboot", whoami);
+
+ if (m->get_source().is_mon()) {
+ logger().info("osd.{}: _preboot", whoami);
+ return _preboot(m->oldest_map, m->newest_map);
+ } else {
+ logger().info("osd.{}: start_boot", whoami);
+ return start_boot();
+ }
} else {
+ logger().info("osd.{}: now ???", whoami);
+ // XXX
return seastar::now();
}
- } else if (state.is_preboot()) {
- logger().info("osd.{}: now preboot", whoami);
-
- if (m->get_source().is_mon()) {
- logger().info("osd.{}: _preboot", whoami);
- return _preboot(m->oldest_map, m->newest_map);
- } else {
- logger().info("osd.{}: start_boot", whoami);
- return start_boot();
- }
- } else {
- logger().info("osd.{}: now ???", whoami);
- // XXX
- return seastar::now();
- }
+ });
}
bool OSD::should_restart() const
min_last_epoch_clean);
return monc.send_message(m);
}
+
+ghobject_t OSD::get_osdmap_pobject_name(epoch_t epoch) {
+ string name = fmt::format("osdmap.{}", epoch);
+ return ghobject_t(hobject_t(sobject_t(object_t(name), 0)));
+}
--- /dev/null
+#include "osd_meta.h"
+
+#include "crimson/os/cyan_collection.h"
+#include "crimson/os/cyan_store.h"
+#include "crimson/os/Transaction.h"
+
+void OSDMeta::create(ceph::os::Transaction& t)
+{
+ t.create_collection(coll->cid, 0);
+}
+
+void OSDMeta::store_map(ceph::os::Transaction& t,
+ epoch_t e, const bufferlist& m)
+{
+ t.write(coll->cid, osdmap_oid(e), 0, m.length(), m);
+}
+
+seastar::future<bufferlist> OSDMeta::load_map(epoch_t e)
+{
+ return store->read(coll,
+ osdmap_oid(e), 0, 0,
+ CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+}
+
+void OSDMeta::store_superblock(ceph::os::Transaction& t,
+ const OSDSuperblock& superblock)
+{
+ bufferlist bl;
+ encode(superblock, bl);
+ t.write(coll->cid, superblock_oid(), 0, bl.length(), bl);
+}
+
+seastar::future<OSDSuperblock> OSDMeta::load_superblock()
+{
+ return store->read(coll, superblock_oid(), 0, 0)
+ .then([this] (bufferlist&& bl) {
+ auto p = bl.cbegin();
+ OSDSuperblock superblock;
+ decode(superblock, p);
+ return seastar::make_ready_future<OSDSuperblock>(std::move(superblock));
+ });
+}
+
+seastar::future<pg_pool_t,
+ std::string,
+ OSDMeta::ec_profile_t>
+OSDMeta::load_final_pool_info(int64_t pool) {
+ return store->read(coll, final_pool_info_oid(pool),
+ 0, 0).then([this] (bufferlist&& bl) {
+ auto p = bl.cbegin();
+ pg_pool_t pi;
+ string name;
+ ec_profile_t ec_profile;
+ decode(pi, p);
+ decode(name, p);
+ decode(ec_profile, p);
+ return seastar::make_ready_future<pg_pool_t,
+ string,
+ ec_profile_t>(std::move(pi),
+ std::move(name),
+ std::move(ec_profile));
+ });
+}
+
+ghobject_t OSDMeta::osdmap_oid(epoch_t epoch)
+{
+ string name = fmt::format("osdmap.{}", epoch);
+ return ghobject_t(hobject_t(sobject_t(object_t(name), 0)));
+}
+
+ghobject_t OSDMeta::final_pool_info_oid(int64_t pool)
+{
+ string name = fmt::format("final_pool_{}", pool);
+ return ghobject_t(hobject_t(sobject_t(object_t(name), CEPH_NOSNAP)));
+}
+
+ghobject_t OSDMeta::superblock_oid()
+{
+ return ghobject_t(hobject_t(sobject_t(object_t("osd_superblock"), 0)));
+}