]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc/Filer: drop probe/purge locks before calling objecter 2612/head
authorJohn Spray <john.spray@redhat.com>
Wed, 24 Sep 2014 13:19:32 +0000 (14:19 +0100)
committerJohn Spray <john.spray@redhat.com>
Tue, 30 Sep 2014 17:46:25 +0000 (18:46 +0100)
Fixes: #9562
Signed-off-by: John Spray <john.spray@redhat.com>
(cherry picked from commit 8dc94a2d8ce3364c0d8d52f634e5fc967e43d819)

src/osdc/Filer.cc

index df23770e3b9b1d833c14197d4b4e989b71127efa..84392de53b59a877b1a2f48ebe5242314cb32f8a 100644 (file)
@@ -47,14 +47,13 @@ public:
 
     bool probe_complete;
     {
-      Mutex::Locker l(probe->lock);
-
-      // TODO: handle this error.
+      probe->lock.Lock();
       if (r != 0) {
         probe->err = r;
       }
 
       probe_complete = filer->_probed(probe, oid, size, mtime);
+      assert(!probe->lock.is_locked_by_me());
     }
     if (probe_complete) {
       probe->onfinish->complete(probe->err);
@@ -97,16 +96,17 @@ int Filer::probe(inodeno_t ino,
     probe->probing_off -= probe->probing_len;
   }
   
-  // Take lock before starting any I/Os, to protect us from concurrent calls
-  // back into C_Probe from OSD op completions from different OSDs
-  {
-    Mutex::Locker l(probe->lock);
-    _probe(probe);
-  }
+  probe->lock.Lock();
+  _probe(probe);
+  assert(!probe->lock.is_locked_by_me());
+
   return 0;
 }
 
 
+/**
+ * probe->lock must be initially locked, this function will release it
+ */
 void Filer::_probe(Probe *probe)
 {
   assert(probe->lock.is_locked_by_me());
@@ -121,18 +121,27 @@ void Filer::_probe(Probe *probe)
   Striper::file_to_extents(cct, probe->ino, &probe->layout,
                           probe->probing_off, probe->probing_len, 0, probe->probing);
   
+  std::vector<ObjectExtent> stat_extents;
   for (vector<ObjectExtent>::iterator p = probe->probing.begin();
        p != probe->probing.end();
        ++p) {
     ldout(cct, 10) << "_probe  probing " << p->oid << dendl;
-    C_Probe *c = new C_Probe(this, probe, p->oid);
-    objecter->stat(p->oid, p->oloc, probe->snapid, &c->size, &c->mtime, 
-                  probe->flags | CEPH_OSD_FLAG_RWORDERED, c);
     probe->ops.insert(p->oid);
+    stat_extents.push_back(*p);
+  }
+
+  probe->lock.Unlock();
+  for (std::vector<ObjectExtent>::iterator i = stat_extents.begin();
+      i != stat_extents.end(); ++i) {
+    C_Probe *c = new C_Probe(this, probe, i->oid);
+    objecter->stat(i->oid, i->oloc, probe->snapid, &c->size, &c->mtime, 
+                   probe->flags | CEPH_OSD_FLAG_RWORDERED, c);
   }
 }
 
 /**
+ * probe->lock must be initially held, and will be released by this function.
+ *
  * @return true if probe is complete and Probe object may be freed.
  */
 bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mtime)
@@ -149,10 +158,13 @@ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mt
   assert(probe->ops.count(oid));
   probe->ops.erase(oid);
 
-  if (!probe->ops.empty()) 
+  if (!probe->ops.empty()) {
+    probe->lock.Unlock();
     return false;  // waiting for more!
+  }
 
   if (probe->err) { // we hit an error, propagate back up
+    probe->lock.Unlock();
     return true;
   }
 
@@ -226,15 +238,15 @@ bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mt
       probe->probing_off -= period;
     }
     _probe(probe);
+    assert(!probe->lock.is_locked_by_me());
     return false;
-  }
-
-  if (probe->pmtime) {
+  } else if (probe->pmtime) {
     ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl;
     *probe->pmtime = probe->max_mtime;
   }
 
   // done!
+  probe->lock.Unlock();
   return true;
 }
 
@@ -308,20 +320,28 @@ void Filer::_do_purge_range(PurgeRange *pr, int fin)
     return;
   }
 
+  std::vector<object_t> remove_oids;
+
   int max = 10 - pr->uncommitted;
   while (pr->num > 0 && max > 0) {
-    object_t oid = file_object_t(pr->ino, pr->first);
-    const OSDMap *osdmap = objecter->get_osdmap_read();
-    object_locator_t oloc = osdmap->file_to_object_locator(pr->layout);
-    objecter->put_osdmap_read();
-    objecter->remove(oid, oloc, pr->snapc, pr->mtime, pr->flags,
-                    NULL, new C_PurgeRange(this, pr));
+    remove_oids.push_back(file_object_t(pr->ino, pr->first));
     pr->uncommitted++;
     pr->first++;
     pr->num--;
     max--;
   }
   pr->lock.Unlock();
+
+  // Issue objecter ops outside pr->lock to avoid lock dependency loop
+  for (std::vector<object_t>::iterator i = remove_oids.begin();
+      i != remove_oids.end(); ++i) {
+    const object_t oid = *i;
+    const OSDMap *osdmap = objecter->get_osdmap_read();
+    const object_locator_t oloc = osdmap->file_to_object_locator(pr->layout);
+    objecter->put_osdmap_read();
+    objecter->remove(oid, oloc, pr->snapc, pr->mtime, pr->flags,
+                    NULL, new C_PurgeRange(this, pr));
+  }
 }