cct->_conf->osd_command_thread_timeout,
cct->_conf->osd_command_thread_suicide_timeout,
&command_tp),
- remove_wq(
- cct,
- store,
- cct->_conf->osd_remove_thread_timeout,
- cct->_conf->osd_remove_thread_suicide_timeout,
- &disk_tp),
service(this)
{
monc->set_messenger(client_messenger);
}
}
-OSD::res_result OSD::_try_resurrect_pg(
- OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state)
-{
- assert(resurrected);
- assert(old_pg_state);
- // find nearest ancestor
- DeletingStateRef df;
- spg_t cur(pgid);
- while (true) {
- df = service.deleting_pgs.lookup(cur);
- if (df)
- break;
- if (!cur.ps())
- break;
- cur = cur.get_parent();
- }
- if (!df)
- return RES_NONE; // good to go
-
- df->old_pg_state->lock();
- OSDMapRef create_map = df->old_pg_state->get_osdmap();
- df->old_pg_state->unlock();
-
- set<spg_t> children;
- if (cur == pgid) {
- if (df->try_stop_deletion()) {
- dout(10) << __func__ << ": halted deletion on pg " << pgid << dendl;
- *resurrected = cur;
- *old_pg_state = df->old_pg_state;
- service.deleting_pgs.remove(pgid); // PG is no longer being removed!
- return RES_SELF;
- } else {
- // raced, ensure we don't see DeletingStateRef when we try to
- // delete this pg
- service.deleting_pgs.remove(pgid);
- return RES_NONE;
- }
- } else if (cur.is_split(create_map->get_pg_num(cur.pool()),
- curmap->get_pg_num(cur.pool()),
- &children) &&
- children.count(pgid)) {
- if (df->try_stop_deletion()) {
- dout(10) << __func__ << ": halted deletion on ancestor pg " << pgid
- << dendl;
- *resurrected = cur;
- *old_pg_state = df->old_pg_state;
- service.deleting_pgs.remove(cur); // PG is no longer being removed!
- return RES_PARENT;
- } else {
- /* this is not a problem, failing to cancel proves that all objects
- * have been removed, so no hobject_t overlap is possible
- */
- return RES_NONE;
- }
- }
- return RES_NONE;
-}
-
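/*
 * A minimal, self-contained sketch of the ancestor walk that _try_resurrect_pg
 * performs above.  The real code climbs with spg_t::get_parent() and looks the
 * id up in SharedPtrRegistry<spg_t, DeletingState>; the id type, registry and
 * parent/root callbacks below are hypothetical stand-ins, not Ceph API.
 */
#include <functional>
#include <map>
#include <optional>
#include <utility>

template <typename Id, typename State>
std::optional<std::pair<Id, State>> find_nearest_deleting_ancestor(
    const std::map<Id, State>& deleting,
    Id cur,
    const std::function<bool(const Id&)>& is_root,   // like "!cur.ps()"
    const std::function<Id(const Id&)>& parent)      // like "cur.get_parent()"
{
  while (true) {
    auto it = deleting.find(cur);
    if (it != deleting.end())
      return std::make_pair(cur, it->second);        // cur == original id -> RES_SELF,
                                                     // otherwise an ancestor -> RES_PARENT
    if (is_root(cur))
      return std::nullopt;                           // nothing relevant is being deleted
    cur = parent(cur);
  }
}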
PG *OSD::_create_lock_pg(
OSDMapRef createmap,
spg_t pgid,
if (maybe_wait_for_max_pg(pgid, is_mon_create)) {
return -EAGAIN;
}
- // do we need to resurrect a deleting pg?
- spg_t resurrected;
- PGRef old_pg_state;
- res_result result = _try_resurrect_pg(
- service.get_osdmap(),
- pgid,
- &resurrected,
- &old_pg_state);
PG::RecoveryCtx rctx = create_context();
- switch (result) {
- case RES_NONE: {
- const pg_pool_t* pp = osdmap->get_pg_pool(pgid.pool());
- if (pp->has_flag(pg_pool_t::FLAG_EC_OVERWRITES) &&
- store->get_type() != "bluestore") {
- clog->warn() << "pg " << pgid
- << " is at risk of silent data corruption: "
- << "the pool allows ec overwrites but is not stored in "
- << "bluestore, so deep scrubbing will not detect bitrot";
- }
- PG::_create(*rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
- PG::_init(*rctx.transaction, pgid, pp);
-
- int role = osdmap->calc_pg_role(whoami, acting, acting.size());
- if (!pp->is_replicated() && role != pgid.shard)
- role = -1;
-
- pg = _create_lock_pg(
- get_map(epoch),
- pgid, false, false,
- role,
- up, up_primary,
- acting, acting_primary,
- history, pi,
- *rctx.transaction);
- pg->handle_create(&rctx);
- dispatch_context(rctx, pg, osdmap);
-
- dout(10) << *pg << " is new" << dendl;
- pg->queue_peering_event(evt);
- wake_pg_waiters(pg);
- pg->unlock();
- return 0;
- }
- case RES_SELF: {
- old_pg_state->lock();
- OSDMapRef old_osd_map = old_pg_state->get_osdmap();
- int old_role = old_pg_state->get_role();
- vector<int> old_up = old_pg_state->get_up();
- int old_up_primary = old_pg_state->get_up_primary();
- vector<int> old_acting = old_pg_state->get_acting();
- int old_primary = old_pg_state->get_acting_primary();
- pg_history_t old_history = old_pg_state->get_history();
- PastIntervals old_past_intervals = old_pg_state->get_past_intervals();
- old_pg_state->unlock();
- pg = _create_lock_pg(
- old_osd_map,
- resurrected,
- false,
- true,
- old_role,
- old_up,
- old_up_primary,
- old_acting,
- old_primary,
- old_history,
- old_past_intervals,
- *rctx.transaction);
- pg->handle_create(&rctx);
- dispatch_context(rctx, pg, osdmap);
-
- dout(10) << *pg << " is new (resurrected)" << dendl;
-
- pg->queue_peering_event(evt);
- wake_pg_waiters(pg);
- pg->unlock();
- return 0;
- }
- case RES_PARENT: {
- assert(old_pg_state);
- old_pg_state->lock();
- OSDMapRef old_osd_map = old_pg_state->get_osdmap();
- int old_role = old_pg_state->get_role();
- vector<int> old_up = old_pg_state->get_up();
- int old_up_primary = old_pg_state->get_up_primary();
- vector<int> old_acting = old_pg_state->get_acting();
- int old_primary = old_pg_state->get_acting_primary();
- pg_history_t old_history = old_pg_state->get_history();
- PastIntervals old_past_intervals = old_pg_state->get_past_intervals();
- old_pg_state->unlock();
- PG *parent = _create_lock_pg(
- old_osd_map,
- resurrected,
- false,
- true,
- old_role,
- old_up,
- old_up_primary,
- old_acting,
- old_primary,
- old_history,
- old_past_intervals,
- *rctx.transaction
- );
- parent->handle_create(&rctx);
- dispatch_context(rctx, parent, osdmap);
-
- dout(10) << *parent << " is new" << dendl;
-
- assert(service.splitting(pgid));
- peering_wait_for_split[pgid].push_back(evt);
-
- //parent->queue_peering_event(evt);
- parent->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
- wake_pg_waiters(parent);
- parent->unlock();
- return 0;
- }
- default:
- assert(0);
- return 0;
+ const pg_pool_t* pp = osdmap->get_pg_pool(pgid.pool());
+ if (pp->has_flag(pg_pool_t::FLAG_EC_OVERWRITES) &&
+ store->get_type() != "bluestore") {
+ clog->warn() << "pg " << pgid
+ << " is at risk of silent data corruption: "
+ << "the pool allows ec overwrites but is not stored in "
+ << "bluestore, so deep scrubbing will not detect bitrot";
}
+ PG::_create(*rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
+ PG::_init(*rctx.transaction, pgid, pp);
+
+ int role = osdmap->calc_pg_role(whoami, acting, acting.size());
+ if (!pp->is_replicated() && role != pgid.shard)
+ role = -1;
+
+ pg = _create_lock_pg(
+ get_map(epoch),
+ pgid, false, false,
+ role,
+ up, up_primary,
+ acting, acting_primary,
+ history, pi,
+ *rctx.transaction);
+ pg->handle_create(&rctx);
+ dispatch_context(rctx, pg, osdmap);
+
+ dout(10) << *pg << " is new" << dendl;
+
+ pg->queue_peering_event(evt);
+ wake_pg_waiters(pg);
+ pg->unlock();
+ return 0;
} else {
// already had it. did the mapping change?
if (epoch < pg->get_same_interval_since()) {
logger->set(l_osd_cached_crc, buffer::get_cached_crc());
logger->set(l_osd_cached_crc_adjusted, buffer::get_cached_crc_adjusted());
logger->set(l_osd_missed_crc, buffer::get_missed_crc());
- logger->set(l_osd_pg_removing, remove_wq.get_remove_queue_len());
// osd_lock is not being held, which means the OSD state
// might change when doing the monitor report
ss << "Internal error - command=" << command;
}
-// =========================================
-bool remove_dir(
- PGRef pg,
- ObjectStore *store,
- DeletingStateRef dstate,
- bool *finished,
- ThreadPool::TPHandle &handle)
-{
- CephContext *cct = pg->get_cct();
- vector<ghobject_t> olist;
- int64_t num = 0;
- ObjectStore::Transaction t;
- ghobject_t next;
- handle.reset_tp_timeout();
- store->collection_list(
- pg->coll,
- next,
- ghobject_t::get_max(),
- store->get_ideal_list_max(),
- &olist,
- &next);
- generic_dout(10) << __func__ << " " << olist << dendl;
- // default cont to true, this is safe because caller(OSD::RemoveWQ::_process())
- // will recheck the answer before it really goes on.
- bool cont = true;
- for (vector<ghobject_t>::iterator i = olist.begin();
- i != olist.end();
- ++i) {
- if (i->is_pgmeta())
- continue;
- pg->pg_remove_object(*i, &t);
- if (++num >= cct->_conf->osd_target_transaction_size) {
- C_SaferCond waiter;
- store->queue_transaction(pg->osr.get(), std::move(t), &waiter);
- cont = dstate->pause_clearing();
- handle.suspend_tp_timeout();
- waiter.wait();
- handle.reset_tp_timeout();
- if (cont)
- cont = dstate->resume_clearing();
- if (!cont)
- return false;
- t = ObjectStore::Transaction();
- num = 0;
- }
- }
- if (num) {
- C_SaferCond waiter;
- store->queue_transaction(pg->osr.get(), std::move(t), &waiter);
- cont = dstate->pause_clearing();
- handle.suspend_tp_timeout();
- waiter.wait();
- handle.reset_tp_timeout();
- if (cont)
- cont = dstate->resume_clearing();
- }
- // whether there are more objects to remove in the collection
- *finished = next.is_max();
- return cont;
-}
-
-void OSD::RemoveWQ::_process(
- pair<PGRef, DeletingStateRef> item,
- ThreadPool::TPHandle &handle)
-{
- FUNCTRACE(cct);
- PGRef pg(item.first);
- coll_t coll = coll_t(pg->pg_id);
- pg->osr->flush();
- bool finished = false;
-
- if (!item.second->start_or_resume_clearing())
- return;
-
- bool cont = remove_dir(pg, store, item.second, &finished, handle);
- if (!cont)
- return;
- if (!finished) {
- if (item.second->pause_clearing())
- queue_front(item);
- return;
- }
-
- if (!item.second->start_deleting())
- return;
-
- ObjectStore::Transaction t;
- PGLog::clear_info_log(pg->pg_id, &t);
-
- if (cct->_conf->osd_inject_failure_on_pg_removal) {
- generic_derr << "osd_inject_failure_on_pg_removal" << dendl;
- _exit(1);
- }
- t.remove_collection(coll);
-
- // We need the sequencer to stick around until the op is complete
- store->queue_transaction(
- pg->osr.get(),
- std::move(t),
- 0, // onapplied
- 0, // oncommit
- 0, // onreadable sync
- new ContainerContext<PGRef>(pg),
- TrackedOpRef());
-
- item.second->finish_deleting();
-}
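/*
 * A compact sketch (plain C++, not the ObjectStore/ThreadPool API) of the
 * batching pattern remove_dir() and RemoveWQ::_process() use above: delete
 * objects in bounded batches, block until each batch commits, and give the
 * deletion a cancellation checkpoint around every blocking wait via the
 * DeletingState pause/resume protocol.  Canceller and commit_batch are
 * hypothetical stand-ins.
 */
#include <cstddef>
#include <functional>
#include <vector>

struct Canceller {
  virtual bool pause() = 0;    // like DeletingState::pause_clearing()
  virtual bool resume() = 0;   // like DeletingState::resume_clearing()
  virtual ~Canceller() = default;
};

// Returns false if the deletion was cancelled at one of the checkpoints.
bool remove_in_batches(const std::vector<int>& objects,
                       std::size_t batch_size,
                       const std::function<void(std::vector<int>&&)>& commit_batch,
                       Canceller& c)
{
  std::vector<int> batch;
  auto flush = [&]() -> bool {
    bool cont = c.pause();              // announce we are about to block
    commit_batch(std::move(batch));     // queue + wait, like queue_transaction() + waiter.wait()
    batch.clear();
    if (cont)
      cont = c.resume();                // try to take the work back
    return cont;                        // false: someone cancelled while we waited
  };
  for (int obj : objects) {
    batch.push_back(obj);
    if (batch.size() >= batch_size && !flush())
      return false;
  }
  return batch.empty() ? true : flush();
}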
// =========================================
void OSD::ms_handle_connect(Connection *con)
logger->set(l_osd_pg_primary, num_pg_primary);
logger->set(l_osd_pg_replica, num_pg_replica);
logger->set(l_osd_pg_stray, num_pg_stray);
- logger->set(l_osd_pg_removing, remove_wq.get_remove_queue_len());
}
void OSD::activate_map()
dout(10) << " pg " << pgid << " dne" << dendl;
pg_info_t empty(spg_t(pgid.pgid, it->second.to));
- /* This is racy, but that should be ok: if we complete the deletion
- * before the pg is recreated, we'll just start it off backfilling
- * instead of just empty */
- if (service.deleting_pgs.lookup(pgid))
- empty.set_last_backfill(hobject_t());
if (it->second.type == pg_query_t::LOG ||
it->second.type == pg_query_t::FULLLOG) {
ConnectionRef con = service.get_con_osd_cluster(from, osdmap->get_epoch());
}
}
-void OSD::_remove_pg(PG *pg)
-{
- ObjectStore::Transaction rmt ;
-
- // on_removal, which calls remove_watchers_and_notifies, and the erasure from
- // the pg_map must be done together without unlocking the pg lock,
- // to avoid racing with watcher cleanup in ms_handle_reset
- // and handle_notify_timeout
- pg->on_removal(&rmt);
-
- service.cancel_pending_splits_for_parent(pg->pg_id);
- int tr = store->queue_transaction(
- pg->osr.get(), std::move(rmt), NULL,
- new ContainerContext<
- SequencerRef>(pg->osr));
- assert(tr == 0);
-
- DeletingStateRef deleting = service.deleting_pgs.lookup_or_create(
- pg->pg_id,
- make_pair(
- pg->pg_id,
- PGRef(pg))
- );
- remove_wq.queue(make_pair(PGRef(pg), deleting));
-
- service.pg_remove_epoch(pg->pg_id);
-
- // dereference from op_wq
- //op_shardedwq.clear_pg_pointer(pg->pg_id);
-
- // remove from map
- pg_map.erase(pg->pg_id);
- pg->put("PGMap"); // since we've taken it out of map
-}
-
// =========================================================
// RECOVERY
typedef ceph::shared_ptr<ObjectStore::Sequencer> SequencerRef;
class MOSDOp;
-class DeletingState {
- Mutex lock;
- Cond cond;
- enum {
- QUEUED,
- CLEARING_DIR,
- CLEARING_WAITING,
- DELETING_DIR,
- DELETED_DIR,
- CANCELED,
- } status;
- bool stop_deleting;
-public:
- const spg_t pgid;
- const PGRef old_pg_state;
- explicit DeletingState(const pair<spg_t, PGRef> &in) :
- lock("DeletingState::lock"), status(QUEUED), stop_deleting(false),
- pgid(in.first), old_pg_state(in.second) {
- }
-
- /// transition status to CLEARING_WAITING
- bool pause_clearing() {
- Mutex::Locker l(lock);
- assert(status == CLEARING_DIR);
- if (stop_deleting) {
- status = CANCELED;
- cond.Signal();
- return false;
- }
- status = CLEARING_WAITING;
- return true;
- } ///< @return false if we should cancel deletion
-
- /// start or resume the clearing - transition the status to CLEARING_DIR
- bool start_or_resume_clearing() {
- Mutex::Locker l(lock);
- assert(
- status == QUEUED ||
- status == DELETED_DIR ||
- status == CLEARING_WAITING);
- if (stop_deleting) {
- status = CANCELED;
- cond.Signal();
- return false;
- }
- status = CLEARING_DIR;
- return true;
- } ///< @return false if we should cancel the deletion
-
- /// transition status to CLEARING_DIR
- bool resume_clearing() {
- Mutex::Locker l(lock);
- assert(status == CLEARING_WAITING);
- if (stop_deleting) {
- status = CANCELED;
- cond.Signal();
- return false;
- }
- status = CLEARING_DIR;
- return true;
- } ///< @return false if we should cancel deletion
-
- /// transition status to deleting
- bool start_deleting() {
- Mutex::Locker l(lock);
- assert(status == CLEARING_DIR);
- if (stop_deleting) {
- status = CANCELED;
- cond.Signal();
- return false;
- }
- status = DELETING_DIR;
- return true;
- } ///< @return false if we should cancel deletion
-
- /// signal collection removal queued
- void finish_deleting() {
- Mutex::Locker l(lock);
- assert(status == DELETING_DIR);
- status = DELETED_DIR;
- cond.Signal();
- }
-
- /// try to halt the deletion
- bool try_stop_deletion() {
- Mutex::Locker l(lock);
- stop_deleting = true;
- /**
- * If we are in DELETING_DIR or CLEARING_DIR, there are in progress
- * operations we have to wait for before continuing on. States
- * CLEARING_WAITING and QUEUED indicate that the remover will check
- * stop_deleting before queueing any further operations. CANCELED
- * indicates that the remover has already halted. DELETED_DIR
- * indicates that the deletion has been fully queued.
- */
- while (status == DELETING_DIR || status == CLEARING_DIR)
- cond.Wait(lock);
- return status != DELETED_DIR;
- } ///< @return true if we don't need to recreate the collection
-};
-typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
-
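/*
 * A self-contained model (std::mutex/std::condition_variable instead of the
 * Ceph Mutex/Cond wrappers, and a collapsed state set) of the cancellation
 * handshake DeletingState implements above: the remover announces each
 * blocking section, and try_stop_deletion() waits for any in-flight section
 * to drain before reporting whether the deletion could still be stopped.
 * CancellableWork and its method names are illustrative, not Ceph API.
 */
#include <condition_variable>
#include <mutex>

class CancellableWork {
  std::mutex m;
  std::condition_variable cv;
  enum class State { Idle, Busy, Done, Canceled } state = State::Idle;
  bool stop = false;

public:
  // Remover side: enter a blocking section (compare start_or_resume_clearing()
  // / start_deleting()); returns false if cancellation was requested.
  bool begin() {
    std::lock_guard<std::mutex> l(m);
    if (stop) {
      state = State::Canceled;
      cv.notify_all();
      return false;
    }
    state = State::Busy;
    return true;
  }

  // Remover side: leave the section; done=true once all removal work has been
  // queued (compare pause_clearing() / finish_deleting()).
  void end(bool done) {
    std::lock_guard<std::mutex> l(m);
    state = done ? State::Done : State::Idle;
    cv.notify_all();
  }

  // Canceller side, like try_stop_deletion(): wait out any in-flight section,
  // then report true unless the work already completed.
  bool try_cancel() {
    std::unique_lock<std::mutex> l(m);
    stop = true;
    cv.wait(l, [&] { return state != State::Busy; });
    return state != State::Done;
  }
};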
class OSD;
class OSDService {
CephContext *cct;
SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
- SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
const int whoami;
ObjectStore *&store;
LogClient &log_client;
protected:
PG *_open_lock_pg(OSDMapRef createmap,
spg_t pg, bool no_lockdep_check=false);
- enum res_result {
- RES_PARENT, // resurrected a parent
- RES_SELF, // resurrected self
- RES_NONE // nothing relevant deleting
- };
- res_result _try_resurrect_pg(
- OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);
PG *_create_lock_pg(
OSDMapRef createmap,
void handle_force_recovery(Message *m);
void handle_pg_remove(OpRequestRef op);
- void _remove_pg(PG *pg);
// -- commands --
struct Command {
bool scrub_load_below_threshold();
bool scrub_time_permit(utime_t now);
- // -- removing --
- struct RemoveWQ :
- public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
- CephContext* cct;
- ObjectStore *&store;
- list<pair<PGRef, DeletingStateRef> > remove_queue;
- RemoveWQ(CephContext* cct, ObjectStore *&o, time_t ti, time_t si,
- ThreadPool *tp)
- : ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
- "OSD::RemoveWQ", ti, si, tp), cct(cct), store(o) {}
-
- bool _empty() override {
- return remove_queue.empty();
- }
- void _enqueue(pair<PGRef, DeletingStateRef> item) override {
- remove_queue.push_back(item);
- }
- void _enqueue_front(pair<PGRef, DeletingStateRef> item) override {
- remove_queue.push_front(item);
- }
- bool _dequeue(pair<PGRef, DeletingStateRef> item) {
- ceph_abort();
- }
- pair<PGRef, DeletingStateRef> _dequeue() override {
- assert(!remove_queue.empty());
- pair<PGRef, DeletingStateRef> item = remove_queue.front();
- remove_queue.pop_front();
- return item;
- }
- void _process(pair<PGRef, DeletingStateRef>,
- ThreadPool::TPHandle &) override;
- void _clear() override {
- remove_queue.clear();
- }
- int get_remove_queue_len() {
- lock();
- int r = remove_queue.size();
- unlock();
- return r;
- }
- } remove_wq;
-
// -- status reporting --
MPGStats *collect_pg_stats();
std::vector<OSDHealthMetric> get_health_metrics();