]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Remove SequencerPosition from KeyValueStore
authorHaomai Wang <haomaiwang@gmail.com>
Tue, 26 Aug 2014 04:35:57 +0000 (04:35 +0000)
committerroot <root@ceph-test.(none)>
Tue, 26 Aug 2014 04:39:16 +0000 (04:39 +0000)
Now KeyValueStore expects kv backend to ensure consistency and there is unusable
for KeyValueStore to store a SequencerPosition.

Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
Conflicts:

src/os/KeyValueStore.cc
src/os/KeyValueStore.h

src/os/KeyValueStore.cc
src/os/KeyValueStore.h

index f9a912abe99bdddb51abe4a8f632021e04915519..925578c75f718078caa495f3bb368a28b14b7d26 100644 (file)
@@ -69,41 +69,9 @@ const string KeyValueStore::COLLECTION_ATTR = "__COLL_ATTR__";
 
 // ============== StripObjectMap Implementation =================
 
-void StripObjectMap::sync_wrap(StripObjectHeader &strip_header,
-                               KeyValueDB::Transaction t,
-                               const SequencerPosition &spos)
-{
-  dout(10) << __func__ << " cid: " << strip_header.cid << "oid: "
-           << strip_header.oid << " setting spos to " << strip_header.spos
-           << dendl;
-  strip_header.spos = spos;
-  strip_header.header->data.clear();
-  ::encode(strip_header, strip_header.header->data);
-
-  sync(strip_header.header, t);
-}
-
-bool StripObjectMap::check_spos(const StripObjectHeader &header,
-                                const SequencerPosition &spos)
-{
-  if (spos > header.spos) {
-    stringstream out;
-    dout(10) << "cid: " << "oid: " << header.oid
-             << " not skipping op, *spos " << spos << dendl;
-    dout(10) << " > header.spos " << header.spos << dendl;
-    return false;
-  } else {
-    dout(10) << "cid: " << "oid: " << header.oid << " skipping op, spos "
-             << spos << " <= header.spos " << header.spos << dendl;
-    return true;
-  }
-}
-
 int StripObjectMap::save_strip_header(StripObjectHeader &strip_header,
-                                      const SequencerPosition &spos,
                                       KeyValueDB::Transaction t)
 {
-  strip_header.spos = spos;
   strip_header.header->data.clear();
   ::encode(strip_header, strip_header.header->data);
 
@@ -265,28 +233,6 @@ int StripObjectMap::get_with_header(const StripObjectHeader &header,
 
   return 0;
 }
-// =========== KeyValueStore::SubmitManager Implementation ==============
-
-uint64_t KeyValueStore::SubmitManager::op_submit_start()
-{
-  lock.Lock();
-  uint64_t op = ++op_seq;
-  dout(10) << "op_submit_start " << op << dendl;
-  return op;
-}
-
-void KeyValueStore::SubmitManager::op_submit_finish(uint64_t op)
-{
-  dout(10) << "op_submit_finish " << op << dendl;
-  if (op != op_submitted + 1) {
-      dout(0) << "op_submit_finish " << op << " expected " << (op_submitted + 1)
-          << ", OUT OF ORDER" << dendl;
-      assert(0 == "out of order op_submit_finish");
-  }
-  op_submitted = op;
-  lock.Unlock();
-}
-
 
 // ========= KeyValueStore::BufferTransaction Implementation ============
 
@@ -419,9 +365,6 @@ void KeyValueStore::BufferTransaction::rename_buffer(
     StripObjectMap::StripObjectHeader &old_header,
     const coll_t &cid, const ghobject_t &oid)
 {
-  if (store->backend->check_spos(old_header, spos))
-    return ;
-
   // FIXME: Lacking of lock for origin header, it will cause other operation
   // can get the origin header while submitting transactions
   store->backend->rename_wrap(cid, oid, t, &old_header);
@@ -438,13 +381,10 @@ int KeyValueStore::BufferTransaction::submit_transaction()
        header_iter != strip_headers.end(); ++header_iter) {
     StripObjectMap::StripObjectHeader header = header_iter->second;
 
-    if (store->backend->check_spos(header, spos))
-      continue;
-
     if (header.deleted)
       continue;
 
-    r = store->backend->save_strip_header(header, spos, t);
+    r = store->backend->save_strip_header(header, t);
     if (r < 0) {
       dout(10) << __func__ << " save strip header failed " << dendl;
       goto out;
@@ -495,7 +435,7 @@ KeyValueStore::KeyValueStore(const std::string &base,
   ObjectStore(base),
   internal_name(name),
   basedir(base),
-  fsid_fd(-1), op_fd(-1), current_fd(-1),
+  fsid_fd(-1), current_fd(-1),
   kv_type(KV_TYPE_NONE),
   backend(NULL),
   ondisk_finisher(g_ceph_context),
@@ -906,10 +846,6 @@ int KeyValueStore::umount()
     VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
     fsid_fd = -1;
   }
-  if (op_fd >= 0) {
-    VOID_TEMP_FAILURE_RETRY(::close(op_fd));
-    op_fd = -1;
-  }
   if (current_fd >= 0) {
     VOID_TEMP_FAILURE_RETRY(::close(current_fd));
     current_fd = -1;
@@ -963,14 +899,9 @@ int KeyValueStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
 
   Op *o = build_op(tls, ondisk, onreadable, onreadable_sync, osd_op);
   op_queue_reserve_throttle(o, handle);
-  uint64_t op = submit_manager.op_submit_start();
-  o->op = op;
-  dout(5) << "queue_transactions (trailing journal) " << op << " "
-          << tls <<dendl;
+  dout(5) << "queue_transactions (trailing journal) " << " " << tls <<dendl;
   queue_op(osr, o);
 
-  submit_manager.op_submit_finish(op);
-
   return 0;
 }
 
@@ -1128,13 +1059,12 @@ int KeyValueStore::_do_transactions(list<Transaction*> &tls, uint64_t op_seq,
   }
 
   int trans_num = 0;
-  SequencerPosition spos(op_seq, trans_num, 0);
-  BufferTransaction bt(this, spos);
+  BufferTransaction bt(this);
 
   for (list<Transaction*>::iterator p = tls.begin();
        p != tls.end();
        ++p, trans_num++) {
-    r = _do_transaction(**p, bt, spos, handle);
+    r = _do_transaction(**p, bt, handle);
     if (r < 0)
       break;
     if (handle)
@@ -1151,12 +1081,12 @@ int KeyValueStore::_do_transactions(list<Transaction*> &tls, uint64_t op_seq,
 
 unsigned KeyValueStore::_do_transaction(Transaction& transaction,
                                         BufferTransaction &t,
-                                        SequencerPosition& spos,
                                         ThreadPool::TPHandle *handle)
 {
   dout(10) << "_do_transaction on " << &transaction << dendl;
 
   Transaction::iterator i = transaction.begin();
+  uint64_t op_num = 0;
 
   while (i.have_op()) {
     if (handle)
@@ -1495,8 +1425,7 @@ unsigned KeyValueStore::_do_transaction(Transaction& transaction,
         }
 
         dout(0) << " error " << cpp_strerror(r) << " not handled on operation "
-                << op << " (" << spos << ", or op " << spos.op
-                << ", counting from 0)" << dendl;
+                << op << " op " << op_num << ", counting from 0)" << dendl;
         dout(0) << msg << dendl;
         dout(0) << " transaction dump:\n";
         JSONFormatter f(true);
@@ -1513,7 +1442,7 @@ unsigned KeyValueStore::_do_transaction(Transaction& transaction,
       }
     }
 
-    spos.op++;
+    op_num++;
   }
 
   return 0;  // FIXME count errors
index 402b4cba92b656729336740b180d6ff299276647..28bfb281898d1bb2cd6ff548ff2e0bae7827febc 100644 (file)
@@ -36,7 +36,6 @@ using namespace std;
 
 #include "common/Mutex.h"
 #include "GenericObjectMap.h"
-#include "SequencerPosition.h"
 #include "KeyValueDB.h"
 
 #include "include/uuid.h"
@@ -65,7 +64,6 @@ class StripObjectMap: public GenericObjectMap {
     uint64_t strip_size;
     uint64_t max_size;
     vector<char> bits;
-    SequencerPosition spos;
 
     // soft state
     Header header; // FIXME: Hold lock to avoid concurrent operations, it will
@@ -82,7 +80,6 @@ class StripObjectMap: public GenericObjectMap {
       ::encode(strip_size, bl);
       ::encode(max_size, bl);
       ::encode(bits, bl);
-      ::encode(spos, bl);
       ENCODE_FINISH(bl);
     }
 
@@ -91,23 +88,15 @@ class StripObjectMap: public GenericObjectMap {
       ::decode(strip_size, bl);
       ::decode(max_size, bl);
       ::decode(bits, bl);
-      ::decode(spos, bl);
       DECODE_FINISH(bl);
     }
   };
 
-  bool check_spos(const StripObjectHeader &header,
-                  const SequencerPosition &spos);
-  void sync_wrap(StripObjectHeader &strip_header, KeyValueDB::Transaction t,
-                 const SequencerPosition &spos);
-
   static int file_to_extents(uint64_t offset, size_t len, uint64_t strip_size,
                              vector<StripExtent> &extents);
   int lookup_strip_header(const coll_t & cid, const ghobject_t &oid,
                           StripObjectHeader &header);
-  int save_strip_header(StripObjectHeader &header,
-                        const SequencerPosition &spos,
-                        KeyValueDB::Transaction t);
+  int save_strip_header(StripObjectHeader &header, KeyValueDB::Transaction t);
   int create_strip_header(const coll_t &cid, const ghobject_t &oid,
                           StripObjectHeader &strip_header,
                           KeyValueDB::Transaction t);
@@ -161,7 +150,7 @@ class KeyValueStore : public ObjectStore,
   std::string current_op_seq_fn;
   uuid_d fsid;
 
-  int fsid_fd, op_fd, current_fd;
+  int fsid_fd, current_fd;
 
   enum kvstore_types kv_type;
 
@@ -217,7 +206,6 @@ class KeyValueStore : public ObjectStore,
 
     KeyValueStore *store;
 
-    SequencerPosition spos;
     KeyValueDB::Transaction t;
 
     int lookup_cached_header(const coll_t &cid, const ghobject_t &oid,
@@ -239,8 +227,7 @@ class KeyValueStore : public ObjectStore,
                        const coll_t &cid, const ghobject_t &oid);
     int submit_transaction();
 
-    BufferTransaction(KeyValueStore *store,
-                      SequencerPosition &spos): store(store), spos(spos) {
+    BufferTransaction(KeyValueStore *store): store(store) {
       t = store->backend->get_transaction();
     }
   };
@@ -257,9 +244,9 @@ class KeyValueStore : public ObjectStore,
   class OpSequencer : public Sequencer_impl {
     Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
     list<Op*> q;
-    list<uint64_t> jq;
     Cond cond;
     list<pair<uint64_t, Context*> > flush_commit_waiters;
+    uint64_t op; // used by flush() to know the sequence of op
    public:
     Sequencer *parent;
     Mutex apply_lock;  // for apply mutual exclusion
@@ -310,6 +297,8 @@ class KeyValueStore : public ObjectStore,
     void queue(Op *o) {
       Mutex::Locker l(qlock);
       q.push_back(o);
+      op++;
+      o->op = op;
     }
     Op *peek_queue() {
       assert(apply_lock.is_locked());
@@ -335,14 +324,11 @@ class KeyValueStore : public ObjectStore,
       uint64_t seq = 0;
       if (!q.empty())
         seq = q.back()->op;
-      if (!jq.empty() && jq.back() > seq)
-        seq = jq.back();
 
       if (seq) {
         // everything prior to our watermark to drain through either/both
         // queues
-        while ((!q.empty() && q.front()->op <= seq) ||
-                (!jq.empty() && jq.front() <= seq))
+        while (!q.empty() && q.front()->op <= seq)
           cond.Wait(qlock);
       }
     }
@@ -360,7 +346,7 @@ class KeyValueStore : public ObjectStore,
 
     OpSequencer()
       : qlock("KeyValueStore::OpSequencer::qlock", false, false),
-       parent(0),
+        op(0), parent(0),
        apply_lock("KeyValueStore::OpSequencer::apply_lock", false, false) {}
     ~OpSequencer() {
       assert(q.empty());
@@ -477,7 +463,6 @@ class KeyValueStore : public ObjectStore,
   }
   unsigned _do_transaction(Transaction& transaction,
                            BufferTransaction &bt,
-                           SequencerPosition& spos,
                            ThreadPool::TPHandle *handle);
 
   int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
@@ -632,26 +617,6 @@ class KeyValueStore : public ObjectStore,
   static const string COLLECTION;
   static const string COLLECTION_ATTR;
   static const uint32_t COLLECTION_VERSION = 1;
-
-  class SubmitManager {
-    Mutex lock;
-    uint64_t op_seq;
-    uint64_t op_submitted;
-   public:
-    SubmitManager() :
-        lock("JOS::SubmitManager::lock", false, true, false, g_ceph_context),
-        op_seq(0), op_submitted(0)
-    {}
-    uint64_t op_submit_start();
-    void op_submit_finish(uint64_t op);
-    void set_op_seq(uint64_t seq) {
-        Mutex::Locker l(lock);
-        op_submitted = op_seq = seq;
-    }
-    uint64_t get_op_seq() {
-        return op_seq;
-    }
-  } submit_manager;
 };
 
 WRITE_CLASS_ENCODER(StripObjectMap::StripObjectHeader)