);
}).then([this](OSDSuperblock&& sb) {
superblock = std::move(sb);
+ pg_shard_manager.set_superblock(sb);
return pg_shard_manager.get_local_map(superblock.current_epoch);
}).then([this](OSDMapService::local_cached_map_t&& map) {
osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(map));
superblock.clean_thru = last;
}
pg_shard_manager.get_meta_coll().store_superblock(t, superblock);
+ pg_shard_manager.set_superblock(superblock);
logger().debug("OSD::handle_osd_map: do_transaction...");
return store.do_transaction(
pg_shard_manager.get_meta_coll().collection(),
return seastar::now();
}
-seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn,
- epoch_t first)
-{
- if (first >= superblock.oldest_map) {
- return pg_shard_manager.load_map_bls(first, superblock.newest_map)
- .then([this, conn, first](auto&& bls) {
- auto m = crimson::make_message<MOSDMap>(monc->get_fsid(),
- osdmap->get_encoding_features());
- m->oldest_map = first;
- m->newest_map = superblock.newest_map;
- m->maps = std::move(bls);
- return conn->send(std::move(m));
- });
- } else {
- return pg_shard_manager.load_map_bl(osdmap->get_epoch())
- .then([this, conn](auto&& bl) mutable {
- auto m = crimson::make_message<MOSDMap>(monc->get_fsid(),
- osdmap->get_encoding_features());
- m->oldest_map = superblock.oldest_map;
- m->newest_map = superblock.newest_map;
- m->maps.emplace(osdmap->get_epoch(), std::move(bl));
- return conn->send(std::move(m));
- });
- }
-}
-
seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
Ref<MOSDRepOp> m)
{
seastar::future<> dump_pg_state_history(Formatter*) const;
void print(std::ostream&) const;
- seastar::future<> send_incremental_map(crimson::net::ConnectionRef conn,
- epoch_t first);
-
/// @return the seq id of the pg stats being sent
uint64_t send_pg_stats();
auto instance_handle = get_instance_handle();
auto &ihref = *instance_handle;
return interruptor::with_interruption(
- [this, pgref, this_instance_id, &ihref]() mutable {
+ [this, pgref, this_instance_id, &ihref, &shard_services]() mutable {
PG &pg = *pgref;
if (pg.can_discard_op(*m)) {
- return osd.send_incremental_map(
- conn, m->get_map_epoch()
+ return shard_services.send_incremental_map(
+ std::ref(*conn), m->get_map_epoch()
).then([this, this_instance_id, pgref] {
logger().debug("{}.{}: discarding", *this, this_instance_id);
pgref->client_request_orderer.remove_request(*this);
return ihref.enter_stage<interruptor>(pp(*pg).send_reply, *this
).then_interruptible(
[this, reply=std::move(reply)]() mutable {
+ logger().debug("{}: sending response", *this);
return conn->send(std::move(reply)).then([] {
return seastar::make_ready_future<seq_mode_t>(seq_mode_t::IN_ORDER);
});
FORWARD_TO_OSD_SINGLETON(init_meta_coll)
FORWARD_TO_OSD_SINGLETON(get_meta_coll)
+ FORWARD_TO_OSD_SINGLETON(set_superblock)
+
// Core OSDMap methods
FORWARD_TO_OSD_SINGLETON(get_local_map)
FORWARD_TO_OSD_SINGLETON(load_map_bl)
epoch_t first,
epoch_t last)
{
+ ceph_assert(first <= last);
return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
boost::make_counting_iterator<epoch_t>(last + 1),
[this](epoch_t e) {
});
}
+seastar::future<> OSDSingletonState::send_incremental_map(
+ crimson::net::Connection &conn,
+ epoch_t first)
+{
+ if (first >= superblock.oldest_map) {
+ return load_map_bls(
+ first, superblock.newest_map
+ ).then([this, &conn, first](auto&& bls) {
+ auto m = crimson::make_message<MOSDMap>(
+ monc.get_fsid(),
+ osdmap->get_encoding_features());
+ m->oldest_map = first;
+ m->newest_map = superblock.newest_map;
+ m->maps = std::move(bls);
+ return conn.send(std::move(m));
+ });
+ } else {
+ return load_map_bl(osdmap->get_epoch()
+ ).then([this, &conn](auto&& bl) mutable {
+ auto m = crimson::make_message<MOSDMap>(
+ monc.get_fsid(),
+ osdmap->get_encoding_features());
+ m->oldest_map = superblock.oldest_map;
+ m->newest_map = superblock.newest_map;
+ m->maps.emplace(osdmap->get_epoch(), std::move(bl));
+ return conn.send(std::move(m));
+ });
+ }
+}
+
+
};
return *meta_coll;
}
+ OSDSuperblock superblock;
+ void set_superblock(OSDSuperblock _superblock) {
+ superblock = std::move(_superblock);
+ }
+
+ seastar::future<> send_incremental_map(
+ crimson::net::Connection &conn,
+ epoch_t first);
+
auto get_pool_info(int64_t poolid) {
return get_meta_coll().load_final_pool_info(poolid);
}
FORWARD_TO_OSD_SINGLETON(get_pool_info)
FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)
+ FORWARD_TO_OSD_SINGLETON(send_incremental_map)
+
FORWARD_TO_OSD_SINGLETON(osdmap_subscribe)
FORWARD_TO_OSD_SINGLETON(queue_want_pg_temp)
FORWARD_TO_OSD_SINGLETON(remove_want_pg_temp)