// otherwise, we'd have blocked in do_op
ceph_assert(oid.is_head());
ceph_assert(objects_blocked_on_snap_promotion.count(oid) == 0);
+ /*
+ * We block the head object here.
+ *
+ * Let's assume that there is racing read When the head object is being rollbacked.
+ * Since the two different ops can trigger promote_object() with the same source,
+ * infinite loop happens by canceling ops each other.
+ * To avoid this, we block the head object during rollback.
+ * So, the racing read will be blocked until the rollback is completed.
+ * see also: https://tracker.ceph.com/issues/49726
+ */
+ ObjectContextRef head_obc = get_object_context(oid, false);
+ head_obc->start_block();
objects_blocked_on_snap_promotion[oid] = obc;
wait_for_blocked_object(obc->obs.oi.soid, op);
}
copy_ops.erase(cop->obc->obs.oi.soid);
cop->obc->stop_block();
+ kick_object_context_blocked(cop->obc);
cop->results.should_requeue = requeue;
CopyCallbackResults result(-ECANCELED, &cop->results);
cop->cb->complete(result);
cop->obc = ObjectContextRef();
}
-void PrimaryLogPG::cancel_and_kick_copy_ops(bool requeue, vector<ceph_tid_t> *tids)
+void PrimaryLogPG::cancel_copy_ops(bool requeue, vector<ceph_tid_t> *tids)
{
dout(10) << __func__ << dendl;
map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin();
while (p != copy_ops.end()) {
- ObjectContextRef obc = p->second->obc;
// requeue this op? can I queue up all of them?
cancel_copy((p++)->second, requeue, tids);
- kick_object_context_blocked(obc);
}
}
pgstat->stats.sum.add(stat);
}
+void PrimaryLogPG::requeue_op_blocked_by_object(const hobject_t &soid) {
+ map<hobject_t, list<OpRequestRef>>::iterator p = waiting_for_blocked_object.find(soid);
+ if (p != waiting_for_blocked_object.end()) {
+ list<OpRequestRef>& ls = p->second;
+ dout(10) << __func__ << " " << soid << " requeuing " << ls.size() << " requests" << dendl;
+ requeue_ops(ls);
+ waiting_for_blocked_object.erase(p);
+ }
+}
+
void PrimaryLogPG::kick_object_context_blocked(ObjectContextRef obc)
{
const hobject_t& soid = obc->obs.oi.soid;
return;
}
- map<hobject_t, list<OpRequestRef>>::iterator p = waiting_for_blocked_object.find(soid);
- if (p != waiting_for_blocked_object.end()) {
- list<OpRequestRef>& ls = p->second;
- dout(10) << __func__ << " " << soid << " requeuing " << ls.size() << " requests" << dendl;
- requeue_ops(ls);
- waiting_for_blocked_object.erase(p);
- }
+ requeue_op_blocked_by_object(soid);
map<hobject_t, ObjectContextRef>::iterator i =
objects_blocked_on_snap_promotion.find(obc->obs.oi.soid.get_head());
if (i != objects_blocked_on_snap_promotion.end()) {
ceph_assert(i->second == obc);
+ ObjectContextRef head_obc = get_object_context(i->first, false);
+ head_obc->stop_block();
+ // kick blocked ops (head)
+ requeue_op_blocked_by_object(i->first);
objects_blocked_on_snap_promotion.erase(i);
}
m_scrubber->unreg_next_scrub();
vector<ceph_tid_t> tids;
- cancel_and_kick_copy_ops(false, &tids);
+ cancel_copy_ops(false, &tids);
cancel_flush_ops(false, &tids);
cancel_proxy_ops(false, &tids);
cancel_manifest_ops(false, &tids);
requeue_ops(waiting_for_readable);
vector<ceph_tid_t> tids;
- cancel_and_kick_copy_ops(is_primary(), &tids);
+ cancel_copy_ops(is_primary(), &tids);
cancel_flush_ops(is_primary(), &tids);
cancel_proxy_ops(is_primary(), &tids);
cancel_manifest_ops(is_primary(), &tids);
void finish_copyfrom(CopyFromCallback *cb);
void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
void cancel_copy(CopyOpRef cop, bool requeue, std::vector<ceph_tid_t> *tids);
- void cancel_and_kick_copy_ops(bool requeue, std::vector<ceph_tid_t> *tids);
+ void cancel_copy_ops(bool requeue, std::vector<ceph_tid_t> *tids);
friend struct C_Copyfrom;
bool maybe_await_blocked_head(const hobject_t &soid, OpRequestRef op);
void wait_for_blocked_object(const hobject_t& soid, OpRequestRef op);
void kick_object_context_blocked(ObjectContextRef obc);
+ void requeue_op_blocked_by_object(const hobject_t &soid);
void maybe_force_recovery();