}
}
-seastar::future<std::map<epoch_t, bufferlist>> OSDSingletonState::load_map_bls(
+seastar::future<OSDMapService::bls_map_t> OSDSingletonState::load_map_bls(
epoch_t first,
epoch_t last)
{
logger().debug("{} loading maps [{},{}]",
__func__, first, last);
ceph_assert(first <= last);
- // TODO: take osd_map_max into account
- //int max = cct->_conf->osd_map_message_max;
- //ssize_t max_bytes = cct->_conf->osd_map_message_max_bytes;
return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
boost::make_counting_iterator<epoch_t>(last + 1),
[this](epoch_t e) {
- return load_map_bl(e).then([e](auto&& bl) {
- return seastar::make_ready_future<std::pair<epoch_t, bufferlist>>(
- std::make_pair(e, std::move(bl)));
+ return load_inc_map_bl(e).safe_then([](auto&& bl) {
+ return seastar::make_ready_future<OSDMapService::bls_pair>(
+ std::make_pair(OSDMapService::encoded_osdmap_type_t::INCMAP,
+ std::move(bl)));
+ }, read_errorator::all_same_way([this, e] {
+ logger().debug("load_map_bls: can't load inc map {}, attempting full map instread",
+ e);
+ return load_map_bl(e).then([](auto&& bl) {
+ return seastar::make_ready_future<OSDMapService::bls_pair>(
+ std::make_pair(OSDMapService::encoded_osdmap_type_t::FULLMAP,
+ std::move(bl)));
+ });
+ })).then([e] (auto&& loaded_map) {
+ return seastar::make_ready_future<OSDMapService::bls_map_pair_t>(
+ std::make_pair(e, std::move(loaded_map)));
});
},
- std::map<epoch_t, bufferlist>{},
+ OSDMapService::bls_map_t{},
[](auto&& bls, auto&& epoch_bl) {
bls.emplace(std::move(epoch_bl));
return std::move(bls);
"loading osdmap.{}", e, e - 1);
ceph_assert(std::cmp_greater(e, 0u));
return load_map(e - 1).then(
- [&added_maps, e, bl=p->second, &t, this](auto o) {
+ [&added_maps, e, bl=p->second, &t, this](auto o) mutable {
OSDMap::Incremental inc;
auto i = bl.cbegin();
inc.decode(i);
});
}
-seastar::future<> OSDSingletonState::send_incremental_map(
- crimson::net::Connection &conn,
- epoch_t first)
+seastar::future<MURef<MOSDMap>> OSDSingletonState::build_incremental_map_msg(
+ epoch_t first,
+ epoch_t last)
{
- logger().info("{}: first osdmap: {} "
- "superblock's oldest map: {}",
- __func__, first, superblock.get_oldest_map());
- if (first >= superblock.get_oldest_map()) {
- // TODO: osd_map_share_max_epochs
- // See OSDService::build_incremental_map_msg
+ return seastar::do_with(crimson::common::local_conf()->osd_map_message_max,
+ crimson::make_message<MOSDMap>(
+ monc.get_fsid(),
+ osdmap->get_encoding_features()),
+ [this, &first, last](unsigned int map_message_max,
+ auto& m) {
+ m->cluster_osdmap_trim_lower_bound = superblock.cluster_osdmap_trim_lower_bound;
+ m->newest_map = superblock.get_newest_map();
+ auto maybe_handle_mapgap = seastar::now();
if (first < superblock.cluster_osdmap_trim_lower_bound) {
logger().info("{}: cluster osdmap lower bound: {} "
- " > first {}, starting with full map",
- __func__, superblock.cluster_osdmap_trim_lower_bound, first);
+ " > first {}, starting with full map",
+ __func__, superblock.cluster_osdmap_trim_lower_bound, first);
// we don't have the next map the target wants,
// so start with a full map.
first = superblock.cluster_osdmap_trim_lower_bound;
+ maybe_handle_mapgap = load_map_bl(first).then(
+ [&first, &map_message_max, &m](auto&& bl) {
+ m->maps[first] = std::move(bl);
+ --map_message_max;
+ ++first;
+ });
}
- return load_map_bls(
- first, superblock.get_newest_map()
- ).then([this, &conn](auto&& bls) {
- auto m = crimson::make_message<MOSDMap>(
- monc.get_fsid(),
- osdmap->get_encoding_features());
- m->cluster_osdmap_trim_lower_bound = superblock.cluster_osdmap_trim_lower_bound;
- m->newest_map = superblock.get_newest_map();
- m->maps = std::move(bls);
- return conn.send(std::move(m));
- });
- } else {
- // See OSDService::send_incremental_map
- // just send latest full map
- return load_map_bl(osdmap->get_epoch()
- ).then([this, &conn](auto&& bl) mutable {
- auto m = crimson::make_message<MOSDMap>(
- monc.get_fsid(),
- osdmap->get_encoding_features());
- m->cluster_osdmap_trim_lower_bound = superblock.cluster_osdmap_trim_lower_bound;
- m->newest_map = superblock.get_newest_map();
- m->maps.emplace(osdmap->get_epoch(), std::move(bl));
- return conn.send(std::move(m));
+ return maybe_handle_mapgap.then([this, first, last, &map_message_max, &m] {
+ if (first > last) {
+ // first may be later than last in the case of map gap
+ ceph_assert(!m->maps.empty());
+ return seastar::make_ready_future<MURef<MOSDMap>>(std::move(m));
+ }
+ return load_map_bls(
+ first,
+ ((last - first) > map_message_max) ? (first + map_message_max) : last
+ ).then([&m](auto&& bls) {
+ ssize_t map_message_max_bytes = crimson::common::local_conf()->osd_map_message_max_bytes;
+ for (auto const& [e, val] : bls) {
+ map_message_max_bytes -= val.second.length();
+ if (map_message_max_bytes < 0) {
+ break;
+ }
+ if (val.first == OSDMapService::encoded_osdmap_type_t::FULLMAP) {
+ m->maps.emplace(e, std::move(val.second));
+ } else if (val.first == OSDMapService::encoded_osdmap_type_t::INCMAP) {
+ m->incremental_maps.emplace(e, std::move(val.second));
+ } else {
+ ceph_abort();
+ }
+ }
+ return seastar::make_ready_future<MURef<MOSDMap>>(std::move(m));
+ });
});
+ });
+}
+
+seastar::future<> OSDSingletonState::send_incremental_map(
+ crimson::net::Connection &conn,
+ epoch_t first)
+{
+ epoch_t to = osdmap->get_epoch();
+ logger().info("{}: first osdmap: {} "
+ "superblock's oldest map: {}, "
+ "to {}",
+ __func__, first, superblock.get_oldest_map(), to);
+ if (to > first && (int64_t)(to - first) > crimson::common::local_conf()->osd_map_share_max_epochs) {
+ logger().debug("{} {} > max epochs to send of {}, only sending most recent,",
+ __func__, (to - first), crimson::common::local_conf()->osd_map_share_max_epochs);
+ first = to - crimson::common::local_conf()->osd_map_share_max_epochs;
}
+ return build_incremental_map_msg(first, to).then([&conn](auto&& m) {
+ return conn.send(std::move(m));
+ });
}
seastar::future<> OSDSingletonState::send_incremental_map_to_osd(