bool probe_complete;
{
- Mutex::Locker l(probe->lock);
-
- // TODO: handle this error.
+ probe->lock.Lock();
if (r != 0) {
probe->err = r;
}
probe_complete = filer->_probed(probe, oid, size, mtime);
+ assert(!probe->lock.is_locked_by_me());
}
if (probe_complete) {
probe->onfinish->complete(probe->err);
probe->probing_off -= probe->probing_len;
}
- // Take lock before starting any I/Os, to protect us from concurrent calls
- // back into C_Probe from OSD op completions from different OSDs
- {
- Mutex::Locker l(probe->lock);
- _probe(probe);
- }
+ probe->lock.Lock();
+ _probe(probe);
+ assert(!probe->lock.is_locked_by_me());
+
return 0;
}
+/**
+ * probe->lock must be initially locked, this function will release it
+ */
void Filer::_probe(Probe *probe)
{
assert(probe->lock.is_locked_by_me());
Striper::file_to_extents(cct, probe->ino, &probe->layout,
probe->probing_off, probe->probing_len, 0, probe->probing);
+ std::vector<ObjectExtent> stat_extents;
for (vector<ObjectExtent>::iterator p = probe->probing.begin();
p != probe->probing.end();
++p) {
ldout(cct, 10) << "_probe probing " << p->oid << dendl;
- C_Probe *c = new C_Probe(this, probe, p->oid);
- objecter->stat(p->oid, p->oloc, probe->snapid, &c->size, &c->mtime,
- probe->flags | CEPH_OSD_FLAG_RWORDERED, c);
probe->ops.insert(p->oid);
+ stat_extents.push_back(*p);
+ }
+
+ probe->lock.Unlock();
+ for (std::vector<ObjectExtent>::iterator i = stat_extents.begin();
+ i != stat_extents.end(); ++i) {
+ C_Probe *c = new C_Probe(this, probe, i->oid);
+ objecter->stat(i->oid, i->oloc, probe->snapid, &c->size, &c->mtime,
+ probe->flags | CEPH_OSD_FLAG_RWORDERED, c);
}
}
/**
+ * probe->lock must be initially held, and will be released by this function.
+ *
* @return true if probe is complete and Probe object may be freed.
*/
bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mtime)
assert(probe->ops.count(oid));
probe->ops.erase(oid);
- if (!probe->ops.empty())
+ if (!probe->ops.empty()) {
+ probe->lock.Unlock();
return false; // waiting for more!
+ }
if (probe->err) { // we hit an error, propagate back up
+ probe->lock.Unlock();
return true;
}
probe->probing_off -= period;
}
_probe(probe);
+ assert(!probe->lock.is_locked_by_me());
return false;
- }
-
- if (probe->pmtime) {
+ } else if (probe->pmtime) {
ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl;
*probe->pmtime = probe->max_mtime;
}
// done!
+ probe->lock.Unlock();
return true;
}
return;
}
+ std::vector<object_t> remove_oids;
+
int max = 10 - pr->uncommitted;
while (pr->num > 0 && max > 0) {
- object_t oid = file_object_t(pr->ino, pr->first);
- const OSDMap *osdmap = objecter->get_osdmap_read();
- object_locator_t oloc = osdmap->file_to_object_locator(pr->layout);
- objecter->put_osdmap_read();
- objecter->remove(oid, oloc, pr->snapc, pr->mtime, pr->flags,
- NULL, new C_PurgeRange(this, pr));
+ remove_oids.push_back(file_object_t(pr->ino, pr->first));
pr->uncommitted++;
pr->first++;
pr->num--;
max--;
}
pr->lock.Unlock();
+
+ // Issue objecter ops outside pr->lock to avoid lock dependency loop
+ for (std::vector<object_t>::iterator i = remove_oids.begin();
+ i != remove_oids.end(); ++i) {
+ const object_t oid = *i;
+ const OSDMap *osdmap = objecter->get_osdmap_read();
+ const object_locator_t oloc = osdmap->file_to_object_locator(pr->layout);
+ objecter->put_osdmap_read();
+ objecter->remove(oid, oloc, pr->snapc, pr->mtime, pr->flags,
+ NULL, new C_PurgeRange(this, pr));
+ }
}