From e0ae6f9de685b60eee5efa5ca960c7a15481e41a Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 26 Aug 2014 04:35:57 +0000 Subject: [PATCH] Remove SequencerPosition from KeyValueStore Now KeyValueStore expects kv backend to ensure consistency and there is unusable for KeyValueStore to store a SequencerPosition. Signed-off-by: Haomai Wang Conflicts: src/os/KeyValueStore.cc src/os/KeyValueStore.h --- src/os/KeyValueStore.cc | 87 ++++------------------------------------- src/os/KeyValueStore.h | 51 ++++-------------------- 2 files changed, 16 insertions(+), 122 deletions(-) diff --git a/src/os/KeyValueStore.cc b/src/os/KeyValueStore.cc index f9a912abe99b..925578c75f71 100644 --- a/src/os/KeyValueStore.cc +++ b/src/os/KeyValueStore.cc @@ -69,41 +69,9 @@ const string KeyValueStore::COLLECTION_ATTR = "__COLL_ATTR__"; // ============== StripObjectMap Implementation ================= -void StripObjectMap::sync_wrap(StripObjectHeader &strip_header, - KeyValueDB::Transaction t, - const SequencerPosition &spos) -{ - dout(10) << __func__ << " cid: " << strip_header.cid << "oid: " - << strip_header.oid << " setting spos to " << strip_header.spos - << dendl; - strip_header.spos = spos; - strip_header.header->data.clear(); - ::encode(strip_header, strip_header.header->data); - - sync(strip_header.header, t); -} - -bool StripObjectMap::check_spos(const StripObjectHeader &header, - const SequencerPosition &spos) -{ - if (spos > header.spos) { - stringstream out; - dout(10) << "cid: " << "oid: " << header.oid - << " not skipping op, *spos " << spos << dendl; - dout(10) << " > header.spos " << header.spos << dendl; - return false; - } else { - dout(10) << "cid: " << "oid: " << header.oid << " skipping op, spos " - << spos << " <= header.spos " << header.spos << dendl; - return true; - } -} - int StripObjectMap::save_strip_header(StripObjectHeader &strip_header, - const SequencerPosition &spos, KeyValueDB::Transaction t) { - strip_header.spos = spos; strip_header.header->data.clear(); ::encode(strip_header, strip_header.header->data); @@ -265,28 +233,6 @@ int StripObjectMap::get_with_header(const StripObjectHeader &header, return 0; } -// =========== KeyValueStore::SubmitManager Implementation ============== - -uint64_t KeyValueStore::SubmitManager::op_submit_start() -{ - lock.Lock(); - uint64_t op = ++op_seq; - dout(10) << "op_submit_start " << op << dendl; - return op; -} - -void KeyValueStore::SubmitManager::op_submit_finish(uint64_t op) -{ - dout(10) << "op_submit_finish " << op << dendl; - if (op != op_submitted + 1) { - dout(0) << "op_submit_finish " << op << " expected " << (op_submitted + 1) - << ", OUT OF ORDER" << dendl; - assert(0 == "out of order op_submit_finish"); - } - op_submitted = op; - lock.Unlock(); -} - // ========= KeyValueStore::BufferTransaction Implementation ============ @@ -419,9 +365,6 @@ void KeyValueStore::BufferTransaction::rename_buffer( StripObjectMap::StripObjectHeader &old_header, const coll_t &cid, const ghobject_t &oid) { - if (store->backend->check_spos(old_header, spos)) - return ; - // FIXME: Lacking of lock for origin header, it will cause other operation // can get the origin header while submitting transactions store->backend->rename_wrap(cid, oid, t, &old_header); @@ -438,13 +381,10 @@ int KeyValueStore::BufferTransaction::submit_transaction() header_iter != strip_headers.end(); ++header_iter) { StripObjectMap::StripObjectHeader header = header_iter->second; - if (store->backend->check_spos(header, spos)) - continue; - if (header.deleted) continue; - r = store->backend->save_strip_header(header, spos, t); + r = store->backend->save_strip_header(header, t); if (r < 0) { dout(10) << __func__ << " save strip header failed " << dendl; goto out; @@ -495,7 +435,7 @@ KeyValueStore::KeyValueStore(const std::string &base, ObjectStore(base), internal_name(name), basedir(base), - fsid_fd(-1), op_fd(-1), current_fd(-1), + fsid_fd(-1), current_fd(-1), kv_type(KV_TYPE_NONE), backend(NULL), ondisk_finisher(g_ceph_context), @@ -906,10 +846,6 @@ int KeyValueStore::umount() VOID_TEMP_FAILURE_RETRY(::close(fsid_fd)); fsid_fd = -1; } - if (op_fd >= 0) { - VOID_TEMP_FAILURE_RETRY(::close(op_fd)); - op_fd = -1; - } if (current_fd >= 0) { VOID_TEMP_FAILURE_RETRY(::close(current_fd)); current_fd = -1; @@ -963,14 +899,9 @@ int KeyValueStore::queue_transactions(Sequencer *posr, list &tls, Op *o = build_op(tls, ondisk, onreadable, onreadable_sync, osd_op); op_queue_reserve_throttle(o, handle); - uint64_t op = submit_manager.op_submit_start(); - o->op = op; - dout(5) << "queue_transactions (trailing journal) " << op << " " - << tls < &tls, uint64_t op_seq, } int trans_num = 0; - SequencerPosition spos(op_seq, trans_num, 0); - BufferTransaction bt(this, spos); + BufferTransaction bt(this); for (list::iterator p = tls.begin(); p != tls.end(); ++p, trans_num++) { - r = _do_transaction(**p, bt, spos, handle); + r = _do_transaction(**p, bt, handle); if (r < 0) break; if (handle) @@ -1151,12 +1081,12 @@ int KeyValueStore::_do_transactions(list &tls, uint64_t op_seq, unsigned KeyValueStore::_do_transaction(Transaction& transaction, BufferTransaction &t, - SequencerPosition& spos, ThreadPool::TPHandle *handle) { dout(10) << "_do_transaction on " << &transaction << dendl; Transaction::iterator i = transaction.begin(); + uint64_t op_num = 0; while (i.have_op()) { if (handle) @@ -1495,8 +1425,7 @@ unsigned KeyValueStore::_do_transaction(Transaction& transaction, } dout(0) << " error " << cpp_strerror(r) << " not handled on operation " - << op << " (" << spos << ", or op " << spos.op - << ", counting from 0)" << dendl; + << op << " op " << op_num << ", counting from 0)" << dendl; dout(0) << msg << dendl; dout(0) << " transaction dump:\n"; JSONFormatter f(true); @@ -1513,7 +1442,7 @@ unsigned KeyValueStore::_do_transaction(Transaction& transaction, } } - spos.op++; + op_num++; } return 0; // FIXME count errors diff --git a/src/os/KeyValueStore.h b/src/os/KeyValueStore.h index 402b4cba92b6..28bfb281898d 100644 --- a/src/os/KeyValueStore.h +++ b/src/os/KeyValueStore.h @@ -36,7 +36,6 @@ using namespace std; #include "common/Mutex.h" #include "GenericObjectMap.h" -#include "SequencerPosition.h" #include "KeyValueDB.h" #include "include/uuid.h" @@ -65,7 +64,6 @@ class StripObjectMap: public GenericObjectMap { uint64_t strip_size; uint64_t max_size; vector bits; - SequencerPosition spos; // soft state Header header; // FIXME: Hold lock to avoid concurrent operations, it will @@ -82,7 +80,6 @@ class StripObjectMap: public GenericObjectMap { ::encode(strip_size, bl); ::encode(max_size, bl); ::encode(bits, bl); - ::encode(spos, bl); ENCODE_FINISH(bl); } @@ -91,23 +88,15 @@ class StripObjectMap: public GenericObjectMap { ::decode(strip_size, bl); ::decode(max_size, bl); ::decode(bits, bl); - ::decode(spos, bl); DECODE_FINISH(bl); } }; - bool check_spos(const StripObjectHeader &header, - const SequencerPosition &spos); - void sync_wrap(StripObjectHeader &strip_header, KeyValueDB::Transaction t, - const SequencerPosition &spos); - static int file_to_extents(uint64_t offset, size_t len, uint64_t strip_size, vector &extents); int lookup_strip_header(const coll_t & cid, const ghobject_t &oid, StripObjectHeader &header); - int save_strip_header(StripObjectHeader &header, - const SequencerPosition &spos, - KeyValueDB::Transaction t); + int save_strip_header(StripObjectHeader &header, KeyValueDB::Transaction t); int create_strip_header(const coll_t &cid, const ghobject_t &oid, StripObjectHeader &strip_header, KeyValueDB::Transaction t); @@ -161,7 +150,7 @@ class KeyValueStore : public ObjectStore, std::string current_op_seq_fn; uuid_d fsid; - int fsid_fd, op_fd, current_fd; + int fsid_fd, current_fd; enum kvstore_types kv_type; @@ -217,7 +206,6 @@ class KeyValueStore : public ObjectStore, KeyValueStore *store; - SequencerPosition spos; KeyValueDB::Transaction t; int lookup_cached_header(const coll_t &cid, const ghobject_t &oid, @@ -239,8 +227,7 @@ class KeyValueStore : public ObjectStore, const coll_t &cid, const ghobject_t &oid); int submit_transaction(); - BufferTransaction(KeyValueStore *store, - SequencerPosition &spos): store(store), spos(spos) { + BufferTransaction(KeyValueStore *store): store(store) { t = store->backend->get_transaction(); } }; @@ -257,9 +244,9 @@ class KeyValueStore : public ObjectStore, class OpSequencer : public Sequencer_impl { Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock) list q; - list jq; Cond cond; list > flush_commit_waiters; + uint64_t op; // used by flush() to know the sequence of op public: Sequencer *parent; Mutex apply_lock; // for apply mutual exclusion @@ -310,6 +297,8 @@ class KeyValueStore : public ObjectStore, void queue(Op *o) { Mutex::Locker l(qlock); q.push_back(o); + op++; + o->op = op; } Op *peek_queue() { assert(apply_lock.is_locked()); @@ -335,14 +324,11 @@ class KeyValueStore : public ObjectStore, uint64_t seq = 0; if (!q.empty()) seq = q.back()->op; - if (!jq.empty() && jq.back() > seq) - seq = jq.back(); if (seq) { // everything prior to our watermark to drain through either/both // queues - while ((!q.empty() && q.front()->op <= seq) || - (!jq.empty() && jq.front() <= seq)) + while (!q.empty() && q.front()->op <= seq) cond.Wait(qlock); } } @@ -360,7 +346,7 @@ class KeyValueStore : public ObjectStore, OpSequencer() : qlock("KeyValueStore::OpSequencer::qlock", false, false), - parent(0), + op(0), parent(0), apply_lock("KeyValueStore::OpSequencer::apply_lock", false, false) {} ~OpSequencer() { assert(q.empty()); @@ -477,7 +463,6 @@ class KeyValueStore : public ObjectStore, } unsigned _do_transaction(Transaction& transaction, BufferTransaction &bt, - SequencerPosition& spos, ThreadPool::TPHandle *handle); int queue_transactions(Sequencer *osr, list& tls, @@ -632,26 +617,6 @@ class KeyValueStore : public ObjectStore, static const string COLLECTION; static const string COLLECTION_ATTR; static const uint32_t COLLECTION_VERSION = 1; - - class SubmitManager { - Mutex lock; - uint64_t op_seq; - uint64_t op_submitted; - public: - SubmitManager() : - lock("JOS::SubmitManager::lock", false, true, false, g_ceph_context), - op_seq(0), op_submitted(0) - {} - uint64_t op_submit_start(); - void op_submit_finish(uint64_t op); - void set_op_seq(uint64_t seq) { - Mutex::Locker l(lock); - op_submitted = op_seq = seq; - } - uint64_t get_op_seq() { - return op_seq; - } - } submit_manager; }; WRITE_CLASS_ENCODER(StripObjectMap::StripObjectHeader) -- 2.47.3