// ============== StripObjectMap Implementation =================
-void StripObjectMap::sync_wrap(StripObjectHeader &strip_header,
- KeyValueDB::Transaction t,
- const SequencerPosition &spos)
-{
- dout(10) << __func__ << " cid: " << strip_header.cid << "oid: "
- << strip_header.oid << " setting spos to " << strip_header.spos
- << dendl;
- strip_header.spos = spos;
- strip_header.header->data.clear();
- ::encode(strip_header, strip_header.header->data);
-
- sync(strip_header.header, t);
-}
-
-bool StripObjectMap::check_spos(const StripObjectHeader &header,
- const SequencerPosition &spos)
-{
- if (spos > header.spos) {
- stringstream out;
- dout(10) << "cid: " << "oid: " << header.oid
- << " not skipping op, *spos " << spos << dendl;
- dout(10) << " > header.spos " << header.spos << dendl;
- return false;
- } else {
- dout(10) << "cid: " << "oid: " << header.oid << " skipping op, spos "
- << spos << " <= header.spos " << header.spos << dendl;
- return true;
- }
-}
-
int StripObjectMap::save_strip_header(StripObjectHeader &strip_header,
- const SequencerPosition &spos,
KeyValueDB::Transaction t)
{
- strip_header.spos = spos;
strip_header.header->data.clear();
::encode(strip_header, strip_header.header->data);
return 0;
}
-// =========== KeyValueStore::SubmitManager Implementation ==============
-
-uint64_t KeyValueStore::SubmitManager::op_submit_start()
-{
- lock.Lock();
- uint64_t op = ++op_seq;
- dout(10) << "op_submit_start " << op << dendl;
- return op;
-}
-
-void KeyValueStore::SubmitManager::op_submit_finish(uint64_t op)
-{
- dout(10) << "op_submit_finish " << op << dendl;
- if (op != op_submitted + 1) {
- dout(0) << "op_submit_finish " << op << " expected " << (op_submitted + 1)
- << ", OUT OF ORDER" << dendl;
- assert(0 == "out of order op_submit_finish");
- }
- op_submitted = op;
- lock.Unlock();
-}
-
// ========= KeyValueStore::BufferTransaction Implementation ============
StripObjectMap::StripObjectHeader &old_header,
const coll_t &cid, const ghobject_t &oid)
{
- if (store->backend->check_spos(old_header, spos))
- return ;
-
// FIXME: Lacking of lock for origin header, it will cause other operation
// can get the origin header while submitting transactions
store->backend->rename_wrap(cid, oid, t, &old_header);
header_iter != strip_headers.end(); ++header_iter) {
StripObjectMap::StripObjectHeader header = header_iter->second;
- if (store->backend->check_spos(header, spos))
- continue;
-
if (header.deleted)
continue;
- r = store->backend->save_strip_header(header, spos, t);
+ r = store->backend->save_strip_header(header, t);
if (r < 0) {
dout(10) << __func__ << " save strip header failed " << dendl;
goto out;
ObjectStore(base),
internal_name(name),
basedir(base),
- fsid_fd(-1), op_fd(-1), current_fd(-1),
+ fsid_fd(-1), current_fd(-1),
kv_type(KV_TYPE_NONE),
backend(NULL),
ondisk_finisher(g_ceph_context),
VOID_TEMP_FAILURE_RETRY(::close(fsid_fd));
fsid_fd = -1;
}
- if (op_fd >= 0) {
- VOID_TEMP_FAILURE_RETRY(::close(op_fd));
- op_fd = -1;
- }
if (current_fd >= 0) {
VOID_TEMP_FAILURE_RETRY(::close(current_fd));
current_fd = -1;
Op *o = build_op(tls, ondisk, onreadable, onreadable_sync, osd_op);
op_queue_reserve_throttle(o, handle);
- uint64_t op = submit_manager.op_submit_start();
- o->op = op;
- dout(5) << "queue_transactions (trailing journal) " << op << " "
- << tls <<dendl;
+ dout(5) << "queue_transactions (trailing journal) " << " " << tls <<dendl;
queue_op(osr, o);
- submit_manager.op_submit_finish(op);
-
return 0;
}
}
int trans_num = 0;
- SequencerPosition spos(op_seq, trans_num, 0);
- BufferTransaction bt(this, spos);
+ BufferTransaction bt(this);
for (list<Transaction*>::iterator p = tls.begin();
p != tls.end();
++p, trans_num++) {
- r = _do_transaction(**p, bt, spos, handle);
+ r = _do_transaction(**p, bt, handle);
if (r < 0)
break;
if (handle)
unsigned KeyValueStore::_do_transaction(Transaction& transaction,
BufferTransaction &t,
- SequencerPosition& spos,
ThreadPool::TPHandle *handle)
{
dout(10) << "_do_transaction on " << &transaction << dendl;
Transaction::iterator i = transaction.begin();
+ uint64_t op_num = 0;
while (i.have_op()) {
if (handle)
}
dout(0) << " error " << cpp_strerror(r) << " not handled on operation "
- << op << " (" << spos << ", or op " << spos.op
- << ", counting from 0)" << dendl;
+ << op << " op " << op_num << ", counting from 0)" << dendl;
dout(0) << msg << dendl;
dout(0) << " transaction dump:\n";
JSONFormatter f(true);
}
}
- spos.op++;
+ op_num++;
}
return 0; // FIXME count errors
#include "common/Mutex.h"
#include "GenericObjectMap.h"
-#include "SequencerPosition.h"
#include "KeyValueDB.h"
#include "include/uuid.h"
uint64_t strip_size;
uint64_t max_size;
vector<char> bits;
- SequencerPosition spos;
// soft state
Header header; // FIXME: Hold lock to avoid concurrent operations, it will
::encode(strip_size, bl);
::encode(max_size, bl);
::encode(bits, bl);
- ::encode(spos, bl);
ENCODE_FINISH(bl);
}
::decode(strip_size, bl);
::decode(max_size, bl);
::decode(bits, bl);
- ::decode(spos, bl);
DECODE_FINISH(bl);
}
};
- bool check_spos(const StripObjectHeader &header,
- const SequencerPosition &spos);
- void sync_wrap(StripObjectHeader &strip_header, KeyValueDB::Transaction t,
- const SequencerPosition &spos);
-
static int file_to_extents(uint64_t offset, size_t len, uint64_t strip_size,
vector<StripExtent> &extents);
int lookup_strip_header(const coll_t & cid, const ghobject_t &oid,
StripObjectHeader &header);
- int save_strip_header(StripObjectHeader &header,
- const SequencerPosition &spos,
- KeyValueDB::Transaction t);
+ int save_strip_header(StripObjectHeader &header, KeyValueDB::Transaction t);
int create_strip_header(const coll_t &cid, const ghobject_t &oid,
StripObjectHeader &strip_header,
KeyValueDB::Transaction t);
std::string current_op_seq_fn;
uuid_d fsid;
- int fsid_fd, op_fd, current_fd;
+ int fsid_fd, current_fd;
enum kvstore_types kv_type;
KeyValueStore *store;
- SequencerPosition spos;
KeyValueDB::Transaction t;
int lookup_cached_header(const coll_t &cid, const ghobject_t &oid,
const coll_t &cid, const ghobject_t &oid);
int submit_transaction();
- BufferTransaction(KeyValueStore *store,
- SequencerPosition &spos): store(store), spos(spos) {
+ BufferTransaction(KeyValueStore *store): store(store) {
t = store->backend->get_transaction();
}
};
class OpSequencer : public Sequencer_impl {
Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
list<Op*> q;
- list<uint64_t> jq;
Cond cond;
list<pair<uint64_t, Context*> > flush_commit_waiters;
+ uint64_t op; // used by flush() to know the sequence of op
public:
Sequencer *parent;
Mutex apply_lock; // for apply mutual exclusion
void queue(Op *o) {
Mutex::Locker l(qlock);
q.push_back(o);
+ op++;
+ o->op = op;
}
Op *peek_queue() {
assert(apply_lock.is_locked());
uint64_t seq = 0;
if (!q.empty())
seq = q.back()->op;
- if (!jq.empty() && jq.back() > seq)
- seq = jq.back();
if (seq) {
// everything prior to our watermark to drain through either/both
// queues
- while ((!q.empty() && q.front()->op <= seq) ||
- (!jq.empty() && jq.front() <= seq))
+ while (!q.empty() && q.front()->op <= seq)
cond.Wait(qlock);
}
}
OpSequencer()
: qlock("KeyValueStore::OpSequencer::qlock", false, false),
- parent(0),
+ op(0), parent(0),
apply_lock("KeyValueStore::OpSequencer::apply_lock", false, false) {}
~OpSequencer() {
assert(q.empty());
}
unsigned _do_transaction(Transaction& transaction,
BufferTransaction &bt,
- SequencerPosition& spos,
ThreadPool::TPHandle *handle);
int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
static const string COLLECTION;
static const string COLLECTION_ATTR;
static const uint32_t COLLECTION_VERSION = 1;
-
- class SubmitManager {
- Mutex lock;
- uint64_t op_seq;
- uint64_t op_submitted;
- public:
- SubmitManager() :
- lock("JOS::SubmitManager::lock", false, true, false, g_ceph_context),
- op_seq(0), op_submitted(0)
- {}
- uint64_t op_submit_start();
- void op_submit_finish(uint64_t op);
- void set_op_seq(uint64_t seq) {
- Mutex::Locker l(lock);
- op_submitted = op_seq = seq;
- }
- uint64_t get_op_seq() {
- return op_seq;
- }
- } submit_manager;
};
WRITE_CLASS_ENCODER(StripObjectMap::StripObjectHeader)