dout(10) << "do_recovery starting " << max
<< " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops) on "
<< *pg << dendl;
+#ifdef DEBUG_RECOVERY_OIDS
+ dout(20) << " active was " << recovery_oids << dendl;
+#endif
int started = pg->start_recovery_ops(max);
- recovery_ops_active += started;
- pg->recovery_ops_active += started;
+
+ dout(10) << "do_recovery started " << started
+ << " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops) on "
+ << *pg << dendl;
+
if (started < max)
pg->recovery_item.remove_myself();
pg->put();
}
// OSD-wide bookkeeping for the start of a single recovery op on object `soid` of `pg`.
// Patch view: '-' lines are the previous count-based variant, '+' lines the per-object one.
// The new variant takes recovery_wq's lock, logs the op, bumps recovery_ops_active by
// exactly one and, only when DEBUG_RECOVERY_OIDS is defined, records soid in recovery_oids.
// `pg` is used here only for the log message.
-void OSD::start_recovery_op(PG *pg, int count)
+void OSD::start_recovery_op(PG *pg, const sobject_t& soid)
{
// the counter (and the debug oid set) are only modified between lock() and unlock()
recovery_wq.lock();
- dout(10) << "start_recovery_op " << *pg << " count " << count
+ dout(10) << "start_recovery_op " << *pg << " " << soid
<< " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops)"
<< dendl;
assert(recovery_ops_active >= 0);
- assert(pg->recovery_ops_active >= 0);
- recovery_ops_active += count;
- pg->recovery_ops_active += count;
// the per-PG counter is no longer adjusted in this function (see the removed lines above)
+ recovery_ops_active++;
+
+#ifdef DEBUG_RECOVERY_OIDS
// debug cross-check: soid must not already be active, and after inserting it the
// set size has to equal the counter
+ dout(20) << " active was " << recovery_oids << dendl;
+ assert(recovery_oids.count(soid) == 0);
+ recovery_oids.insert(soid);
+ assert((int)recovery_oids.size() == recovery_ops_active);
+#endif
+
recovery_wq.unlock();
}
-void OSD::finish_recovery_op(PG *pg, int count, bool dequeue)
+void OSD::finish_recovery_op(PG *pg, const sobject_t& soid, bool dequeue)
{
- dout(10) << "finish_recovery_op " << *pg << " count " << count
+ dout(10) << "finish_recovery_op " << *pg << " " << soid
<< " dequeue=" << dequeue
<< " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops)"
<< dendl;
recovery_wq.lock();
// adjust count
- recovery_ops_active -= count;
+ recovery_ops_active--;
assert(recovery_ops_active >= 0);
- pg->recovery_ops_active -= count;
- assert(pg->recovery_ops_active >= 0);
+
+#ifdef DEBUG_RECOVERY_OIDS
+ dout(20) << " active oids was " << recovery_oids << dendl;
+ assert(recovery_oids.count(soid));
+ recovery_oids.erase(soid);
+ assert((int)recovery_oids.size() == recovery_ops_active);
+#endif
if (dequeue)
pg->recovery_item.remove_myself();
xlist<PG*> recovery_queue;
utime_t defer_recovery_until;
int recovery_ops_active;
+#ifdef DEBUG_RECOVERY_OIDS
+ set<sobject_t> recovery_oids;
+#endif
struct RecoveryWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
} recovery_wq;
bool queue_for_recovery(PG *pg);
- void start_recovery_op(PG *pg, int count);
- void finish_recovery_op(PG *pg, int count, bool more);
+ void start_recovery_op(PG *pg, const sobject_t& soid);
+ void finish_recovery_op(PG *pg, const sobject_t& soid, bool dequeue);
void defer_recovery(PG *pg);
void do_recovery(PG *pg);
bool _recover_now();
put();
}
// Begin accounting for one recovery op on object `soid` in this PG.
// Logs the op, increments this PG's recovery_ops_active, records soid in recovering_oids
// when DEBUG_RECOVERY_OIDS is defined, then forwards to osd->start_recovery_op().
+void PG::start_recovery_op(const sobject_t& soid)
+{
+ dout(10) << "start_recovery_op " << soid
+#ifdef DEBUG_RECOVERY_OIDS
+ << " (" << recovering_oids << ")"
+#endif
+ << dendl;
+ assert(recovery_ops_active >= 0);
+ recovery_ops_active++;
+#ifdef DEBUG_RECOVERY_OIDS
// the same object must not be started twice; the assert fires before the insert
+ assert(recovering_oids.count(soid) == 0);
+ recovering_oids.insert(soid);
+#endif
+ osd->start_recovery_op(this, soid);
+}
+
// End accounting for one recovery op on object `soid` in this PG.
// Logs the op, decrements this PG's recovery_ops_active (which must be positive on entry),
// removes soid from recovering_oids when DEBUG_RECOVERY_OIDS is defined, then forwards
// soid and `dequeue` unchanged to osd->finish_recovery_op().
+void PG::finish_recovery_op(const sobject_t& soid, bool dequeue)
+{
+ dout(10) << "finish_recovery_op " << soid
+#ifdef DEBUG_RECOVERY_OIDS
+ << " (" << recovering_oids << ")"
+#endif
+ << dendl;
// checked before the decrement so the counter can never drop below zero here
+ assert(recovery_ops_active > 0);
+ recovery_ops_active--;
+#ifdef DEBUG_RECOVERY_OIDS
// finishing an object that was never started indicates a counting bug
+ assert(recovering_oids.count(soid));
+ recovering_oids.erase(soid);
+#endif
+ osd->finish_recovery_op(this, soid, dequeue);
+}
+
+
// Defer recovery for this PG: notify the OSD, reset the log's recovery pointers, clear
// finish_sync_event, finish every recovery op still counted as active, and finally run the
// implementation-specific _clear_recovery_state() hook.
void PG::defer_recovery()
{
osd->defer_recovery(this);
log.reset_recovery_pointers();
finish_sync_event = 0;
- osd->finish_recovery_op(this, recovery_ops_active, true);
// ops are now finished one at a time (previously a single call carrying the whole count);
// each finish_recovery_op() call decrements recovery_ops_active, which ends the loop
+ sobject_t soid;
+ while (recovery_ops_active > 0) {
+#ifdef DEBUG_RECOVERY_OIDS
// finish_recovery_op() asserts that soid is in recovering_oids, so pass a tracked oid.
// NOTE(review): begin() is dereferenced without an emptiness check; if the set and the
// counter ever disagree this is undefined behaviour rather than a clean assert -- confirm.
+ soid = *recovering_oids.begin();
+#endif
// without DEBUG_RECOVERY_OIDS the same default-constructed soid is passed for every op
+ finish_recovery_op(soid, true);
+ }
_clear_recovery_state(); // pg impl specific hook
}
osd->get_logclient()->log(LOG_ERROR, ss);
reorder = true;
}
- last = e.version;
- if (e.version > log.bottom || log.backlog) { // ignore items below log.bottom
- if (opos % 4096 < pos % 4096)
- ondisklog.block_map[opos] = e.version;
- log.log.push_back(e);
- } else {
- dout(20) << "read_log ignoring entry at " << pos << dendl;
+ if (e.version <= log.bottom && !log.backlog) {
+ dout(20) << "read_log ignoring entry at " << pos << " below log.bottom" << dendl;
+ continue;
}
+ if (last.version == e.version.version) {
+ dout(0) << "read_log got dup " << e.version << " (last was " << last << ", dropping that one)" << dendl;
+ log.log.pop_back();
+ stringstream ss;
+ ss << info.pgid << " read_log got dup " << e.version << " after " << last;
+ osd->get_logclient()->log(LOG_ERROR, ss);
+ }
+
+ if (opos % 4096 < pos % 4096)
+ ondisklog.block_map[opos] = e.version;
+ log.log.push_back(e);
+ last = e.version;
// [repair] at end of log?
if (!p.end() && e.version == info.last_update) {
using namespace __gnu_cxx;
+#define DEBUG_RECOVERY_OIDS // track set of recovering oids explicitly, to find counting bugs
+
+
class OSD;
class MOSDOp;
class MOSDSubOp;
xlist<PG*>::item recovery_item, backlog_item, scrub_item, snap_trim_item, stat_queue_item;
int recovery_ops_active;
+#ifdef DEBUG_RECOVERY_OIDS
+ set<sobject_t> recovering_oids;
+#endif
epoch_t generate_backlog_epoch; // epoch we decided to build a backlog.
utime_t replay_until;
void clear_recovery_state();
virtual void _clear_recovery_state() = 0;
void defer_recovery();
+ void start_recovery_op(const sobject_t& soid);
+ void finish_recovery_op(const sobject_t& soid, bool dequeue=false);
loff_t get_log_write_pos() {
return 0;
<< ", pulling"
<< dendl;
pull(soid);
- osd->start_recovery_op(this, 1);
+ start_recovery_op(soid);
}
waiting_for_missing_object[soid].push_back(m);
}
// push it before this update.
// FIXME, this is probably extra much work (eg if we're about to overwrite)
push_to_replica(soid, peer);
- osd->start_recovery_op(this, 1);
+ start_recovery_op(soid);
}
issue_repop(repop, peer, now, old_exists, old_size, old_version);
if (pushing[soid].empty()) {
dout(10) << "pushed " << soid << " to all replicas" << dendl;
- finish_recovery_op();
+ finish_recovery_op(soid);
} else {
dout(10) << "pushed " << soid << ", still waiting for push ack from "
<< pushing[soid] << dendl;
// close out pull op?
if (pulling.count(soid)) {
pulling.erase(soid);
- finish_recovery_op();
+ finish_recovery_op(soid);
}
update_stats();
assert(peer_missing.count(peer));
if (peer_missing[peer].is_missing(soid)) {
push_to_replica(soid, peer); // ok, push it, and they (will) have it now.
- osd->start_recovery_op(this, 1);
+ start_recovery_op(soid);
}
}
}
// ReplicatedPG's implementation of the _clear_recovery_state() hook: empties missing_loc,
// pulling and pushing, plus the debug oid set when DEBUG_RECOVERY_OIDS is defined.
void ReplicatedPG::_clear_recovery_state()
{
missing_loc.clear();
+#ifdef DEBUG_RECOVERY_OIDS
// NOTE(review): recovery_ops_active is not reset here; PG::defer_recovery drains it to
// zero before calling this hook -- confirm other callers keep counter and set in step.
+ recovering_oids.clear();
+#endif
pulling.clear();
pushing.clear();
}
return started;
}
-void ReplicatedPG::finish_recovery_op()
-{
- dout(10) << "finish_recovery_op" << dendl;
- osd->finish_recovery_op(this, 1, false);
-}
+
/**
}
}
- if (pull(soid))
+ if (pull(soid)) {
++started;
- else
+ start_recovery_op(soid);
+ } else
++skipped;
if (started >= max)
return started;
sobject_t soid = peer_missing[peer].rmissing.begin()->second;
eversion_t v = peer_missing[peer].rmissing.begin()->first;
+ start_recovery_op(soid);
+ started++;
+
push_to_replica(soid, peer);
// do other peers need it too?
push_to_replica(soid, peer);
}
- if (++started >= max)
+ if (started >= max)
return started;
}
void queue_for_recovery();
int start_recovery_ops(int max);
- void finish_recovery_op();
int recover_primary(int max);
int recover_replicas(int max);