]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: allow src_oids to be snapped
authorSage Weil <sage@newdream.net>
Mon, 6 Jun 2011 19:27:28 +0000 (12:27 -0700)
committerSage Weil <sage@newdream.net>
Mon, 6 Jun 2011 19:27:28 +0000 (12:27 -0700)
Signed-off-by: Sage Weil <sage@newdream.net>
src/include/rados.h
src/messages/MOSDOp.h
src/osd/OSD.cc
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.h
src/osdc/Objecter.h

index 895e0573ae1b85254415f306b65c067d82cc0bd4..8ba67d67c19340be2448c0125c902a1788b6b62b 100644 (file)
@@ -400,7 +400,6 @@ struct ceph_osd_op {
                struct {
                        __le64 offset, length;
                        __le64 src_offset;
-                       __u8 src_oid_idx;
                } __attribute__ ((packed)) clonerange;
 };
        __le32 payload_len;
index 37a50ed6a0dc293ad61667ae7aa1ca019f8a8a67..193b5b0261401a95f6a9acb6e525639df30672fd 100644 (file)
@@ -174,8 +174,10 @@ public:
 
     for (unsigned i = 0; i < ops.size(); i++) {
       if (ceph_osd_op_type_multi(ops[i].op.op)) {
-       ops[i].op.payload_len = ops[i].oid.name.length();
-       data.append(ops[i].oid.name);
+       bufferlist bl;
+       ::encode(ops[i].soid, bl);
+       ops[i].op.payload_len = bl.length();
+       data.append(bl);
       } else {
        ops[i].op.payload_len = ops[i].data.length();
        data.append(ops[i].data);
@@ -329,8 +331,10 @@ struct ceph_osd_request_head {
        bufferlist t;
        ops[i].data.substr_of(t, off, ops[i].op.payload_len);
        off += ops[i].op.payload_len;
-        if (t.length())
-         oid.name = t.c_str();
+        if (t.length()) {
+         bufferlist::iterator p = t.begin();
+         ::decode(ops[i].soid, p);
+       }
       } else {
        ops[i].data.substr_of(data, off, ops[i].op.payload_len);
        off += ops[i].op.payload_len;
index 568ec2e4b40dac23674db0788ae1d2c4a5803668..d2dc49ca02fa82528ae634adfbbd50e9f0a0f658 100644 (file)
@@ -5097,11 +5097,11 @@ void OSD::handle_op(MOSDOp *op)
     for (vector<OSDOp>::const_iterator p = op->ops.begin();
         p != op->ops.end();
         ++p) {
-      if (p->oid.name.length() == 0)
+      if (p->soid.oid.name.length() == 0)
        continue;
-      sobject_t soid(p->oid, CEPH_NOSNAP);
-      if (pg->is_missing_object(soid)) {
-       pg->wait_for_missing_object(soid, op);
+      sobject_t head(p->soid.oid, CEPH_NOSNAP);
+      if (pg->is_missing_object(head)) {
+       pg->wait_for_missing_object(head, op);
        pg->unlock();
        return;
       }
@@ -5355,7 +5355,7 @@ int OSD::init_op_flags(MOSDOp *op)
       op->rmw_flags |= CEPH_OSD_FLAG_READ;
 
     // set READ flag if there are src_oids
-    if (iter->oid.name.length())
+    if (iter->soid.oid.name.length())
       op->rmw_flags |= CEPH_OSD_FLAG_READ;
 
     // set PGOP flag if there are PG ops
index ffc4386535c9f26ed5a10da2de3fad8d2f9477b6..71b2982a775609551c47122b95648c84f2fa86f1 100644 (file)
@@ -467,14 +467,26 @@ void ReplicatedPG::do_op(MOSDOp *op)
   dout(10) << "do_op mode now " << mode << dendl;
 
   // src_oids
-  map<object_t,ObjectContext*> src_obc;
+  map<sobject_t,ObjectContext*> src_obc;
   for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
     OSDOp& osd_op = *p;
-    if (osd_op.oid.name.length()) {
-      if (!src_obc.count(osd_op.oid)) {
+    if (osd_op.soid.oid.name.length()) {
+      if (!src_obc.count(osd_op.soid)) {
        ObjectContext *sobc;
        snapid_t ssnapid;
-       int r = find_object_context(osd_op.oid, op->get_object_locator(), CEPH_NOSNAP, &sobc, false, &ssnapid);
+       int r = find_object_context(osd_op.soid.oid, op->get_object_locator(), osd_op.soid.snap,
+                                   &sobc, false, &ssnapid);
+       if (r == -EAGAIN) {
+         // If we're not the primary of this OSD, and we have
+         // CEPH_OSD_FLAG_LOCALIZE_READS set, we just return -EAGAIN. Otherwise,
+         // we have to wait for the object.
+         if (is_primary() || (!(op->get_rmw_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) {
+           // missing the specific snap we need; requeue and wait.
+           sobject_t soid(osd_op.soid.oid, ssnapid);
+           wait_for_missing_object(soid, op);
+           return;
+         }
+       }
        if (r) {
          osd->reply_op_error(op, r);
          return;
@@ -482,12 +494,12 @@ void ReplicatedPG::do_op(MOSDOp *op)
        if (sobc->obs.oi.oloc.key != obc->obs.oi.oloc.key &&
            sobc->obs.oi.oloc.key != obc->obs.oi.soid.oid.name &&
            sobc->obs.oi.soid.oid.name != obc->obs.oi.oloc.key) {
-         dout(1) << " src_oid " << osd_op.oid << " oloc " << sobc->obs.oi.oloc << " != "
+         dout(1) << " src_oid " << osd_op.soid << " oloc " << sobc->obs.oi.oloc << " != "
                  << op->get_oid() << " oloc " << obc->obs.oi.oloc << dendl;
          osd->reply_op_error(op, -EINVAL);
          return;
        }
-       src_obc[osd_op.oid] = sobc;
+       src_obc[osd_op.soid] = sobc;
       }
     }
   }
@@ -1573,8 +1585,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
     case CEPH_OSD_OP_CLONERANGE:
       {
        bufferlist::iterator p = osd_op.data.begin();
-       object_t& src_oid = osd_op.oid;
-       ObjectContext *sobc = ctx->src_obc[src_oid];
+       ObjectContext *sobc = ctx->src_obc[osd_op.soid];
 
        if (!obs.exists)
          t.touch(coll, obs.oi.soid);
@@ -3029,9 +3040,9 @@ void ReplicatedPG::put_object_context(ObjectContext *obc)
   }
 }
 
-void ReplicatedPG::put_object_contexts(map<object_t,ObjectContext*>& obcv)
+void ReplicatedPG::put_object_contexts(map<sobject_t,ObjectContext*>& obcv)
 {
-  for (map<object_t,ObjectContext*>::iterator p = obcv.begin(); p != obcv.end(); ++p)
+  for (map<sobject_t,ObjectContext*>::iterator p = obcv.begin(); p != obcv.end(); ++p)
     put_object_context(p->second);
   obcv.clear();
 }
index ab391e9a03d3932e9a1f8d32500efc117302dcd7..c3b816ce26bdb8d47450e364f19c264bdc2a90ac 100644 (file)
@@ -355,7 +355,7 @@ public:
     vector<PG::Log::Entry> log;
 
     ObjectContext *obc;
-    map<object_t,ObjectContext*> src_obc;
+    map<sobject_t,ObjectContext*> src_obc;
     ObjectContext *clone_obc;    // if we created a clone
     ObjectContext *snapset_obc;  // if we created/deleted a snapdir
 
@@ -391,7 +391,7 @@ public:
 
     OpContext *ctx;
     ObjectContext *obc;
-    map<object_t,ObjectContext*> src_obc;
+    map<sobject_t,ObjectContext*> src_obc;
 
     tid_t rep_tid;
 
@@ -488,7 +488,7 @@ protected:
       register_snapset_context(obc->ssc);
   }
   void put_object_context(ObjectContext *obc);
-  void put_object_contexts(map<object_t,ObjectContext*>& obcv);
+  void put_object_contexts(map<sobject_t,ObjectContext*>& obcv);
   int find_object_context(const object_t& oid, const object_locator_t& oloc,
                          snapid_t snapid, ObjectContext **pobc,
                          bool can_create, snapid_t *psnapid=NULL);
index 980f2a6ee0708cff757c10512605dcadf6435faf..770cd11ad33a73403b86fa7052ed5ba812550ff2 100644 (file)
@@ -1510,7 +1510,7 @@ WRITE_CLASS_ENCODER(ScrubMap)
 struct OSDOp {
   ceph_osd_op op;
   bufferlist data;
-  object_t oid;
+  sobject_t soid;
 
   OSDOp() {
     memset(&op, 0, sizeof(ceph_osd_op));
@@ -1536,7 +1536,7 @@ inline ostream& operator<<(ostream& out, const OSDOp& op) {
       break;
     case CEPH_OSD_OP_CLONERANGE:
       out << " " << op.op.clonerange.offset << "~" << op.op.clonerange.length
-         << " from " << op.oid
+         << " from " << op.soid
          << " offset " << op.op.clonerange.src_offset;
       break;
     default:
index 9e133c8e187aa50c9e84d2e2bafb4c814b216a15..dcdf5088f2b50d417758a48ba1d65b493f79f57b 100644 (file)
@@ -65,14 +65,14 @@ struct ObjectOperation {
     ops[s].op.extent.length = len;
     ops[s].data.claim_append(bl);
   }
-  void add_clone_range(int op, uint64_t off, uint64_t len, const object_t& srcoid, uint64_t srcoff) {
+  void add_clone_range(int op, uint64_t off, uint64_t len, const object_t& srcoid, uint64_t srcoff, snapid_t srcsnapid) {
     int s = ops.size();
     ops.resize(s+1);
     ops[s].op.op = op;
     ops[s].op.clonerange.offset = off;
     ops[s].op.clonerange.length = len;
     ops[s].op.clonerange.src_offset = srcoff;
-    ops[s].oid = srcoid;
+    ops[s].soid = sobject_t(srcoid, srcsnapid);
   }
   void add_xattr(int op, const char *name, const bufferlist& data) {
     int s = ops.size();
@@ -178,7 +178,7 @@ struct ObjectOperation {
   }
 
   void clone_range(const object_t& src_oid, uint64_t src_offset, uint64_t len, uint64_t dst_offset) {
-    add_clone_range(CEPH_OSD_OP_CLONERANGE, dst_offset, len, src_oid, src_offset);
+    add_clone_range(CEPH_OSD_OP_CLONERANGE, dst_offset, len, src_oid, src_offset, CEPH_NOSNAP);
   }
 
   // object attrs