dout(10) << "do_op mode now " << mode << dendl;
+ // are writes blocked by another object?
+ if (obc->blocked_by) {
+ dout(10) << "do_op writes for " << obc->obs.oi.soid << " blocked by "
+ << obc->blocked_by->obs.oi.soid << dendl;
+ wait_for_degraded_object(obc->blocked_by->obs.oi.soid, op);
+ put_object_context(obc);
+ return;
+ }
+
// src_oids
map<hobject_t,ObjectContext*> src_obc;
for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
osd->reply_op_error(op, r);
} else if (is_degraded_object(sobc->obs.oi.soid)) {
wait_for_degraded_object(sobc->obs.oi.soid, op);
+ dout(10) << " writes for " << obc->obs.oi.soid << " now blocked by "
+ << sobc->obs.oi.soid << dendl;
+ obc->blocked_by = sobc;
+ sobc->blocking.insert(obc);
} else if (sobc->obs.oi.oloc.key != obc->obs.oi.oloc.key &&
sobc->obs.oi.oloc.key != obc->obs.oi.soid.oid.name &&
sobc->obs.oi.soid.oid.name != obc->obs.oi.oloc.key) {
osd->requeue_ops(this, waiting_for_degraded_object[soid]);
waiting_for_degraded_object.erase(soid);
}
- map<hobject_t, ObjectContext *>::iterator i =
- object_contexts.find(soid);
- if (i != object_contexts.end()) {
- populate_obc_watchers(i->second);
- }
+ finish_degraded_object(soid);
} else {
dout(10) << "pushed " << soid << ", still waiting for push ack from "
<< pushing[soid].size() << " others" << dendl;
reply->put();
}
+void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
+{
+ dout(10) << "finish_degraded_object " << oid << dendl;
+ map<hobject_t, ObjectContext *>::iterator i = object_contexts.find(oid);
+ if (i != object_contexts.end()) {
+ populate_obc_watchers(i->second);
+ for (set<ObjectContext*>::iterator j = i->second->blocking.begin();
+ j != i->second->blocking.end();
+ i->second->blocking.erase(j++)) {
+ dout(10) << " no longer blocking writes for " << (*j)->obs.oi.soid << dendl;
+ (*j)->blocked_by = NULL;
+ }
+ }
+}
/** op_pull
* process request to pull an entire object.
// take object waiters
requeue_object_waiters(waiting_for_missing_object);
- requeue_object_waiters(waiting_for_degraded_object);
+ for (map<hobject_t,list<Message*> >::iterator p = waiting_for_degraded_object.begin();
+ p != waiting_for_degraded_object.end();
+ waiting_for_degraded_object.erase(p++)) {
+ osd->requeue_ops(this, p->second);
+ finish_degraded_object(p->first);
+ }
osd->requeue_ops(this, waiting_for_all_missing);
waiting_for_all_missing.clear();
Cond cond;
int unstable_writes, readers, writers_waiting, readers_waiting;
+ // set if writes for this object are blocked on another objects recovery
+ ObjectContext *blocked_by; // object blocking our writes
+ set<ObjectContext*> blocking; // objects whose writes we block
+
// any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
map<entity_name_t, OSD::Session *> watchers;
map<entity_name_t, Context *> unconnected_watchers;
map<hobject_t, interval_set<uint64_t> >& clone_subsets);
void send_push_op_blank(const hobject_t& soid, int peer);
+ void finish_degraded_object(const hobject_t& oid);
+
// Cancels/resets pulls from peer
void check_recovery_op_pulls(const OSDMapRef map);
int pull(const hobject_t& oid, eversion_t v);