std::pair<ghobject_t, bool>
do_delete_work(ceph::os::Transaction &t, ghobject_t _next) final;
- // merge/split not ready
- void clear_ready_to_merge() final {}
- void set_not_ready_to_merge_target(pg_t pgid, pg_t src) final {}
- void set_not_ready_to_merge_source(pg_t pgid) final {}
- void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) final {}
- void set_ready_to_merge_source(eversion_t lu) final {}
+ void clear_ready_to_merge() final {
+ LOG_PREFIX(PG::clear_ready_to_merge);
+ SUBDEBUGDPP(osd, "", *this);
+ (void)shard_services.clear_ready_to_merge(pgid.pgid).discard_result();
+ }
+ void set_not_ready_to_merge_target(pg_t pgid, pg_t src) final {
+ LOG_PREFIX(PG::set_not_ready_to_merge_target);
+ SUBDEBUGDPP(osd, "", *this);
+ (void)shard_services.set_not_ready_to_merge_target(pgid, src).discard_result();
+ }
+ void set_not_ready_to_merge_source(pg_t pgid) final {
+ LOG_PREFIX(PG::set_not_ready_to_merge_source);
+ SUBDEBUGDPP(osd, "", *this);
+ (void)shard_services.set_not_ready_to_merge_source(pgid).discard_result();
+ }
+ void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) final {
+ LOG_PREFIX(PG::set_ready_to_merge_target);
+ SUBDEBUGDPP(osd, "", *this);
+ (void)shard_services.set_ready_to_merge_target(pgid.pgid, lu, les, lec).discard_result();
+ }
+ void set_ready_to_merge_source(eversion_t lu) final {
+ LOG_PREFIX(PG::set_ready_to_merge_source);
+ SUBDEBUGDPP(osd, "", *this);
+ (void)shard_services.set_ready_to_merge_source(pgid.pgid, lu).discard_result();
+ }
void on_active_actmap() final;
void on_active_advmap(const OSDMapRef &osdmap) final;
#include "messages/MOSDMap.h"
#include "messages/MOSDPGCreated.h"
#include "messages/MOSDPGTemp.h"
+#include "messages/MOSDPGReadyToMerge.h"
#include "osd/osd_perf_counters.h"
#include "osd/PeeringState.h"
}
}
+seastar::future<> OSDSingletonState::set_ready_to_merge_source(pg_t pgid,
+ eversion_t version)
+{
+ LOG_PREFIX(OSDSingletonState::set_ready_to_merge_source);
+ DEBUG("{}", pgid);
+ ready_to_merge_source[pgid] = version;
+ ceph_assert(not_ready_to_merge_source.count(pgid) == 0);
+ return send_ready_to_merge();
+
+}
+
+seastar::future<> OSDSingletonState::set_ready_to_merge_target(pg_t pgid,
+ eversion_t version,
+ epoch_t last_epoch_started,
+ epoch_t last_epoch_clean)
+{
+ LOG_PREFIX(OSDSingletonState::set_ready_to_merge_target);
+ DEBUG("{}", pgid);
+ ready_to_merge_target.insert(std::make_pair(pgid,
+ std::make_tuple(version,
+ last_epoch_started,
+ last_epoch_clean)));
+ ceph_assert(not_ready_to_merge_target.count(pgid) == 0);
+ return send_ready_to_merge();
+
+}
+
+seastar::future<> OSDSingletonState::set_not_ready_to_merge_source(pg_t source)
+{
+ LOG_PREFIX(OSDSingletonState::set_not_ready_to_merge_source);
+ DEBUG("{}", source);
+ not_ready_to_merge_source.insert(source);
+ ceph_assert(ready_to_merge_source.count(source) == 0);
+ return send_ready_to_merge();
+
+}
+
+seastar::future<> OSDSingletonState::set_not_ready_to_merge_target(pg_t target, pg_t source)
+{
+ LOG_PREFIX(OSDSingletonState::set_not_ready_to_merge_target);
+ DEBUG("{} source {}", target, source);
+ not_ready_to_merge_target[target] = source;
+ ceph_assert(ready_to_merge_source.count(target) == 0);
+ return send_ready_to_merge();
+
+}
+
+seastar::future<> OSDSingletonState::send_ready_to_merge()
+{
+ LOG_PREFIX(OSDSingletonState::send_ready_to_merge);
+ DEBUG(" ready_to_merge_source: {} not_ready_to_merge_source: {} \
+ ready_to_merge_target: {} not_ready_to_merge_target: {} \
+ sent_ready_to_merge_source {}", ready_to_merge_source,
+ not_ready_to_merge_source, ready_to_merge_target, not_ready_to_merge_target,
+ sent_ready_to_merge_source);
+ for (auto src : not_ready_to_merge_source) {
+ if (sent_ready_to_merge_source.count(src) == 0) {
+ return monc.send_message(crimson::make_message<MOSDPGReadyToMerge>(
+ src,
+ eversion_t{}, eversion_t{}, 0, 0,
+ false,
+ osdmap->get_epoch())).then([this, src] {
+ sent_ready_to_merge_source.insert(src);
+ });
+ }
+ }
+ for (auto p : not_ready_to_merge_target) {
+ if (sent_ready_to_merge_source.count(p.second) == 0) {
+ return monc.send_message(crimson::make_message<MOSDPGReadyToMerge>(
+ p.second,
+ eversion_t{}, eversion_t{}, 0, 0,
+ false,
+ osdmap->get_epoch())).then([this, p] {
+ sent_ready_to_merge_source.insert(p.second);
+ });
+ }
+ }
+ for (auto src : ready_to_merge_source) {
+ if (not_ready_to_merge_source.count(src.first) ||
+ not_ready_to_merge_target.count(src.first.get_parent())) {
+ continue;
+ }
+ auto p = ready_to_merge_target.find(src.first.get_parent());
+ if (p != ready_to_merge_target.end() &&
+ sent_ready_to_merge_source.count(src.first) == 0) {
+ return monc.send_message(crimson::make_message<MOSDPGReadyToMerge>(
+ src.first, // source pgid
+ src.second, // src version
+ std::get<0>(p->second), // target version
+ std::get<1>(p->second), // PG's last_epoch_started
+ std::get<2>(p->second), // PG's last_epoch_clean
+ true,
+ osdmap->get_epoch())).then([this, src] {
+ sent_ready_to_merge_source.insert(src.first);
+ });
+ }
+ }
+ return seastar::now();
+}
+
+void OSDSingletonState::clear_ready_to_merge(pg_t pgid)
+{
+ ready_to_merge_source.erase(pgid);
+ ready_to_merge_target.erase(pgid);
+ not_ready_to_merge_source.erase(pgid);
+ not_ready_to_merge_target.erase(pgid);
+ sent_ready_to_merge_source.erase(pgid);
+}
+
+void OSDSingletonState::clear_sent_ready_to_merge()
+{
+ sent_ready_to_merge_source.clear();
+}
+
+void OSDSingletonState::prune_sent_ready_to_merge(const OSDMapService::cached_map_t osdmap)
+{
+ LOG_PREFIX(OSDSingletonState::prune_sent_ready_to_merge);
+ auto source = sent_ready_to_merge_source.begin();
+ while (source != sent_ready_to_merge_source.end()) {
+ if (!osdmap->pg_exists(*source)) {
+ DEBUG("{}", *source);
+ source = sent_ready_to_merge_source.erase(source);
+ } else {
+ DEBUG(" exist {}", *source);
+ ++source;
+ }
+ }
+}
+
seastar::future<> OSDSingletonState::send_alive(const epoch_t want)
{
LOG_PREFIX(OSDSingletonState::send_alive);
seastar::future<> store_maps(ceph::os::Transaction& t,
epoch_t start, Ref<MOSDMap> m);
void trim_maps(ceph::os::Transaction& t, OSDSuperblock& superblock);
+
+ // -- PG merging --
+ std::map<pg_t, eversion_t> ready_to_merge_source;
+ std::map<pg_t,std::tuple<eversion_t,epoch_t,epoch_t>> ready_to_merge_target;
+ std::set<pg_t> not_ready_to_merge_source;
+ std::map<pg_t,pg_t> not_ready_to_merge_target;
+ std::set<pg_t> sent_ready_to_merge_source;
+ seastar::future<> set_ready_to_merge_source(pg_t pgid,
+ eversion_t version);
+ seastar::future<> set_ready_to_merge_target(pg_t pgid,
+ eversion_t version,
+ epoch_t last_epoch_started,
+ epoch_t last_epoch_clean);
+ seastar::future<> set_not_ready_to_merge_source(pg_t source);
+ seastar::future<> set_not_ready_to_merge_target(pg_t target, pg_t source);
+ void clear_ready_to_merge(pg_t pgid);
+ seastar::future<> send_ready_to_merge();
+ void clear_sent_ready_to_merge();
+ void prune_sent_ready_to_merge(const cached_map_t osdmap);
};
/**
});
}
+ FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_source)
+ FORWARD_TO_OSD_SINGLETON(set_ready_to_merge_target)
+ FORWARD_TO_OSD_SINGLETON(set_not_ready_to_merge_source)
+ FORWARD_TO_OSD_SINGLETON(set_not_ready_to_merge_target)
+ FORWARD_TO_OSD_SINGLETON(clear_ready_to_merge)
+ FORWARD_TO_OSD_SINGLETON(send_ready_to_merge)
+ FORWARD_TO_OSD_SINGLETON(clear_sent_ready_to_merge)
+ FORWARD_TO_OSD_SINGLETON(prune_sent_ready_to_merge)
+
FORWARD_TO_OSD_SINGLETON(get_pool_info)
FORWARD(get_throttle, get_throttle, local_state.throttler)