]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/filestore: wait_for_apply on read ops
authorSage Weil <sage@redhat.com>
Sat, 27 Jan 2018 16:56:41 +0000 (10:56 -0600)
committerSage Weil <sage@redhat.com>
Mon, 12 Feb 2018 19:56:43 +0000 (13:56 -0600)
On any read, wait for any updates to the object to apply first.

Signed-off-by: Sage Weil <sage@redhat.com>
src/os/filestore/FileStore.cc
src/os/filestore/FileStore.h

index 02f41f4f3c00364934ea682ec51b1238f9dd9ac6..7e0b0a1ce1def100c2e9d541f224c9fae43ee5a0 100644 (file)
@@ -2124,9 +2124,6 @@ void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
   apply_manager.op_apply_finish(o->op);
   dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " r = " << r
           << ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
-
-  o->tls.clear();
-
 }
 
 void FileStore::_finish_op(OpSequencer *osr)
@@ -2134,6 +2131,8 @@ void FileStore::_finish_op(OpSequencer *osr)
   list<Context*> to_queue;
   Op *o = osr->dequeue(&to_queue);
 
+  o->tls.clear();
+
   utime_t lat = ceph_clock_now();
   lat -= o->start;
 
@@ -2159,7 +2158,6 @@ void FileStore::_finish_op(OpSequencer *osr)
   o = nullptr;
 }
 
-
 struct C_JournaledAhead : public Context {
   FileStore *fs;
   FileStore::OpSequencer *osr;
@@ -3206,6 +3204,8 @@ void FileStore::_do_transaction(
 bool FileStore::exists(CollectionHandle& ch, const ghobject_t& oid)
 {
   tracepoint(objectstore, exists_enter, ch->cid.c_str());
+  auto osr = static_cast<OpSequencer*>(ch.get());
+  osr->wait_for_apply(oid);
   struct stat st;
   bool retval = stat(ch, oid, &st) == 0;
   tracepoint(objectstore, exists_exit, retval);
@@ -3216,6 +3216,8 @@ int FileStore::stat(
   CollectionHandle& ch, const ghobject_t& oid, struct stat *st, bool allow_eio)
 {
   tracepoint(objectstore, stat_enter, ch->cid.c_str());
+  auto osr = static_cast<OpSequencer*>(ch.get());
+  osr->wait_for_apply(oid);
   const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
   int r = lfn_stat(cid, oid, st);
   assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
@@ -3257,6 +3259,9 @@ int FileStore::read(
 
   dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
 
+  auto osr = static_cast<OpSequencer*>(ch.get());
+  osr->wait_for_apply(oid);
+
   FDRef fd;
   int r = lfn_open(cid, oid, false, &fd);
   if (r < 0) {
@@ -3466,6 +3471,9 @@ int FileStore::fiemap(CollectionHandle& ch, const ghobject_t& oid,
 
   dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
 
+  auto osr = static_cast<OpSequencer*>(ch.get());
+  osr->wait_for_apply(oid);
+
   FDRef fd;
 
   int r = lfn_open(cid, oid, false, &fd);
@@ -4430,6 +4438,10 @@ int FileStore::getattr(CollectionHandle& ch, const ghobject_t& oid, const char *
   tracepoint(objectstore, getattr_enter, ch->cid.c_str());
   const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
   dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
+
+  auto osr = static_cast<OpSequencer*>(ch.get());
+  osr->wait_for_apply(oid);
+
   FDRef fd;
   int r = lfn_open(cid, oid, false, &fd);
   if (r < 0) {
@@ -4482,6 +4494,10 @@ int FileStore::getattrs(CollectionHandle& ch, const ghobject_t& oid, map<string,
   map<string, bufferlist> omap_aset;
   Index index;
   dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
+
+  auto osr = static_cast<OpSequencer*>(ch.get());
+  osr->wait_for_apply(oid);
+
   FDRef fd;
   bool spill_out = true;
   char buf[2];
@@ -5059,6 +5075,10 @@ int FileStore::omap_get(CollectionHandle& ch, const ghobject_t &hoid,
   tracepoint(objectstore, omap_get_enter, ch->cid.c_str());
   const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
   dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
+
+  auto osr = static_cast<OpSequencer*>(ch.get());
+  osr->wait_for_apply(hoid);
+
   Index index;
   int r = get_index(c, &index);
   if (r < 0)
@@ -5088,6 +5108,10 @@ int FileStore::omap_get_header(
   tracepoint(objectstore, omap_get_header_enter, ch->cid.c_str());
   const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
   dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
+
+  auto osr = static_cast<OpSequencer*>(ch.get());
+  osr->wait_for_apply(hoid);
+
   Index index;
   int r = get_index(c, &index);
   if (r < 0)
@@ -5113,6 +5137,10 @@ int FileStore::omap_get_keys(CollectionHandle& ch, const ghobject_t &hoid, set<s
   tracepoint(objectstore, omap_get_keys_enter, ch->cid.c_str());
   const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
   dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
+
+  auto osr = static_cast<OpSequencer*>(ch.get());
+  osr->wait_for_apply(hoid);
+
   Index index;
   int r = get_index(c, &index);
   if (r < 0)
@@ -5140,6 +5168,10 @@ int FileStore::omap_get_values(CollectionHandle& ch, const ghobject_t &hoid,
   tracepoint(objectstore, omap_get_values_enter, ch->cid.c_str());
   const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
   dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
+
+  auto osr = static_cast<OpSequencer*>(ch.get());
+  osr->wait_for_apply(hoid);
+
   Index index;
   const char *where = "()";
   int r = get_index(c, &index);
@@ -5178,6 +5210,9 @@ int FileStore::omap_check_keys(CollectionHandle& ch, const ghobject_t &hoid,
   const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
   dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
 
+  auto osr = static_cast<OpSequencer*>(ch.get());
+  osr->wait_for_apply(hoid);
+
   Index index;
   int r = get_index(c, &index);
   if (r < 0)
@@ -6048,3 +6083,49 @@ void FSSuperblock::generate_test_instances(list<FSSuperblock*>& o)
   z.omap_backend = "rocksdb";
   o.push_back(new FSSuperblock(z));
 }
+
+#undef dout_prefix
+#define dout_prefix *_dout << "filestore.osr(" << this << ") "
+
+void FileStore::OpSequencer::_register_apply(Op *o)
+{
+  if (o->registered_apply) {
+    dout(20) << __func__ << " " << o << " already registered" << dendl;
+    return;
+  }
+  o->registered_apply = true;
+  for (auto& t : o->tls) {
+    for (auto& i : t.get_object_index()) {
+      ++applying[i.first];
+      dout(20) << __func__ << " " << o << " " << i.first << " now " << applying[i.first]
+              << dendl;
+    }
+  }
+}
+
+void FileStore::OpSequencer::_unregister_apply(Op *o)
+{
+  assert(o->registered_apply);
+  for (auto& t : o->tls) {
+    for (auto& i : t.get_object_index()) {
+      auto p = applying.find(i.first);
+      assert(p != applying.end());
+      if (--p->second == 0) {
+       dout(20) << __func__ << " " << o << " " << i.first << " now 0, removing" << dendl;
+       applying.erase(p);
+      } else {
+       dout(20) << __func__ << " " << o << " " << i.first << " now " << p->second << dendl;
+      }
+    }
+  }
+}
+
+void FileStore::OpSequencer::wait_for_apply(const ghobject_t& oid)
+{
+  Mutex::Locker l(qlock);
+  while (applying.count(oid)) {
+    dout(20) << __func__ << " " << oid << " waiting" << dendl;
+    cond.Wait(qlock);
+  }
+  dout(20) << __func__ << " " << oid << " done" << dendl;
+}
index 88cadabc5efb3fec85bd3b50bbe1e33acc57d0aa..b4d2df045aab3808587373b6addd31bbf9f57ed7 100644 (file)
@@ -303,34 +303,9 @@ private:
       _register_apply(o);
       o->trace.keyval("queue depth", q.size());
     }
-    void _register_apply(Op *o) {
-      if (o->registered_apply)
-       return;
-      o->registered_apply = true;
-      for (auto& t : o->tls) {
-       for (auto& i : t.get_object_index()) {
-         ++applying[i.first];
-       }
-      }
-    }
-    void _unregister_apply(Op *o) {
-      assert(o->registered_apply);
-      for (auto& t : o->tls) {
-       for (auto& i : t.get_object_index()) {
-         auto p = applying.find(i.first);
-         assert(p != applying.end());
-         if (--p->second == 0) {
-           applying.erase(p);
-         }
-       }
-      }
-    }
-    void wait_for_apply(const ghobject_t& oid) {
-      Mutex::Locker l(qlock);
-      while (applying.count(oid)) {
-       cond.Wait(qlock);
-      }
-    }
+    void _register_apply(Op *o);
+    void _unregister_apply(Op *o);
+    void wait_for_apply(const ghobject_t& oid);
     Op *peek_queue() {
       Mutex::Locker l(qlock);
       assert(apply_lock.is_locked());