// pg waiters
list<class Message*> waiting_for_active;
hash_map<sobject_t,
- list<class Message*> > waiting_for_missing_object;
+ list<class Message*> > waiting_for_missing_object, waiting_for_degraded_object;
map<eversion_t,class MOSDOp*> replay_queue;
+
+ void take_object_waiters(hash_map<sobject_t, list<Message*> >& m);
hash_map<sobject_t, list<Message*> > waiting_for_wr_unlock;
virtual bool is_missing_object(const sobject_t& oid) = 0;
virtual void wait_for_missing_object(const sobject_t& oid, Message *op) = 0;
+ virtual bool is_degraded_object(const sobject_t& oid) = 0;
+ virtual void wait_for_degraded_object(const sobject_t& oid, Message *op) = 0;
+
virtual void on_osd_failure(int osd) = 0;
virtual void on_role_change() = 0;
virtual void on_change() = 0;
{
return missing.missing.count(soid);
}
-
void ReplicatedPG::wait_for_missing_object(const sobject_t& soid, Message *m)
{
waiting_for_missing_object[soid].push_back(m);
}
+bool ReplicatedPG::is_degraded_object(const sobject_t& soid)
+{
+ if (missing.missing.count(soid))
+ return true;
+ for (unsigned i = 1; i < acting.size(); i++) {
+ int peer = acting[i];
+ if (peer_missing.count(peer) &&
+ peer_missing[peer].missing.count(soid))
+ return true;
+ }
+ return false;
+}
+
+void ReplicatedPG::wait_for_degraded_object(const sobject_t& soid, Message *m)
+{
+ assert(is_degraded_object(soid));
+
+ // we don't have it (yet).
+ if (pushing.count(soid)) {
+ dout(7) << "degraded "
+ << soid
+ << ", already pushing"
+ << dendl;
+ } else {
+ dout(7) << "degraded "
+ << soid
+ << ", pushing"
+ << dendl;
+ recover_object_replicas(soid);
+ }
+ waiting_for_degraded_object[soid].push_back(m);
+}
+
// ==========================================================
for (unsigned i=1; i<acting.size(); i++) {
int peer = acting[i];
-
- if (peer_missing.count(peer) &&
- peer_missing[peer].is_missing(soid)) {
- // push it before this update.
- // FIXME, this is probably extra much work (eg if we're about to overwrite)
- repop->obc->ondisk_read_lock();
- push_to_replica(soid, peer);
- start_recovery_op(soid);
- repop->obc->ondisk_read_unlock();
- }
// forward the write/update/whatever
MOSDSubOp *wr = new MOSDSubOp(repop->ctx->reqid, info.pgid, soid,
if (pushing[soid].empty()) {
dout(10) << "pushed " << soid << " to all replicas" << dendl;
finish_recovery_op(soid);
+ if (waiting_for_degraded_object.count(soid)) {
+ osd->take_waiters(waiting_for_degraded_object[soid]);
+ waiting_for_degraded_object.erase(soid);
+ }
} else {
dout(10) << "pushed " << soid << ", still waiting for push ack from "
<< pushing[soid] << dendl;
dout(10) << "on_role_change" << dendl;
// take object waiters
- for (hash_map<sobject_t, list<Message*> >::iterator it = waiting_for_missing_object.begin();
- it != waiting_for_missing_object.end();
- it++)
- osd->take_waiters(it->second);
- waiting_for_missing_object.clear();
+ take_object_waiters(waiting_for_missing_object);
+ take_object_waiters(waiting_for_degraded_object);
}
+
// clear state. called on recovery completion AND cancellation.
void ReplicatedPG::_clear_recovery_state()
{
return started;
}
+int ReplicatedPG::recover_object_replicas(const sobject_t& soid)
+{
+ int started = 0;
+
+ dout(10) << "recover_object_replicas " << soid << dendl;
+
+ ObjectContext *obc = lookup_object_context(soid);
+ if (obc) {
+ dout(10) << " ondisk_read_lock for " << soid << dendl;
+ obc->ondisk_read_lock();
+ }
+
+ start_recovery_op(soid);
+ started++;
+
+ // who needs it?
+ for (unsigned i=1; i<acting.size(); i++) {
+ int peer = acting[i];
+ if (peer_missing.count(peer) &&
+ peer_missing[peer].is_missing(soid))
+ push_to_replica(soid, peer);
+ }
+
+ if (obc) {
+ dout(10) << " ondisk_read_unlock on " << soid << dendl;
+ obc->ondisk_read_unlock();
+ put_object_context(obc);
+ }
+
+ return started;
+}
+
int ReplicatedPG::recover_replicas(int max)
{
int started = 0;
sobject_t soid = peer_missing[peer].rmissing.begin()->second;
eversion_t v = peer_missing[peer].rmissing.begin()->first;
- ObjectContext *obc = lookup_object_context(soid);
- if (obc) {
- dout(10) << " ondisk_read_lock for " << soid << dendl;
- obc->ondisk_read_lock();
- }
-
- start_recovery_op(soid);
- started++;
-
- push_to_replica(soid, peer);
-
- // do other peers need it too?
- for (i++; i<acting.size(); i++) {
- int peer = acting[i];
- if (peer_missing.count(peer) &&
- peer_missing[peer].is_missing(soid))
- push_to_replica(soid, peer);
- }
-
- if (obc) {
- dout(10) << " ondisk_read_unlock on " << soid << dendl;
- obc->ondisk_read_unlock();
- put_object_context(obc);
- }
+ started += recover_object_replicas(soid);
if (started >= max)
return started;