From: Dong Yuan Date: Tue, 2 Dec 2014 17:08:44 +0000 (+0000) Subject: osd: Transaction::append & Transaction::swap X-Git-Tag: v0.92~62^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d427ca35404a30e1f428859c3274e030f2f83ef6;p=ceph.git osd: Transaction::append & Transaction::swap Finish append and swap for new Transaction encode/decode layout. Since append will modify the op_bl now, we changed the order of append and swap in ReplicatedBackend::sub_op_modify and ReplicatedBackend::submit_transaction to avoid an append call on op_t, so that op_t can be encoded in the message. Change-Id: I6fb421e0defdb092fb9732eef818e90291b039f5 Signed-off-by: Dong Yuan --- diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index c7347acc74e0..63eb4a109b8c 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -27,14 +27,14 @@ #include #include -#define OPS_PER_PTR 32 - #if defined(DARWIN) || defined(__FreeBSD__) #include #else #include /* or */ #endif /* DARWIN */ +#define OPS_PER_PTR 32 + class CephContext; using std::vector; @@ -516,11 +516,123 @@ public: std::swap(on_applied, other.on_applied); std::swap(on_commit, other.on_commit); std::swap(on_applied_sync, other.on_applied_sync); + + std::swap(use_tbl, other.use_tbl); tbl.swap(other.tbl); - } + std::swap(coll_index, other.coll_index); + std::swap(object_index, other.object_index); + std::swap(coll_id, other.coll_id); + std::swap(object_id, other.object_id); + op_bl.swap(other.op_bl); + data_bl.swap(other.data_bl); + } + + void _update_op(Op* op, + vector<__le32> &cm, + vector<__le32> &om) { /* Remaps the collection/object indices stored in op through cm (collection ids) and om (object ids); asserts on an out-of-range index or an unrecognised opcode. */ + + switch (op->op) { + case OP_NOP: + case OP_STARTSYNC: + break; + + case OP_TOUCH: + case OP_REMOVE: + case OP_SETATTR: + case OP_SETATTRS: + case OP_RMATTR: + case OP_RMATTRS: + case OP_COLL_REMOVE: + case OP_OMAP_CLEAR: + case OP_OMAP_SETKEYS: + case OP_OMAP_RMKEYS: + case OP_OMAP_RMKEYRANGE: + case OP_OMAP_SETHEADER: + case OP_WRITE: + case OP_ZERO: + case OP_TRUNCATE: + case OP_CLONERANGE2: + 
case OP_SETALLOCHINT: + assert(op->cid < cm.size()); + assert(op->oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + break; + + case OP_CLONE: + assert(op->cid < cm.size()); + assert(op->oid < om.size()); + assert(op->dest_oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + op->dest_oid = om[op->dest_oid]; + break; + + case OP_MKCOLL: + case OP_RMCOLL: + case OP_COLL_SETATTR: + case OP_COLL_RMATTR: + case OP_COLL_SETATTRS: + case OP_COLL_HINT: + assert(op->cid < cm.size()); + op->cid = cm[op->cid]; + break; + + case OP_COLL_ADD: + assert(op->cid < cm.size()); + assert(op->oid < om.size()); + assert(op->dest_cid < cm.size()); /* dest_cid is looked up in cm below, so it is bounded by cm */ + op->cid = cm[op->cid]; + op->dest_cid = cm[op->dest_cid]; + op->oid = om[op->oid]; + break; + + case OP_COLL_MOVE_RENAME: + assert(op->cid < cm.size()); + assert(op->oid < om.size()); + assert(op->dest_cid < cm.size()); + assert(op->dest_oid < om.size()); + op->cid = cm[op->cid]; + op->oid = om[op->oid]; + op->dest_cid = cm[op->dest_cid]; + op->dest_oid = om[op->dest_oid]; + break; + + case OP_SPLIT_COLLECTION2: + assert(op->cid < cm.size()); + assert(op->dest_cid < cm.size()); /* both collection ids are checked, then mapped through cm exactly once */ + op->cid = cm[op->cid]; + op->dest_cid = cm[op->dest_cid]; + break; + + default: + assert(0 == "Unkown OP"); + } + } + void _update_op_bl( + bufferlist& bl, + vector<__le32> &cm, + vector<__le32> &om) { /* Applies _update_op to every Op stored in bl; each buffer segment must hold a whole number of Op structs. */ + + list<bufferptr> list = bl.buffers(); /* NOTE(review): iterates a copy of the segment list; relies on the copied segments aliasing bl's memory - confirm */ + std::list<bufferptr>::iterator p; + + for(p = list.begin(); p != list.end(); p++) { + assert(p->length() % sizeof(Op) == 0); + + char* raw_p = p->c_str(); + char* raw_end = raw_p + p->length(); + while (raw_p < raw_end) { + _update_op((Op*)raw_p, cm, om); + raw_p += sizeof(Op); + } + } + } /// Append the operations of the parameter to this Transaction. 
Those operations are removed from the parameter Transaction void append(Transaction& other) { + assert(use_tbl == other.use_tbl); /* both transactions must agree on use_tbl */ + data.ops += other.data.ops; if (other.data.largest_data_len > data.largest_data_len) { data.largest_data_len = other.data.largest_data_len; @@ -532,6 +644,37 @@ public: on_applied.splice(on_applied.end(), other.on_applied); on_commit.splice(on_commit.end(), other.on_commit); on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync); + + //build cm/om: for every id used by other, the id that _get_coll_id/_get_object_id returns for the same key in this transaction + vector<__le32> cm(other.coll_index.size()); + map::iterator coll_index_p; /* NOTE(review): bare map::iterator - the template arguments look lost in extraction; confirm against the tree */ + for (coll_index_p = other.coll_index.begin(); + coll_index_p != other.coll_index.end(); + coll_index_p++) { + cm[coll_index_p->second] = _get_coll_id(coll_index_p->first); /* NOTE(review): assumes other's ids are all below other.coll_index.size() - no bounds check here */ + } + + vector<__le32> om(other.object_index.size()); + map::iterator object_index_p; /* NOTE(review): template arguments missing here as well */ + for (object_index_p = other.object_index.begin(); + object_index_p != other.object_index.end(); + object_index_p++) { + om[object_index_p->second] = _get_object_id(object_index_p->first); /* NOTE(review): same density assumption as for cm */ + } + + //rewrite the ops held in other.op_bl using cm & om + //When other is appended to the current transaction, every collection and + //object index stored in other.op_bl must be replaced by its index in the + //combined transaction + _update_op_bl(other.op_bl, cm, om); + + //append op_bl + op_bl.append(other.op_bl); + //append data_bl + data_bl.append(other.data_bl); + } + void make_op_bl_continue() { /* NOTE(review): "continue" presumably means contiguous - calls get_contiguous over the whole of op_bl and discards the result */ + op_bl.get_contiguous(0, op_bl.length()); } /** Inquires about the Transaction as a whole. 
*/ @@ -702,9 +845,11 @@ public: } ghobject_t get_oid(__le32 oid_id) { + assert(oid_id < objects.size()); /* oid_id is used as an index into objects */ return objects[oid_id]; } coll_t get_cid(__le32 cid_id) { + assert(cid_id < colls.size()); /* cid_id is used as an index into colls */ return colls[cid_id]; } uint32_t get_fadvise_flags() const { @@ -720,7 +865,7 @@ public: iterator_impl* iterator = NULL; if (t->use_tbl) { - assert("tbl is not supported" == 0); + assert("tbl encode is not supported" == 0); iterator = new map_iterator(t); } else { iterator = new map_iterator(t); @@ -1428,7 +1573,7 @@ public: osr(NULL), use_tbl(false), coll_id(0), - object_id(0) {} + object_id(0) { } Transaction(bufferlist::iterator &dp) : osr(NULL), @@ -1485,6 +1630,8 @@ public: ::decode(object_index, bl); data.decode(bl); use_tbl = false; + coll_id = coll_index.size(); /* NOTE(review): presumably the next unused collection id after decode - confirm against _get_coll_id */ + object_id = object_index.size(); /* NOTE(review): same assumption for object ids - confirm against _get_object_id */ } else { decode7_5(bl, struct_v); use_tbl = true; diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index c1c7217f91b6..13e085209125 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -583,8 +583,7 @@ void ReplicatedBackend::submit_transaction( trim_rollback_to, true, &local_t); - local_t.append(*op_t); - local_t.swap(*op_t); + (*op_t).append(local_t); /* local_t is the argument of append, so its ops are appended to *op_t */ op_t->register_on_applied_sync(on_local_applied_sync); op_t->register_on_applied( diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index e8b2cb76480c..485e6c1a4bee 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -8265,14 +8265,14 @@ void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op) op->mark_started(); - rm->localt.append(rm->opt); - rm->localt.register_on_commit( + rm->opt.append(rm->localt); /* rm->localt's ops are appended to rm->opt, which is the transaction queued below */ + rm->opt.register_on_commit( parent->bless_context( new C_OSD_RepModifyCommit(this, rm))); - rm->localt.register_on_applied( + rm->opt.register_on_applied( parent->bless_context( new C_OSD_RepModifyApply(this, rm))); - parent->queue_transaction(&(rm->localt), op); + parent->queue_transaction(&(rm->opt), op); // op is cleaned up by 
oncommit/onapply when both are executed }