cct->_conf->osd_command_thread_timeout,
cct->_conf->osd_command_thread_suicide_timeout,
&command_tp),
- replay_queue_lock("OSD::replay_queue_lock"),
remove_wq(
cct,
store,
start_boot();
}
- if (is_active()) {
- check_replay_queue();
- }
-
do_waiters();
tick_timer.add_event_after(OSD_TICK_INTERVAL, new C_Tick(this));
// =========================================================
// RECOVERY
-/*
- * caller holds osd_lock
- */
-void OSD::check_replay_queue()
-{
- assert(osd_lock.is_locked());
-
- utime_t now = ceph_clock_now();
- list< pair<spg_t,utime_t> > pgids;
- replay_queue_lock.Lock();
- while (!replay_queue.empty() &&
- replay_queue.front().second <= now) {
- pgids.push_back(replay_queue.front());
- replay_queue.pop_front();
- }
- replay_queue_lock.Unlock();
-
- for (list< pair<spg_t,utime_t> >::iterator p = pgids.begin(); p != pgids.end(); ++p) {
- spg_t pgid = p->first;
- pg_map_lock.get_read();
- if (pg_map.count(pgid)) {
- PG *pg = _lookup_lock_pg_with_map_lock_held(pgid);
- pg_map_lock.unlock();
- dout(10) << "check_replay_queue " << *pg << dendl;
- if ((pg->is_active() || pg->is_activating()) &&
- pg->is_replay() &&
- pg->is_primary() &&
- pg->replay_until == p->second) {
- pg->replay_queued_ops();
- }
- pg->unlock();
- } else {
- pg_map_lock.unlock();
- dout(10) << "check_replay_queue pgid " << pgid << " (not found)" << dendl;
- }
- }
-}
-
void OSDService::_maybe_queue_recovery() {
assert(recovery_lock.is_locked_by_me());
uint64_t available_pushes;
void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
ThreadPool::TPHandle &handle);
- Mutex replay_queue_lock;
- list< pair<spg_t, utime_t > > replay_queue;
-
- void check_replay_queue();
// -- scrubbing --
void sched_scrub();
}
}
-void PG::replay_queued_ops()
-{
- assert(is_replay());
- assert(is_active() || is_activating());
- eversion_t c = info.last_update;
- list<OpRequestRef> replay;
- dout(10) << "replay_queued_ops" << dendl;
- state_clear(PG_STATE_REPLAY);
-
- for (map<eversion_t,OpRequestRef>::iterator p = replay_queue.begin();
- p != replay_queue.end();
- ++p) {
- if (p->first.version != c.version+1) {
- dout(10) << "activate replay " << p->first
- << " skipping " << c.version+1 - p->first.version
- << " ops"
- << dendl;
- c = p->first;
- }
- dout(10) << "activate replay " << p->first << " "
- << *p->second->get_req() << dendl;
- replay.push_back(p->second);
- }
- replay_queue.clear();
- if (is_active()) {
- requeue_ops(replay);
- requeue_ops(waiting_for_active);
- assert(waiting_for_peered.empty());
- } else {
- waiting_for_active.splice(waiting_for_active.begin(), replay);
- }
-
- publish_stats_to_osd();
-}
-
void PG::_activate_committed(epoch_t epoch, epoch_t activation_epoch)
{
lock();
}
}
-static void split_replay_queue(
- map<eversion_t, OpRequestRef> *from,
- map<eversion_t, OpRequestRef> *to,
- unsigned match,
- unsigned bits)
-{
- for (map<eversion_t, OpRequestRef>::iterator i = from->begin();
- i != from->end();
- ) {
- if (OSD::split_request(i->second, match, bits)) {
- to->insert(*i);
- from->erase(i++);
- } else {
- ++i;
- }
- }
-}
-
void PG::split_ops(PG *child, unsigned split_bits) {
unsigned match = child->info.pgid.ps();
assert(waiting_for_all_missing.empty());
assert(waiting_for_ack.empty());
assert(waiting_for_ondisk.empty());
assert(waiting_for_active.empty());
- split_replay_queue(&replay_queue, &(child->replay_queue), match, split_bits);
osd->dequeue_pg(this, &waiting_for_peered);
if (was_old_primary != is_primary()) {
state_clear(PG_STATE_CLEAN);
clear_publish_stats();
-
- // take replay queue waiters
- list<OpRequestRef> ls;
- for (map<eversion_t,OpRequestRef>::iterator it = replay_queue.begin();
- it != replay_queue.end();
- ++it)
- ls.push_back(it->second);
- replay_queue.clear();
- requeue_ops(ls);
}
on_role_change();
map<eversion_t,
list<pair<OpRequestRef, version_t> > > waiting_for_ack, waiting_for_ondisk;
- map<eversion_t,OpRequestRef> replay_queue;
void split_ops(PG *child, unsigned split_bits);
void requeue_object_waiters(map<hobject_t, list<OpRequestRef>, hobject_t::BitwiseComparator>& m);
bool choose_acting(pg_shard_t &auth_log_shard,
bool *history_les_bound);
void build_might_have_unfound();
- void replay_queued_ops();
void activate(
ObjectStore::Transaction& t,
epoch_t activation_epoch,