map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
assert(i != recovering.end());
- if (!is_delete) {
+ if (i->second && i->second->rwstate.recovery_read_marker) {
// recover missing won't have had an obc, but it gets filled in
// during on_local_recover
assert(i->second);
if (is_missing_object(soid)) {
recover_missing(soid, v, cct->_conf->osd_client_op_priority, h);
} else if (missing_loc.is_deleted(soid)) {
- prep_object_replica_deletes(soid, v, h);
+ prep_object_replica_deletes(soid, v, h, &work_started);
} else {
prep_object_replica_pushes(soid, v, h, &work_started);
}
int PrimaryLogPG::prep_object_replica_deletes(
const hobject_t& soid, eversion_t v,
- PGBackend::RecoveryHandle *h)
+ PGBackend::RecoveryHandle *h,
+ bool *work_started)
{
assert(is_primary());
dout(10) << __func__ << ": on " << soid << dendl;
+ ObjectContextRef obc = get_object_context(soid, false);
+ if (obc) {
+ if (!obc->get_recovery_read()) {
+ dout(20) << "replica delete delayed on " << soid
+ << "; could not get rw_manager lock" << dendl;
+ *work_started = true;
+ return 0;
+ } else {
+ dout(20) << "replica delete got recovery read lock on " << soid
+ << dendl;
+ }
+ }
+
start_recovery_op(soid);
assert(!recovering.count(soid));
- recovering.insert(make_pair(soid, ObjectContextRef()));
+ if (!obc)
+ recovering.insert(make_pair(soid, ObjectContextRef()));
+ else
+ recovering.insert(make_pair(soid, obc));
pgbackend->recover_delete_object(soid, v, h);
return 1;
if (missing_loc.is_deleted(soid)) {
dout(10) << __func__ << ": " << soid << " is a delete, removing" << dendl;
map<hobject_t,pg_missing_item>::const_iterator r = m.get_items().find(soid);
- started += prep_object_replica_deletes(soid, r->second.need, h);
+ started += prep_object_replica_deletes(soid, r->second.need, h, work_started);
continue;
}