}
}
+seastar::future<std::map<epoch_t, bufferlist>> OSD::load_map_bls(
+ epoch_t first,
+ epoch_t last)
+{
+ return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
+ boost::make_counting_iterator<epoch_t>(last + 1),
+ [this](epoch_t e) {
+ return load_map_bl(e).then([e](auto&& bl) {
+ return seastar::make_ready_future<pair<epoch_t, bufferlist>>(
+ std::make_pair(e, std::move(bl)));
+ });
+ },
+ std::map<epoch_t, bufferlist>{},
+ [](auto&& bls, auto&& epoch_bl) {
+ bls.emplace(std::move(epoch_bl));
+ return std::move(bls);
+ });
+}
+
seastar::future<std::unique_ptr<OSDMap>> OSD::load_map(epoch_t e)
{
auto o = std::make_unique<OSDMap>();
return seastar::now();
}
+seastar::future<> OSD::send_incremental_map(crimson::net::Connection* conn,
+ epoch_t first)
+{
+ if (first >= superblock.oldest_map) {
+ return load_map_bls(first, superblock.newest_map)
+ .then([this, conn, first](auto&& bls) {
+ auto m = make_message<MOSDMap>(monc->get_fsid(),
+ osdmap->get_encoding_features());
+ m->oldest_map = first;
+ m->newest_map = superblock.newest_map;
+ m->maps = std::move(bls);
+ return conn->send(m);
+ });
+ } else {
+ return load_map_bl(osdmap->get_epoch())
+ .then([this, conn, first](auto&& bl) mutable {
+ auto m = make_message<MOSDMap>(monc->get_fsid(),
+ osdmap->get_encoding_features());
+ m->oldest_map = superblock.oldest_map;
+ m->newest_map = superblock.newest_map;
+ m->maps.emplace(osdmap->get_epoch(), std::move(bl));
+ return conn->send(m);
+ });
+ }
+}
+
seastar::future<> OSD::handle_rep_op(crimson::net::Connection* conn,
Ref<MOSDRepOp> m)
{
void dump_status(Formatter*) const;
void print(std::ostream&) const;
+
+ seastar::future<> send_incremental_map(crimson::net::Connection* conn,
+ epoch_t first);
private:
seastar::future<> start_boot();
seastar::future<> _preboot(version_t oldest_osdmap, version_t newest_osdmap);
cached_map_t get_map() const final;
seastar::future<std::unique_ptr<OSDMap>> load_map(epoch_t e);
seastar::future<bufferlist> load_map_bl(epoch_t e);
+ seastar::future<std::map<epoch_t, bufferlist>>
+ load_map_bls(epoch_t first, epoch_t last);
void store_map_bl(ceph::os::Transaction& t,
epoch_t e, bufferlist&& bl);
seastar::future<> store_maps(ceph::os::Transaction& t,
return with_blocking_future(osd.wait_for_pg(m->get_spg()));
}).then([this, opref](Ref<PG> pgref) {
PG &pg = *pgref;
+ if (__builtin_expect(m->get_map_epoch()
+ < pg.get_info().history.same_primary_since,
+ false)) {
+ return osd.send_incremental_map(conn.get(), m->get_map_epoch());
+ }
return with_blocking_future(
handle.enter(pp(pg).await_map)
).then([this, &pg]() mutable {