osd_logtype.add_inc(l_osd_subop, "subop");
+ osd_logtype.add_inc(l_osd_r_push, "rop");
osd_logtype.add_inc(l_osd_r_push, "r_push");
osd_logtype.add_inc(l_osd_r_pushb, "r_pushb");
osd_logtype.add_inc(l_osd_r_pull, "r_pull");
}
if (oldprimary != pg->get_primary()) {
pg->info.history.same_primary_since = osdmap->get_epoch();
- pg->cancel_recovery();
pg->dirty_info = true;
}
-
+ pg->cancel_recovery();
+
// deactivate.
pg->state_clear(PG_STATE_ACTIVE);
pg->state_clear(PG_STATE_DOWN);
int max = g_conf.osd_recovery_max_active - recovery_ops_active;
dout(10) << "do_recovery starting " << max
- << " (" << recovery_ops_active
- << "/" << g_conf.osd_recovery_max_active << " active) on "
+ << " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops) on "
<< *pg << dendl;
int started = pg->start_recovery_ops(max);
pg->put();
}
-
-
+void OSD::start_recovery_op(PG *pg, int count)
+{
+ recovery_wq.lock();
+ dout(10) << "start_recovery_op " << *pg << " count " << count
+ << " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops)"
+ << dendl;
+ assert(pg->recovery_ops_active >= 0);
+ pg->recovery_ops_active += count;
+ recovery_wq.unlock();
+}
void OSD::finish_recovery_op(PG *pg, int count, bool dequeue)
{
dout(10) << "finish_recovery_op " << *pg << " count " << count
- << " dequeue=" << dequeue << dendl;
+ << " dequeue=" << dequeue
+ << " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops)"
+ << dendl;
recovery_wq.lock();
// adjust count
recovery_ops_active -= count;
+ assert(recovery_ops_active >= 0);
pg->recovery_ops_active -= count;
+ assert(pg->recovery_ops_active >= 0);
if (dequeue)
pg->recovery_item.remove_myself();
l_osd_r_wr,
l_osd_r_wrb,
l_osd_subop,
+ l_osd_rop,
l_osd_r_push,
l_osd_r_pushb,
l_osd_r_pull,
} recovery_wq;
bool queue_for_recovery(PG *pg);
+ void start_recovery_op(PG *pg, int count);
void finish_recovery_op(PG *pg, int count, bool more);
void defer_recovery(PG *pg);
void do_recovery(PG *pg);
{
out << "pg[" << pg.info
<< " r=" << pg.get_role();
+
+ if (pg.recovery_ops_active)
+ out << " rops=" << pg.recovery_ops_active;
if (pg.log.bottom != pg.info.log_bottom ||
pg.log.top != pg.info.last_update)
<< ", pulling"
<< dendl;
pull(poid);
+ osd->start_recovery_op(this, 1);
}
waiting_for_missing_object[oid].push_back(m);
}
// push it before this update.
// FIXME, this is probably extra much work (eg if we're about to overwrite)
push_to_replica(poid, peer);
+ osd->start_recovery_op(this, 1);
}
}
if (missing.is_missing(head.oid)) {
if (pulling.count(head.oid)) {
dout(10) << " missing but already pulling head " << head << dendl;
+ return false;
} else {
- pull(head);
+ return pull(head);
}
- return false;
}
// check snapset
missing_loc.erase(poid.oid);
// close out pull op?
- if (pulling.count(poid.oid))
+ if (pulling.count(poid.oid)) {
pulling.erase(poid.oid);
+ finish_recovery_op();
+ }
update_stats();
for (unsigned i=1; i<acting.size(); i++) {
int peer = acting[i];
assert(peer_missing.count(peer));
- if (peer_missing[peer].is_missing(poid.oid))
+ if (peer_missing[peer].is_missing(poid.oid)) {
push_to_replica(poid, peer); // ok, push it, and they (will) have it now.
+ osd->start_recovery_op(this, 1);
+ }
}
-
- finish_recovery_op();
}
} else {
else
n = recover_primary(max);
started += n;
+ osd->logger->inc(l_osd_rop, n);
if (n < max)
break;
max -= n;
finish_recovery();
} else {
dout(-10) << "recover_primary primary now complete, starting peer recovery" << dendl;
- finish_recovery_op();
}
return started;