void OSDService::shutdown()
{
+ pg_timer.stop();
+
mono_timer.suspend();
{
#include "include/unordered_map.h"
+#include "common/intrusive_timer.h"
#include "common/shared_cache.hpp"
#include "common/simple_cache.hpp"
#include "messages/MOSDOp.h"
bool prepare_to_stop();
void got_stop_ack();
+ // -- PG timer --
+ common::intrusive_timer pg_timer;
#ifdef PG_DEBUG_REFS
ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock");
case MSG_OSD_REP_SCRUBMAP:
case MSG_OSD_PG_UPDATE_LOG_MISSING:
case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
+ case MSG_OSD_PG_PCT:
case MSG_OSD_PG_RECOVERY_DELETE:
case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
case MSG_OSD_PG_LEASE:
#include "messages/MOSDECSubOpReadReply.h"
#include "messages/MOSDPGUpdateLogMissing.h"
#include "messages/MOSDPGUpdateLogMissingReply.h"
+#include "messages/MOSDPGPCT.h"
#include "messages/MOSDBackoff.h"
#include "messages/MOSDScrubReserve.h"
#include "messages/MOSDRepOp.h"
case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
return can_discard_replica_op<
MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(op);
+ case MSG_OSD_PG_PCT:
+ return can_discard_replica_op<
+ MOSDPGPCT, MSG_OSD_PG_PCT>(op);
case MSG_OSD_PG_SCAN:
return can_discard_scan(op);
#include "ECCommon.h"
#include "osd_types.h"
+#include "pg_features.h"
+#include "common/intrusive_timer.h"
#include "common/WorkQueue.h"
#include "include/Context.h"
#include "os/ObjectStore.h"
eversion_t v,
Context *on_complete) = 0;
+ /**
+ * pg_lock, pg_unlock, pg_add_ref, pg_dec_ref
+ *
+ * Utilities for locking and manipulating refcounts on
+ * implementation.
+ */
+ virtual void pg_lock() = 0;
+ virtual void pg_unlock() = 0;
+ virtual void pg_add_ref() = 0;
+ virtual void pg_dec_ref() = 0;
+
/**
* Bless a context
*
virtual epoch_t pgb_get_osdmap_epoch() const = 0;
virtual const pg_info_t &get_info() const = 0;
virtual const pg_pool_t &get_pool() const = 0;
+ virtual eversion_t get_pg_committed_to() const = 0;
virtual ObjectContextRef get_obc(
const hobject_t &hoid,
virtual void update_last_complete_ondisk(
eversion_t lcod) = 0;
+ virtual void update_pct(
+ eversion_t pct) = 0;
+
virtual void update_stats(
const pg_stat_t &stat) = 0;
GenContext<ThreadPool::TPHandle&> *c,
uint64_t cost) = 0;
+ virtual common::intrusive_timer &get_pg_timer() = 0;
+
virtual pg_shard_t whoami_shard() const = 0;
int whoami() const {
return whoami_shard().osd;
virtual pg_shard_t primary_shard() const = 0;
virtual uint64_t min_peer_features() const = 0;
virtual uint64_t min_upacting_features() const = 0;
+ virtual pg_feature_vec_t get_pg_acting_features() const = 0;
virtual hobject_t get_temp_recovery_object(const hobject_t& target,
eversion_t version) = 0;
bool transaction_applied,
bool async);
+ /**
+ * update_pct
+ *
+ * Updates pg_committed_to. Generally invoked on replica on
+ * receipt of MODPGPCT from primary.
+ */
+ void update_pct(eversion_t pct) {
+ pg_committed_to = pct;
+ }
+
/**
* retrieve the min last_backfill among backfill targets
*/
recovery_state.get_recovery_op_priority());
}
+common::intrusive_timer &PrimaryLogPG::get_pg_timer()
+{
+ return osd->pg_timer;
+}
+
void PrimaryLogPG::replica_clear_repop_obc(
const vector<pg_log_entry_t> &logv,
ObjectStore::Transaction &t)
#include "messages/MOSDOpReply.h"
#include "common/admin_finisher.h"
#include "common/Checksummer.h"
+#include "common/intrusive_timer.h"
#include "common/sharedptr_registry.hpp"
#include "common/shared_cache.hpp"
#include "ReplicatedBackend.h"
eversion_t v,
Context *on_complete) override;
+ void pg_lock() override {
+ lock();
+ }
+ void pg_unlock() override {
+ unlock();
+ }
+ void pg_add_ref() override {
+ intrusive_ptr_add_ref(this);
+ }
+ void pg_dec_ref() override {
+ intrusive_ptr_release(this);
+ }
+
template<class T> class BlessedGenContext;
template<class T> class UnlockedBlessedGenContext;
class BlessedContext;
const pg_pool_t &get_pool() const override {
return pool.info;
}
+ eversion_t get_pg_committed_to() const override {
+ return recovery_state.get_pg_committed_to();
+ }
ObjectContextRef get_obc(
const hobject_t &hoid,
recovery_state.update_last_complete_ondisk(lcod);
}
+ void update_pct(eversion_t pct) override {
+ recovery_state.update_pct(pct);
+ }
+
void update_stats(
const pg_stat_t &stat) override {
recovery_state.update_stats(
GenContext<ThreadPool::TPHandle&> *c,
uint64_t cost) override;
+ common::intrusive_timer &get_pg_timer() override;
+
pg_shard_t whoami_shard() const override {
return pg_whoami;
}
uint64_t min_upacting_features() const override {
return recovery_state.get_min_upacting_features();
}
+ pg_feature_vec_t get_pg_acting_features() const override {
+ return recovery_state.get_pg_acting_features();
+ }
void send_message_osd_cluster(
int peer, Message *m, epoch_t from_epoch) override {
osd->send_message_osd_cluster(peer, m, from_epoch);
#include "common/errno.h"
#include "ReplicatedBackend.h"
#include "messages/MOSDOp.h"
+#include "messages/MOSDPGPCT.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDPGPush.h"
ObjectStore::CollectionHandle &c,
ObjectStore *store,
CephContext *cct) :
- PGBackend(cct, pg, store, coll, c) {}
+ PGBackend(cct, pg, store, coll, c),
+ pct_callback(this)
+{}
void ReplicatedBackend::run_recovery_op(
PGBackend::RecoveryHandle *_h,
return true;
}
+ case MSG_OSD_PG_PCT:
+ do_pct(op);
+ return true;
+
default:
break;
}
}
in_progress_ops.clear();
clear_recovery_state();
+ cancel_pct_update();
}
int ReplicatedBackend::objects_read_sync(
});
}
+void ReplicatedBackend::do_pct(OpRequestRef op)
+{
+ const MOSDPGPCT *m = static_cast<const MOSDPGPCT*>(op->get_req());
+ dout(10) << __func__ << ": received pct update to "
+ << m->pg_committed_to << dendl;
+ parent->update_pct(m->pg_committed_to);
+}
+
+void ReplicatedBackend::send_pct_update()
+{
+ dout(10) << __func__ << ": sending pct update" << dendl;
+ ceph_assert(
+ PG_HAVE_FEATURE(parent->get_pg_acting_features(), PCT));
+ for (const auto &i: parent->get_acting_shards()) {
+ if (i == parent->whoami_shard()) continue;
+
+ auto *pct_update = new MOSDPGPCT(
+ spg_t(parent->whoami_spg_t().pgid, i.shard),
+ get_osdmap_epoch(), parent->get_interval_start_epoch(),
+ parent->get_pg_committed_to()
+ );
+
+ dout(10) << __func__ << ": sending pct update to i " << i
+ << ", i.osd " << i.osd << dendl;
+ parent->send_message_osd_cluster(
+ i.osd, pct_update, get_osdmap_epoch());
+ }
+ dout(10) << __func__ << ": sending pct update complete" << dendl;
+}
+
+void ReplicatedBackend::maybe_kick_pct_update()
+{
+ if (!in_progress_ops.empty()) {
+ dout(20) << __func__ << ": not scheduling pct update, "
+ << in_progress_ops.size() << " ops pending" << dendl;
+ return;
+ }
+
+ if (!PG_HAVE_FEATURE(parent->get_pg_acting_features(), PCT)) {
+ dout(20) << __func__ << ": not scheduling pct update, PCT feature not"
+ << " supported" << dendl;
+ return;
+ }
+
+ if (pct_callback.is_scheduled()) {
+ derr << __func__
+ << ": pct_callback is already scheduled, this should be impossible"
+ << dendl;
+ return;
+ }
+
+ int64_t pct_delay;
+ if (!parent->get_pool().opts.get(
+ pool_opts_t::PCT_UPDATE_DELAY, &pct_delay)) {
+ dout(20) << __func__ << ": not scheduling pct update, PCT_UPDATE_DELAY not"
+ << " set" << dendl;
+ return;
+ }
+
+ dout(10) << __func__ << ": scheduling pct update after "
+ << pct_delay << " seconds" << dendl;
+ parent->get_pg_timer().schedule_after(
+ pct_callback, std::chrono::seconds(pct_delay));
+}
+
+void ReplicatedBackend::cancel_pct_update()
+{
+ if (pct_callback.is_scheduled()) {
+ dout(10) << __func__ << ": canceling pct update" << dendl;
+ parent->get_pg_timer().cancel(pct_callback);
+ }
+}
+
void ReplicatedBackend::submit_transaction(
const hobject_t &soid,
const object_stat_sum_t &delta_stats,
osd_reqid_t reqid,
OpRequestRef orig_op)
{
+ cancel_pct_update();
+
parent->apply_stats(
soid,
delta_stats);
op->on_commit = 0;
in_progress_ops.erase(op->tid);
}
+ maybe_kick_pct_update();
}
void ReplicatedBackend::do_repop_reply(OpRequestRef op)
in_progress_ops.erase(iter);
}
}
+ maybe_kick_pct_update();
}
int ReplicatedBackend::be_deep_scrub(
op(op), v(v) {}
};
std::map<ceph_tid_t, ceph::ref_t<InProgressOp>> in_progress_ops;
+
+ /// Invoked by pct_callback to update PCT after a pause in IO
+ void send_pct_update();
+
+ /// Handle MOSDPGPCT message
+ void do_pct(OpRequestRef op);
+
+ /// Kick pct timer if repop_queue is empty
+ void maybe_kick_pct_update();
+
+ /// Kick pct timer if repop_queue is empty
+ void cancel_pct_update();
+
+ struct pct_callback_t final : public common::intrusive_timer::callback_t {
+ ReplicatedBackend *backend;
+
+ pct_callback_t(ReplicatedBackend *backend) : backend(backend) {}
+
+ void lock() override {
+ return backend->parent->pg_lock();
+ }
+ void unlock() override {
+ return backend->parent->pg_unlock();
+ }
+ void add_ref() override {
+ return backend->parent->pg_add_ref();
+ }
+ void dec_ref() override {
+ return backend->parent->pg_dec_ref();
+ }
+ void invoke() override {
+ return backend->send_pct_update();
+ }
+ } pct_callback;
public:
friend class C_OSD_OnOpCommit;
#define PG_HAVE_FEATURE(x, name) \
(((x) & (PG_FEATUREMASK_##name)) == (PG_FEATUREMASK_##name))
+DEFINE_PG_FEATURE(0, 1, PCT)
+
static constexpr pg_feature_vec_t PG_FEATURE_NONE = 0ull;
-static constexpr pg_feature_vec_t PG_FEATURE_CLASSIC_ALL = 0ull;
static constexpr pg_feature_vec_t PG_FEATURE_CRIMSON_ALL = 0ull;
+static constexpr pg_feature_vec_t PG_FEATURE_CLASSIC_ALL =
+ PG_FEATURE_PCT;