From: Matan Breizman Date: Wed, 22 Nov 2023 08:33:46 +0000 (+0000) Subject: crimson/osd: rewrite build_incremental_map_msg X-Git-Tag: v19.3.0~273^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7480bae4b30ec1633f5c8f31855f0556bcbfc136;p=ceph.git crimson/osd: rewrite build_incremental_map_msg * load_map_bls() to load inc maps first (if possible). * build_incremental_map_msg() to load full map in case of a map gap. * Conf options are now maintained: * osd_map_share_max_epochs * osd_map_message_max * osd_map_message_max_bytes Signed-off-by: Matan Breizman --- diff --git a/src/crimson/osd/osdmap_service.h b/src/crimson/osd/osdmap_service.h index 017303536dc0..b70f6635d809 100644 --- a/src/crimson/osd/osdmap_service.h +++ b/src/crimson/osd/osdmap_service.h @@ -12,6 +12,13 @@ class OSDMapService { public: using cached_map_t = OSDMapRef; using local_cached_map_t = LocalOSDMapRef; + enum class encoded_osdmap_type_t { + FULLMAP, + INCMAP + }; + using bls_pair = std::pair; + using bls_map_pair_t = std::pair; + using bls_map_t = std::map; virtual ~OSDMapService() = default; virtual seastar::future get_map(epoch_t e) = 0; diff --git a/src/crimson/osd/shard_services.cc b/src/crimson/osd/shard_services.cc index 9d9efb8945a6..60faaac047bd 100644 --- a/src/crimson/osd/shard_services.cc +++ b/src/crimson/osd/shard_services.cc @@ -417,25 +417,34 @@ read_errorator::future OSDSingletonState::load_inc_map_bl( } } -seastar::future> OSDSingletonState::load_map_bls( +seastar::future OSDSingletonState::load_map_bls( epoch_t first, epoch_t last) { logger().debug("{} loading maps [{},{}]", __func__, first, last); ceph_assert(first <= last); - // TODO: take osd_map_max into account - //int max = cct->_conf->osd_map_message_max; - //ssize_t max_bytes = cct->_conf->osd_map_message_max_bytes; return seastar::map_reduce(boost::make_counting_iterator(first), boost::make_counting_iterator(last + 1), [this](epoch_t e) { - return load_map_bl(e).then([e](auto&& bl) { - return seastar::make_ready_future>( - std::make_pair(e, std::move(bl))); + return load_inc_map_bl(e).safe_then([](auto&& bl) { + return seastar::make_ready_future( + std::make_pair(OSDMapService::encoded_osdmap_type_t::INCMAP, + std::move(bl))); + }, read_errorator::all_same_way([this, e] { + logger().debug("load_map_bls: can't load inc map {}, attempting full map instread", + e); + return load_map_bl(e).then([](auto&& bl) { + return seastar::make_ready_future( + std::make_pair(OSDMapService::encoded_osdmap_type_t::FULLMAP, + std::move(bl))); + }); + })).then([e] (auto&& loaded_map) { + return seastar::make_ready_future( + std::make_pair(e, std::move(loaded_map))); }); }, - std::map{}, + OSDMapService::bls_map_t{}, [](auto&& bls, auto&& epoch_bl) { bls.emplace(std::move(epoch_bl)); return std::move(bls); @@ -479,7 +488,7 @@ seastar::future<> OSDSingletonState::store_maps(ceph::os::Transaction& t, "loading osdmap.{}", e, e - 1); ceph_assert(std::cmp_greater(e, 0u)); return load_map(e - 1).then( - [&added_maps, e, bl=p->second, &t, this](auto o) { + [&added_maps, e, bl=p->second, &t, this](auto o) mutable { OSDMap::Incremental inc; auto i = bl.cbegin(); inc.decode(i); @@ -783,49 +792,80 @@ seastar::future<> ShardServices::dispatch_context( }); } -seastar::future<> OSDSingletonState::send_incremental_map( - crimson::net::Connection &conn, - epoch_t first) +seastar::future> OSDSingletonState::build_incremental_map_msg( + epoch_t first, + epoch_t last) { - logger().info("{}: first osdmap: {} " - "superblock's oldest map: {}", - __func__, first, superblock.get_oldest_map()); - if (first >= superblock.get_oldest_map()) { - // TODO: osd_map_share_max_epochs - // See OSDService::build_incremental_map_msg + return seastar::do_with(crimson::common::local_conf()->osd_map_message_max, + crimson::make_message( + monc.get_fsid(), + osdmap->get_encoding_features()), + [this, &first, last](unsigned int map_message_max, + auto& m) { + m->cluster_osdmap_trim_lower_bound = superblock.cluster_osdmap_trim_lower_bound; + m->newest_map = superblock.get_newest_map(); + auto maybe_handle_mapgap = seastar::now(); if (first < superblock.cluster_osdmap_trim_lower_bound) { logger().info("{}: cluster osdmap lower bound: {} " - " > first {}, starting with full map", - __func__, superblock.cluster_osdmap_trim_lower_bound, first); + " > first {}, starting with full map", + __func__, superblock.cluster_osdmap_trim_lower_bound, first); // we don't have the next map the target wants, // so start with a full map. first = superblock.cluster_osdmap_trim_lower_bound; + maybe_handle_mapgap = load_map_bl(first).then( + [&first, &map_message_max, &m](auto&& bl) { + m->maps[first] = std::move(bl); + --map_message_max; + ++first; + }); } - return load_map_bls( - first, superblock.get_newest_map() - ).then([this, &conn](auto&& bls) { - auto m = crimson::make_message( - monc.get_fsid(), - osdmap->get_encoding_features()); - m->cluster_osdmap_trim_lower_bound = superblock.cluster_osdmap_trim_lower_bound; - m->newest_map = superblock.get_newest_map(); - m->maps = std::move(bls); - return conn.send(std::move(m)); - }); - } else { - // See OSDService::send_incremental_map - // just send latest full map - return load_map_bl(osdmap->get_epoch() - ).then([this, &conn](auto&& bl) mutable { - auto m = crimson::make_message( - monc.get_fsid(), - osdmap->get_encoding_features()); - m->cluster_osdmap_trim_lower_bound = superblock.cluster_osdmap_trim_lower_bound; - m->newest_map = superblock.get_newest_map(); - m->maps.emplace(osdmap->get_epoch(), std::move(bl)); - return conn.send(std::move(m)); + return maybe_handle_mapgap.then([this, first, last, &map_message_max, &m] { + if (first > last) { + // first may be later than last in the case of map gap + ceph_assert(!m->maps.empty()); + return seastar::make_ready_future>(std::move(m)); + } + return load_map_bls( + first, + ((last - first) > map_message_max) ? (first + map_message_max) : last + ).then([&m](auto&& bls) { + ssize_t map_message_max_bytes = crimson::common::local_conf()->osd_map_message_max_bytes; + for (auto const& [e, val] : bls) { + map_message_max_bytes -= val.second.length(); + if (map_message_max_bytes < 0) { + break; + } + if (val.first == OSDMapService::encoded_osdmap_type_t::FULLMAP) { + m->maps.emplace(e, std::move(val.second)); + } else if (val.first == OSDMapService::encoded_osdmap_type_t::INCMAP) { + m->incremental_maps.emplace(e, std::move(val.second)); + } else { + ceph_abort(); + } + } + return seastar::make_ready_future>(std::move(m)); + }); }); + }); +} + +seastar::future<> OSDSingletonState::send_incremental_map( + crimson::net::Connection &conn, + epoch_t first) +{ + epoch_t to = osdmap->get_epoch(); + logger().info("{}: first osdmap: {} " + "superblock's oldest map: {}, " + "to {}", + __func__, first, superblock.get_oldest_map(), to); + if (to > first && (int64_t)(to - first) > crimson::common::local_conf()->osd_map_share_max_epochs) { + logger().debug("{} {} > max epochs to send of {}, only sending most recent,", + __func__, (to - first), crimson::common::local_conf()->osd_map_share_max_epochs); + first = to - crimson::common::local_conf()->osd_map_share_max_epochs; } + return build_incremental_map_msg(first, to).then([&conn](auto&& m) { + return conn.send(std::move(m)); + }); } seastar::future<> OSDSingletonState::send_incremental_map_to_osd( diff --git a/src/crimson/osd/shard_services.h b/src/crimson/osd/shard_services.h index 62f0080e2a7b..fbfe0e55c001 100644 --- a/src/crimson/osd/shard_services.h +++ b/src/crimson/osd/shard_services.h @@ -270,6 +270,10 @@ private: superblock = std::move(_superblock); } + seastar::future> build_incremental_map_msg( + epoch_t first, + epoch_t last); + seastar::future<> send_incremental_map( crimson::net::Connection &conn, epoch_t first); @@ -320,8 +324,8 @@ private: seastar::future get_local_map(epoch_t e); seastar::future> load_map(epoch_t e); seastar::future load_map_bl(epoch_t e); - seastar::future> read_errorator::future load_inc_map_bl(epoch_t e); + seastar::future load_map_bls(epoch_t first, epoch_t last); void store_map_bl(ceph::os::Transaction& t, epoch_t e, bufferlist&& bl); @@ -510,6 +514,7 @@ public: FORWARD_TO_OSD_SINGLETON(get_pool_info) FORWARD(with_throttle_while, with_throttle_while, local_state.throttler) + FORWARD_TO_OSD_SINGLETON(build_incremental_map_msg) FORWARD_TO_OSD_SINGLETON(send_incremental_map) FORWARD_TO_OSD_SINGLETON(send_incremental_map_to_osd)