]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: Transaction::append & Transaction::swap
authorDong Yuan <yuandong1222@gmail.com>
Tue, 2 Dec 2014 17:08:44 +0000 (17:08 +0000)
committerSage Weil <sage@redhat.com>
Tue, 6 Jan 2015 21:29:06 +0000 (13:29 -0800)
Finish append and swap for new Transaction encode/decode layout.

Since append will modify the op_bl now, we changed the order of append
and swap in ReplicatedBackend::sub_op_modify and
ReplicatedBackend::submit_transaction to avoid append call on op_t, so
the op_t can be encode in message.

Change-Id: I6fb421e0defdb092fb9732eef818e90291b039f5
Signed-off-by: Dong Yuan <yuandong1222@gmail.com>
src/os/ObjectStore.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedPG.cc

index c7347acc74e02f63261cdec5e8a0e8d686071c3f..63eb4a109b8cfb337d140e75e43594f9c3987247 100644 (file)
 #include <vector>
 #include <map>
 
-#define OPS_PER_PTR 32
-
 #if defined(DARWIN) || defined(__FreeBSD__)
 #include <sys/statvfs.h>
 #else
 #include <sys/vfs.h>    /* or <sys/statfs.h> */
 #endif /* DARWIN */
 
+#define OPS_PER_PTR 32
+
 class CephContext;
 
 using std::vector;
@@ -516,11 +516,123 @@ public:
       std::swap(on_applied, other.on_applied);
       std::swap(on_commit, other.on_commit);
       std::swap(on_applied_sync, other.on_applied_sync);
+
+      std::swap(use_tbl, other.use_tbl);
       tbl.swap(other.tbl);
-    }
 
+      std::swap(coll_index, other.coll_index);
+      std::swap(object_index, other.object_index);
+      std::swap(coll_id, other.coll_id);
+      std::swap(object_id, other.object_id);
+      op_bl.swap(other.op_bl);
+      data_bl.swap(other.data_bl);
+    }
+
+    void _update_op(Op* op,
+      vector<__le32> &cm,
+      vector<__le32> &om) {
+
+      switch (op->op) {
+      case OP_NOP:
+      case OP_STARTSYNC:
+        break;
+
+      case OP_TOUCH:
+      case OP_REMOVE:
+      case OP_SETATTR:
+      case OP_SETATTRS:
+      case OP_RMATTR:
+      case OP_RMATTRS:
+      case OP_COLL_REMOVE:
+      case OP_OMAP_CLEAR:
+      case OP_OMAP_SETKEYS:
+      case OP_OMAP_RMKEYS:
+      case OP_OMAP_RMKEYRANGE:
+      case OP_OMAP_SETHEADER:
+      case OP_WRITE:
+      case OP_ZERO:
+      case OP_TRUNCATE:
+      case OP_CLONERANGE2:
+      case OP_SETALLOCHINT:
+        assert(op->cid < cm.size());
+        assert(op->oid < om.size());
+        op->cid = cm[op->cid];
+        op->oid = om[op->oid];
+        break;
+
+      case OP_CLONE:
+        assert(op->cid < cm.size());
+        assert(op->oid < om.size());
+        assert(op->dest_oid < om.size());
+        op->cid = cm[op->cid];
+        op->oid = om[op->oid];
+        op->dest_oid = om[op->dest_oid];
+        break;
+
+      case OP_MKCOLL:
+      case OP_RMCOLL:
+      case OP_COLL_SETATTR:
+      case OP_COLL_RMATTR:
+      case OP_COLL_SETATTRS:
+      case OP_COLL_HINT:
+        assert(op->cid < cm.size());
+        op->cid = cm[op->cid];
+        break;
+
+      case OP_COLL_ADD:
+        assert(op->cid < cm.size());
+        assert(op->oid < om.size());
+        assert(op->dest_cid < om.size());
+        op->cid = cm[op->cid];
+        op->dest_cid = cm[op->dest_cid];
+        op->oid = om[op->oid];
+        break;
+
+      case OP_COLL_MOVE_RENAME:
+        assert(op->cid < cm.size());
+        assert(op->oid < om.size());
+        assert(op->dest_cid < cm.size());
+        assert(op->dest_oid < om.size());
+        op->cid = cm[op->cid];
+        op->oid = om[op->oid];
+        op->dest_cid = cm[op->dest_cid];
+        op->dest_oid = om[op->dest_oid];
+        break;
+
+      case OP_SPLIT_COLLECTION2:
+        assert(op->cid < cm.size());
+        op->dest_cid = cm[op->dest_oid];
+        op->cid = cm[op->cid];
+        op->dest_cid = cm[op->dest_cid];
+        break;
+
+      default:
+        assert(0 == "Unkown OP");
+      }
+    }
+    void _update_op_bl(
+      bufferlist& bl,
+      vector<__le32> &cm,
+      vector<__le32> &om) {
+
+      list<bufferptr> list = bl.buffers();
+      std::list<bufferptr>::iterator p;
+
+      for(p = list.begin(); p != list.end(); p++) {
+        assert(p->length() % sizeof(Op) == 0);
+
+        char* raw_p = p->c_str();
+        char* raw_end = raw_p + p->length();
+        while (raw_p < raw_end) {
+          _update_op((Op*)raw_p, cm, om);
+          raw_p += sizeof(Op);
+        }
+      }
+    }
     /// Append the operations of the parameter to this Transaction. Those operations are removed from the parameter Transaction
     void append(Transaction& other) {
+      assert(use_tbl == other.use_tbl);
+
       data.ops += other.data.ops;
       if (other.data.largest_data_len > data.largest_data_len) {
        data.largest_data_len = other.data.largest_data_len;
@@ -532,6 +644,37 @@ public:
       on_applied.splice(on_applied.end(), other.on_applied);
       on_commit.splice(on_commit.end(), other.on_commit);
       on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync);
+
+      //append coll_index & object_index
+      vector<__le32> cm(other.coll_index.size());
+      map<coll_t, __le32>::iterator coll_index_p;
+      for (coll_index_p = other.coll_index.begin();
+           coll_index_p != other.coll_index.end();
+           coll_index_p++) {
+        cm[coll_index_p->second] = _get_coll_id(coll_index_p->first);
+      }
+
+      vector<__le32> om(other.object_index.size());
+      map<ghobject_t, __le32>::iterator object_index_p;
+      for (object_index_p = other.object_index.begin();
+           object_index_p != other.object_index.end();
+           object_index_p++) {
+        om[object_index_p->second] = _get_object_id(object_index_p->first);
+      }
+
+      //update other.op_bl with cm & om
+      //When the other is appended to current transaction, all coll_index and
+      //object_index in other.op_buffer should be updated by new index of the
+      //combined transaction
+      _update_op_bl(other.op_bl, cm, om);
+
+      //append op_bl
+      op_bl.append(other.op_bl);
+      //append data_bl
+      data_bl.append(other.data_bl);
+    }
+    void make_op_bl_continue() {
+      op_bl.get_contiguous(0, op_bl.length());
     }
 
     /** Inquires about the Transaction as a whole. */
@@ -702,9 +845,11 @@ public:
       }
 
       ghobject_t get_oid(__le32 oid_id) {
+        assert(oid_id < objects.size());
         return objects[oid_id];
       }
       coll_t get_cid(__le32 cid_id) {
+        assert(cid_id < colls.size());
         return colls[cid_id];
       }
       uint32_t get_fadvise_flags() const {
@@ -720,7 +865,7 @@ public:
         iterator_impl* iterator = NULL;
 
         if (t->use_tbl) {
-          assert("tbl is not supported" == 0);
+          assert("tbl encode is not supported" == 0);
           iterator = new map_iterator(t);
         } else {
           iterator = new map_iterator(t);
@@ -1428,7 +1573,7 @@ public:
       osr(NULL),
       use_tbl(false),
       coll_id(0),
-      object_id(0) {}
+      object_id(0) { }
 
     Transaction(bufferlist::iterator &dp) :
       osr(NULL),
@@ -1485,6 +1630,8 @@ public:
         ::decode(object_index, bl);
         data.decode(bl);
         use_tbl = false;
+        coll_id = coll_index.size();
+        object_id = object_index.size();
       } else {
         decode7_5(bl, struct_v);
         use_tbl = true;
index c1c7217f91b61f74c304e989875d54d0e560d6d9..13e0852091255573e67934e12d10591598a4b369 100644 (file)
@@ -583,8 +583,7 @@ void ReplicatedBackend::submit_transaction(
     trim_rollback_to,
     true,
     &local_t);
-  local_t.append(*op_t);
-  local_t.swap(*op_t);
+  (*op_t).append(local_t);
   
   op_t->register_on_applied_sync(on_local_applied_sync);
   op_t->register_on_applied(
index e8b2cb76480cef9d9ff0eec6c1a8664459eb5f1f..485e6c1a4bee98e4a2d5950be70d29102c1d082b 100644 (file)
@@ -8265,14 +8265,14 @@ void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op)
   
   op->mark_started();
 
-  rm->localt.append(rm->opt);
-  rm->localt.register_on_commit(
+  rm->opt.append(rm->localt);
+  rm->opt.register_on_commit(
     parent->bless_context(
       new C_OSD_RepModifyCommit(this, rm)));
-  rm->localt.register_on_applied(
+  rm->opt.register_on_applied(
     parent->bless_context(
       new C_OSD_RepModifyApply(this, rm)));
-  parent->queue_transaction(&(rm->localt), op);
+  parent->queue_transaction(&(rm->opt), op);
   // op is cleaned up by oncommit/onapply when both are executed
 }