From: John Spray Date: Wed, 24 Sep 2014 13:19:32 +0000 (+0100) Subject: osdc/Filer: drop probe/purge locks before calling objecter X-Git-Tag: v0.86~17^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a8ac4b62a5e878c5658fda730a2f429297ba7c05;p=ceph.git osdc/Filer: drop probe/purge locks before calling objecter Fixes: #9562 Signed-off-by: John Spray (cherry picked from commit 8dc94a2d8ce3364c0d8d52f634e5fc967e43d819) --- diff --git a/src/osdc/Filer.cc b/src/osdc/Filer.cc index df23770e3b9b..84392de53b59 100644 --- a/src/osdc/Filer.cc +++ b/src/osdc/Filer.cc @@ -47,14 +47,13 @@ public: bool probe_complete; { - Mutex::Locker l(probe->lock); - - // TODO: handle this error. + probe->lock.Lock(); if (r != 0) { probe->err = r; } probe_complete = filer->_probed(probe, oid, size, mtime); + assert(!probe->lock.is_locked_by_me()); } if (probe_complete) { probe->onfinish->complete(probe->err); @@ -97,16 +96,17 @@ int Filer::probe(inodeno_t ino, probe->probing_off -= probe->probing_len; } - // Take lock before starting any I/Os, to protect us from concurrent calls - // back into C_Probe from OSD op completions from different OSDs - { - Mutex::Locker l(probe->lock); - _probe(probe); - } + probe->lock.Lock(); + _probe(probe); + assert(!probe->lock.is_locked_by_me()); + return 0; } +/** + * probe->lock must be initially locked, this function will release it + */ void Filer::_probe(Probe *probe) { assert(probe->lock.is_locked_by_me()); @@ -121,18 +121,27 @@ void Filer::_probe(Probe *probe) Striper::file_to_extents(cct, probe->ino, &probe->layout, probe->probing_off, probe->probing_len, 0, probe->probing); + std::vector stat_extents; for (vector::iterator p = probe->probing.begin(); p != probe->probing.end(); ++p) { ldout(cct, 10) << "_probe probing " << p->oid << dendl; - C_Probe *c = new C_Probe(this, probe, p->oid); - objecter->stat(p->oid, p->oloc, probe->snapid, &c->size, &c->mtime, - probe->flags | CEPH_OSD_FLAG_RWORDERED, c); probe->ops.insert(p->oid); + stat_extents.push_back(*p); + } + + probe->lock.Unlock(); + for (std::vector::iterator i = stat_extents.begin(); + i != stat_extents.end(); ++i) { + C_Probe *c = new C_Probe(this, probe, i->oid); + objecter->stat(i->oid, i->oloc, probe->snapid, &c->size, &c->mtime, + probe->flags | CEPH_OSD_FLAG_RWORDERED, c); } } /** + * probe->lock must be initially held, and will be released by this function. + * * @return true if probe is complete and Probe object may be freed. */ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mtime) @@ -149,10 +158,13 @@ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mt assert(probe->ops.count(oid)); probe->ops.erase(oid); - if (!probe->ops.empty()) + if (!probe->ops.empty()) { + probe->lock.Unlock(); return false; // waiting for more! + } if (probe->err) { // we hit an error, propagate back up + probe->lock.Unlock(); return true; } @@ -226,15 +238,15 @@ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mt probe->probing_off -= period; } _probe(probe); + assert(!probe->lock.is_locked_by_me()); return false; - } - - if (probe->pmtime) { + } else if (probe->pmtime) { ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl; *probe->pmtime = probe->max_mtime; } // done! + probe->lock.Unlock(); return true; } @@ -308,20 +320,28 @@ void Filer::_do_purge_range(PurgeRange *pr, int fin) return; } + std::vector remove_oids; + int max = 10 - pr->uncommitted; while (pr->num > 0 && max > 0) { - object_t oid = file_object_t(pr->ino, pr->first); - const OSDMap *osdmap = objecter->get_osdmap_read(); - object_locator_t oloc = osdmap->file_to_object_locator(pr->layout); - objecter->put_osdmap_read(); - objecter->remove(oid, oloc, pr->snapc, pr->mtime, pr->flags, - NULL, new C_PurgeRange(this, pr)); + remove_oids.push_back(file_object_t(pr->ino, pr->first)); pr->uncommitted++; pr->first++; pr->num--; max--; } pr->lock.Unlock(); + + // Issue objecter ops outside pr->lock to avoid lock dependency loop + for (std::vector::iterator i = remove_oids.begin(); + i != remove_oids.end(); ++i) { + const object_t oid = *i; + const OSDMap *osdmap = objecter->get_osdmap_read(); + const object_locator_t oloc = osdmap->file_to_object_locator(pr->layout); + objecter->put_osdmap_read(); + objecter->remove(oid, oloc, pr->snapc, pr->mtime, pr->flags, + NULL, new C_PurgeRange(this, pr)); + } }