list<OpRequestRef> waiting_for_active;
list<OpRequestRef> waiting_for_all_missing;
map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
- waiting_for_degraded_object;
+ waiting_for_degraded_object,
+ waiting_for_blocked_object;
// Callbacks should assume pg (and nothing else) is locked
map<hobject_t, list<Context*> > callbacks_for_degraded_object;
map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
op->mark_delayed("waiting for degraded object");
}
+void ReplicatedPG::wait_for_blocked_object(const hobject_t& soid, OpRequestRef op)
+{
+ dout(10) << __func__ << " " << soid << " " << op << dendl;
+ waiting_for_blocked_object[soid].push_back(op);
+ op->mark_delayed("waiting for blocked object");
+}
+
void ReplicatedPG::wait_for_backfill_pos(OpRequestRef op)
{
waiting_for_backfill_pos.push_back(op);
osd->reply_op_error(op, r);
return;
}
-
+
// make sure locator is consistent
object_locator_t oloc(obc->obs.oi.soid);
if (m->get_object_locator() != oloc) {
<< " op " << *m << "\n";
}
+ // io blocked on obc?
+ if (obc->is_blocked()) {
+ wait_for_blocked_object(obc->obs.oi.soid, op);
+ return;
+ }
+
if ((op->may_read()) && (obc->obs.oi.lost)) {
// This object is lost. Reading from it returns an error.
dout(20) << __func__ << ": object " << obc->obs.oi.soid
pgstat->stats.cat_sum[oi.category].add(stat);
}
+void ReplicatedPG::kick_object_context_blocked(ObjectContextRef obc)
+{
+ const hobject_t& soid = obc->obs.oi.soid;
+ map<hobject_t, list<OpRequestRef> >::iterator p = waiting_for_blocked_object.find(soid);
+ if (p == waiting_for_blocked_object.end())
+ return;
+
+ if (obc->is_blocked()) {
+ dout(10) << __func__ << " " << soid << " still blocked" << dendl;
+ return;
+ }
+
+ list<OpRequestRef>& ls = waiting_for_blocked_object[soid];
+ dout(10) << __func__ << " " << soid << " requeuing " << ls.size() << " requests" << dendl;
+ requeue_ops(ls);
+ waiting_for_blocked_object.erase(soid);
+}
+
SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid)
{
Mutex::Locker l(snapset_contexts_lock);
p->second.clear();
finish_degraded_object(p->first);
}
+ for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_blocked_object.begin();
+ p != waiting_for_blocked_object.end();
+ waiting_for_blocked_object.erase(p++)) {
+ if (is_primary())
+ requeue_ops(p->second);
+ else
+ p->second.clear();
+ }
if (is_primary())
requeue_ops(waiting_for_all_missing);
bool is_degraded_object(const hobject_t& oid);
void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
+ void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
+ void kick_object_context_blocked(ObjectContextRef obc);
+
void mark_all_unfound_lost(int what);
eversion_t pick_newest_available(const hobject_t& oid);
ObjectContextRef mark_object_lost(ObjectStore::Transaction *t,