*/
OPTION(osd_client_op_priority, OPT_U32)
OPTION(osd_recovery_op_priority, OPT_U32)
+OPTION(osd_peering_op_priority, OPT_U32)
OPTION(osd_snap_trim_priority, OPT_U32)
OPTION(osd_snap_trim_cost, OPT_U32) // set default cost equal to 1MB io
.set_default(3)
.set_description(""),
+ Option("osd_peering_op_priority", Option::TYPE_UINT, Option::LEVEL_DEV)
+ .set_default(255)
+ .set_description(""),
+
Option("osd_snap_trim_priority", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(5)
.set_description(""),
epoch));
}
+void OSD::enqueue_peering_evt(PG *pg, PGPeeringEventRef evt)
+{
+ dout(15) << __func__ << " " << pg->get_pgid() << " " << evt->get_desc() << dendl;
+ op_shardedwq.queue(
+ OpQueueItem(
+ unique_ptr<OpQueueItem::OpQueueable>(new PGPeeringItem(pg->get_pgid(), evt)),
+ 10,
+ cct->_conf->osd_peering_op_priority,
+ utime_t(),
+ 0,
+ evt->get_epoch_sent()));
+}
+
+void OSD::enqueue_peering_evt_front(PG *pg, PGPeeringEventRef evt)
+{
+ dout(15) << __func__ << " " << pg->get_pgid() << " " << evt->get_desc() << dendl;
+ op_shardedwq.queue_front(
+ OpQueueItem(
+ unique_ptr<OpQueueItem::OpQueueable>(new PGPeeringItem(pg->get_pgid(), evt)),
+ 10,
+ cct->_conf->osd_peering_op_priority,
+ utime_t(),
+ 0,
+ evt->get_epoch_sent()));
+}
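For illustration, here is how a caller would feed the new entry points; a minimal sketch modeled on PG::queue_null, assuming PGPeeringEventRef is the usual shared_ptr<PGPeeringEvent> and that NullEvt is the statechart event declared in PG.h:

  // Hypothetical caller sketch -- the epochs are placeholders:
  PGPeeringEventRef evt = std::make_shared<PGPeeringEvent>(
    msg_epoch,      // epoch the event was generated in
    query_epoch,    // epoch the sender was acting on
    PG::NullEvt()); // any boost::statechart event can ride along
  osd->enqueue_peering_evt(pg, evt);  // tail of the queue; use
                                      // enqueue_peering_evt_front for requeues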
+
/*
* NOTE: dequeue called in worker thread, with pg lock
*/
service.send_pg_temp();
}
+void OSD::dequeue_peering_evt(
+ PG *pg,
+ PGPeeringEventRef evt,
+ ThreadPool::TPHandle& handle)
+{
+ if (pg->is_deleting()) {
+ pg->unlock();
+ return;
+ }
+ auto curmap = service.get_osdmap();
+ PG::RecoveryCtx rctx = create_context();
+ set<PGRef> split_pgs;
+ if (curmap->get_epoch() > pg->get_osdmap()->get_epoch()) {
+ advance_pg(curmap->get_epoch(), pg, handle, &rctx, &split_pgs, true);
+ }
+ pg->do_peering_event(evt, &rctx);
+ auto need_up_thru = pg->get_need_up_thru();
+ auto same_interval_since = pg->get_same_interval_since();
+ if (!split_pgs.empty()) {
+ rctx.on_applied->add(new C_CompleteSplits(this, split_pgs));
+ split_pgs.clear();
+ }
+ dispatch_context_transaction(rctx, pg, &handle);
+ pg->unlock();
+
+ if (need_up_thru) {
+ queue_want_up_thru(same_interval_since);
+ }
+ dispatch_context(rctx, 0, curmap, &handle);
+
+ service.send_pg_temp();
+}
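The NOTE above still applies to the new path: the worker thread takes the PG lock before calling in, and dequeue_peering_evt owns dropping it. An editorial outline of the ordering (comments only, not patch code):

  // entered with pg->lock held by the queue's worker thread
  // 1. bail out if the PG is being deleted (unlock and return)
  // 2. catch the PG up to the current map epoch via advance_pg, noting splits
  // 3. feed the event to the peering state machine (do_peering_event)
  // 4. flush the per-PG side of the context while still locked
  //    (dispatch_context_transaction)
  // 5. unlock, then do the cross-PG work -- queue_want_up_thru,
  //    dispatch_context, send_pg_temp -- deliberately kept outside the PG lock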
+
// --------------------------------
const char** OSD::get_tracked_conf_keys() const
* and already requeued the items.
*/
friend class PGOpItem;
+ friend class PGPeeringItem;
friend class PGRecovery;
class ShardedOpWQ
PGRef pg, OpRequestRef op,
ThreadPool::TPHandle &handle);
+ void enqueue_peering_evt(
+ PG *pg,
+ PGPeeringEventRef ref);
+ void enqueue_peering_evt_front(
+ PG *pg,
+ PGPeeringEventRef ref);
+ void dequeue_peering_evt(
+ PG *pg,
+ PGPeeringEventRef ref,
+ ThreadPool::TPHandle& handle);
+
// -- peering queue --
struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
list<PG*> peering_queue;
pg->unlock();
}
+void PGPeeringItem::run(
+ OSD *osd,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ osd->dequeue_peering_evt(pg.get(), evt, handle);
+}
+
void PGSnapTrim::run(OSD *osd,
PGRef& pg,
ThreadPool::TPHandle &handle)
#include "osd/PG.h"
#include "common/mClockCommon.h"
#include "messages/MOSDOp.h"
-
+#include "PGPeeringEvent.h"
class OSD;
public:
enum class op_type_t {
client_op,
+ peering_event,
bg_snaptrim,
bg_recovery,
bg_scrub
void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final;
};
+class PGPeeringItem : public PGOpQueueable {
+ PGPeeringEventRef evt;
+public:
+ PGPeeringItem(spg_t pg, PGPeeringEventRef e) : PGOpQueueable(pg), evt(e) {}
+ op_type_t get_op_type() const override final {
+ return op_type_t::peering_event;
+ }
+ ostream &print(ostream &rhs) const override final {
+ return rhs << "PGPeeringEvent(" << evt->get_desc() << ")";
+ }
+ void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final;
+};
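How the item is consumed: the sharded queue treats PGPeeringItem like any other OpQueueItem and reaches the handler through the virtual run(). A simplified sketch of the worker-side dispatch (paraphrased, not verbatim ShardedOpWQ code; the lookup helper name is hypothetical):

  // worker thread, per dequeued item (simplified):
  //   spg_t pgid = item.get_ordering_token(); // which PG the item belongs to
  //   PGRef pg = lookup_and_lock(pgid);       // hypothetical helper
  //   item.run(osd, pg, handle);              // virtual -> PGPeeringItem::run
  //                                           //         -> dequeue_peering_evt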
+
class PGSnapTrim : public PGOpQueueable {
epoch_t epoch_queued;
public:
{
dout(10) << "take_waiters" << dendl;
requeue_map_waiters();
- for (list<PGPeeringEventRef>::iterator i = peering_waiters.begin();
- i != peering_waiters.end();
- ++i) osd->queue_for_peering(this);
- peering_queue.splice(peering_queue.begin(), peering_waiters,
- peering_waiters.begin(), peering_waiters.end());
+  for (auto i = peering_waiters.rbegin();
+       i != peering_waiters.rend();
+       ++i) {
+    osd->osd->enqueue_peering_evt_front(this, *i);
+  }
+  peering_waiters.clear();  // the removed splice drained the list; match that here
}
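Why the reverse iteration: enqueue_peering_evt_front pushes at the head of the queue, so walking the waiters back-to-front reproduces their original order. A worked example (editorial, not patch code):

  // peering_waiters = [A, B, C], A oldest:
  //   visit C: push_front(C) -> queue [C ...]
  //   visit B: push_front(B) -> queue [B, C ...]
  //   visit A: push_front(A) -> queue [A, B, C ...]
  // deferred events therefore run in FIFO order, ahead of newer queued work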
void PG::process_peering_event(RecoveryCtx *rctx)
assert(!peering_queue.empty());
PGPeeringEventRef evt = peering_queue.front();
peering_queue.pop_front();
+ do_peering_event(evt, rctx);
+}
+
+void PG::do_peering_event(PGPeeringEventRef evt, RecoveryCtx *rctx)
+{
dout(10) << __func__ << ": " << evt->get_desc() << dendl;
if (!have_same_or_newer_map(evt->get_epoch_sent())) {
dout(10) << "deferring event " << evt->get_desc() << dendl;
{
if (old_peering_evt(evt))
return;
- peering_queue.push_back(evt);
- osd->queue_for_peering(this);
+ osd->osd->enqueue_peering_evt(this, evt);
}
void PG::queue_null(epoch_t msg_epoch,
void queue_peering_event(PGPeeringEventRef evt);
void process_peering_event(RecoveryCtx *rctx);
+  void do_peering_event(PGPeeringEventRef evt, RecoveryCtx *rctx);
void queue_query(epoch_t msg_epoch, epoch_t query_epoch,
pg_shard_t from, const pg_query_t& q);
void queue_null(epoch_t msg_epoch, epoch_t query_epoch);