git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: COPY_GET operation
author Sage Weil <sage@inktank.com>
Tue, 27 Aug 2013 22:25:50 +0000 (15:25 -0700)
committer Sage Weil <sage@inktank.com>
Fri, 30 Aug 2013 23:57:25 +0000 (16:57 -0700)
Add new rados operation to copy all user-visible content for an object
in a simple, safe way.  Use a new object_copy_cursor_t to keep track of
our position.

Signed-off-by: Sage Weil <sage@inktank.com>
src/include/ceph_strings.cc
src/include/encoding.h
src/include/rados.h
src/osd/ReplicatedPG.cc
src/osd/osd_types.cc
src/osd/osd_types.h
src/osdc/Objecter.h
src/test/encoding/types.h

index d46eca6aaf82dceb9f7e03f93831c12cb432416e..f14f29ce0e90743e5e2762dbb73686e2d9dc4dac 100644 (file)
@@ -48,6 +48,8 @@ const char *ceph_osd_op_name(int op)
        case CEPH_OSD_OP_TMAPPUT: return "tmapput";
        case CEPH_OSD_OP_WATCH: return "watch";
 
+       case CEPH_OSD_OP_COPY_GET: return "copy-get";
+
        case CEPH_OSD_OP_CLONERANGE: return "clonerange";
        case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version";
        case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr";
index 67c9af59d2be73980b08abdd1c1ab888982f3a4c..a091f7f69e91eba6ae0c649ce0e707b7a69c186b 100644 (file)
@@ -562,6 +562,17 @@ inline void decode(std::map<T,U>& m, bufferlist::iterator& p)
   }
 }
 template<class T, class U>
+inline void decode_noclear(std::map<T,U>& m, bufferlist::iterator& p)
+{
+  __u32 n;
+  decode(n, p);
+  while (n--) {
+    T k;
+    decode(k, p);
+    decode(m[k], p);
+  }
+}
+template<class T, class U>
 inline void encode_nohead(const std::map<T,U>& m, bufferlist& bl)
 {
   for (typename std::map<T,U>::const_iterator p = m.begin(); p != m.end(); ++p) {
index 9037606d154a9b535f2f3cb26ba0a5a85d955492..27291a7440e84bd51d2170c4ca78c8dc58f875a1 100644 (file)
@@ -217,6 +217,8 @@ enum {
        CEPH_OSD_OP_OMAPRMKEYS    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 24,
        CEPH_OSD_OP_OMAP_CMP      = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25,
 
+       CEPH_OSD_OP_COPY_GET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 27,
+
        /** multi **/
        CEPH_OSD_OP_CLONERANGE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_MULTI | 1,
        CEPH_OSD_OP_ASSERT_SRC_VERSION = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 2,
@@ -405,6 +407,9 @@ struct ceph_osd_op {
                        __le64 offset, length;
                        __le64 src_offset;
                } __attribute__ ((packed)) clonerange;
+               struct {
+                       __le64 max;     /* max data in reply */
+               } __attribute__ ((packed)) copy_get;
        };
        __le32 payload_len;
 } __attribute__ ((packed));
index 7d0811c4d96d611ee94dec508b0e4882624c2c76..ccef49a81b90a433c7989346d9bd40845ea3ae47 100644 (file)
@@ -3361,6 +3361,77 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
 
+    case CEPH_OSD_OP_COPY_GET:
+      // Return one chunk of the object's content.  The request carries a
+      // cursor and a byte budget (out_max); the reply is encoded as
+      //   size, mtime, attrs, data, omap, updated cursor
+      // in that order.
+      ++ctx->num_read;
+      {
+       object_copy_cursor_t cursor;
+       uint64_t out_max;
+       try {
+         ::decode(cursor, bp);
+         ::decode(out_max, bp);
+       }
+       catch (buffer::error& e) {
+         result = -EINVAL;
+         goto fail;
+       }
+
+       // size, mtime
+       ::encode(oi.size, osd_op.outdata);
+       ::encode(oi.mtime, osd_op.outdata);
+
+       // attrs: fetched only while the cursor says they are still pending;
+       // otherwise an empty map is encoded
+       map<string,bufferptr> out_attrs;
+       if (!cursor.attr_complete) {
+         result = osd->store->getattrs(coll, soid, out_attrs, true);
+         if (result < 0)
+           break;
+         cursor.attr_complete = true;
+       }
+       ::encode(out_attrs, osd_op.outdata);
+
+       // budget remaining after what has been encoded so far
+       int64_t left = out_max - osd_op.outdata.length();
+
+       // data
+       bufferlist bl;
+       if (left > 0 && !cursor.data_complete) {
+         if (cursor.data_offset < oi.size) {
+           // Read at most the remaining budget.  Asking for out_max bytes
+           // here could return more than 'left' and trip the assert below.
+           result = osd->store->read(coll, oi.soid, cursor.data_offset, left, bl);
+           if (result < 0)
+             // NOTE(review): the attrs failure path above uses 'break' while
+             // this one returns directly -- confirm which is intended.
+             return result;
+           assert(result <= left);
+           left -= result;
+           cursor.data_offset += result;
+         }
+         if (cursor.data_offset == oi.size)
+           cursor.data_complete = true;
+       }
+       ::encode(bl, osd_op.outdata);
+
+       // omap
+       std::map<std::string,bufferlist> out_omap;
+       if (left > 0 && !cursor.omap_complete) {
+         ObjectMap::ObjectMapIterator iter = osd->store->get_omap_iterator(coll, oi.soid);
+         assert(iter);
+         if (iter->valid()) {
+           iter->upper_bound(cursor.omap_offset);
+           for (; left > 0 && iter->valid(); iter->next()) {
+             out_omap.insert(make_pair(iter->key(), iter->value()));
+             // account for key and value plus a 4-byte length prefix each
+             left -= iter->key().length() + 4 + iter->value().length() + 4;
+           }
+         }
+         if (iter->valid()) {
+           // The next request resumes via upper_bound(omap_offset), i.e.
+           // strictly after the stored key, so record the last key that was
+           // actually returned.  Storing iter->key() (not yet returned)
+           // would cause that entry to be skipped.
+           if (!out_omap.empty())
+             cursor.omap_offset = out_omap.rbegin()->first;
+         } else {
+           cursor.omap_complete = true;
+         }
+       }
+       ::encode(out_omap, osd_op.outdata);
+
+       ::encode(cursor, osd_op.outdata);
+       result = 0;
+      }
+      break;
+
 
     default:
       dout(1) << "unrecognized osd op " << op.op
index 6511d802952b04553b06c5719353aec36238fa1e..1c1b457002c07661fa4f2b5d3c889ca6e840aa2c 100644 (file)
@@ -2422,6 +2422,55 @@ void pg_missing_t::split_into(
   }
 }
 
+// -- object_copy_cursor_t --
+
+// Append the cursor to bl, wrapped in ENCODE_START/ENCODE_FINISH.
+// The field order here must stay in step with decode().
+void object_copy_cursor_t::encode(bufferlist& bl) const
+{
+  ENCODE_START(1, 1, bl);
+  ::encode(attr_complete, bl);
+  ::encode(data_offset, bl);
+  ::encode(data_complete, bl);
+  ::encode(omap_offset, bl);
+  ::encode(omap_complete, bl);
+  ENCODE_FINISH(bl);
+}
+
+// Read the cursor back from bl, wrapped in DECODE_START/DECODE_FINISH.
+// Fields are consumed in the same order encode() writes them.
+void object_copy_cursor_t::decode(bufferlist::iterator &bl)
+{
+  DECODE_START(1, bl);
+  ::decode(attr_complete, bl);
+  ::decode(data_offset, bl);
+  ::decode(data_complete, bl);
+  ::decode(omap_offset, bl);
+  ::decode(omap_complete, bl);
+  DECODE_FINISH(bl);
+}
+
+// Emit every cursor field through the formatter; the boolean flags are
+// written as integers.
+void object_copy_cursor_t::dump(Formatter *f) const
+{
+  f->dump_unsigned("attr_complete", static_cast<int>(attr_complete));
+  f->dump_unsigned("data_offset", data_offset);
+  f->dump_unsigned("data_complete", static_cast<int>(data_complete));
+  f->dump_string("omap_offset", omap_offset);
+  f->dump_unsigned("omap_complete", static_cast<int>(omap_complete));
+}
+
+// Append four sample cursors to o, each further along than the last.
+void object_copy_cursor_t::generate_test_instances(list<object_copy_cursor_t*>& o)
+{
+  // default-constructed cursor
+  o.push_back(new object_copy_cursor_t);
+
+  // attrs done, data offset set
+  object_copy_cursor_t *in_data = new object_copy_cursor_t;
+  in_data->attr_complete = true;
+  in_data->data_offset = 123;
+  o.push_back(in_data);
+
+  // attrs and data done, omap offset set
+  object_copy_cursor_t *in_omap = new object_copy_cursor_t;
+  in_omap->attr_complete = true;
+  in_omap->data_complete = true;
+  in_omap->omap_offset = "foo";
+  o.push_back(in_omap);
+
+  // all three completion flags set
+  object_copy_cursor_t *finished = new object_copy_cursor_t;
+  finished->attr_complete = true;
+  finished->data_complete = true;
+  finished->omap_complete = true;
+  o.push_back(finished);
+}
+
 // -- pg_create_t --
 
 void pg_create_t::encode(bufferlist &bl) const
@@ -3433,6 +3482,9 @@ ostream& operator<<(ostream& out, const OSDOp& op)
       out << (op.op.watch.flag ? " add":" remove")
          << " cookie " << op.op.watch.cookie << " ver " << op.op.watch.ver;
       break;
+    case CEPH_OSD_OP_COPY_GET:
+      out << " max " << op.op.copy_get.max;
+      break;
     default:
       out << " " << op.op.extent.offset << "~" << op.op.extent.length;
       if (op.op.extent.truncate_seq)
index 3eb14246cc523397b72acddda87758d0bc580d43..00e9409c98a91751eb7e47c98c141402063c9cd7 100644 (file)
@@ -1816,6 +1816,37 @@ struct pg_ls_response_t {
 
 WRITE_CLASS_ENCODER(pg_ls_response_t)
 
+/**
+ * object_copy_cursor_t
+ *
+ * Records how far a chunked COPY_GET of an object has progressed.  The
+ * cursor is sent with each COPY_GET request and an updated copy comes
+ * back in the reply.
+ */
+struct object_copy_cursor_t {
+  bool attr_complete;   ///< attrs have been returned
+  uint64_t data_offset; ///< offset the next data read starts from
+  bool data_complete;   ///< data_offset has reached the object size
+  string omap_offset;   ///< omap key passed to upper_bound() to resume iteration
+  bool omap_complete;   ///< omap iterator ran off the end
+
+  object_copy_cursor_t()
+    : attr_complete(false),
+      data_offset(0),
+      data_complete(false),
+      omap_complete(false)
+  {}
+
+  /// true if no progress is recorded: attrs pending, data offset 0, no omap key
+  bool is_initial() const {
+    return !attr_complete && data_offset == 0 && omap_offset.empty();
+  }
+  /// true once attrs, data and omap are all flagged complete
+  bool is_complete() const {
+    return attr_complete && data_complete && omap_complete;
+  }
+
+  static void generate_test_instances(list<object_copy_cursor_t*>& o);
+  void encode(bufferlist& bl) const;
+  void decode(bufferlist::iterator &bl);
+  void dump(Formatter *f) const;
+};
+WRITE_CLASS_ENCODER(object_copy_cursor_t)
+
 
 /**
  * pg creation info
index f4ef5b7629175a28c246b0aae394090e10059b00..91f625517291ce68237cb60e90c2cd91268604f0 100644 (file)
@@ -567,6 +567,82 @@ struct ObjectOperation {
     }
   }
 
+  /// Completion handler for a COPY_GET op: unpacks the reply held in bl
+  /// into the caller-supplied outputs (any of which, except the cursor,
+  /// may be NULL) and overwrites the caller's cursor with the returned one.
+  struct C_ObjectOperation_copyget : public Context {
+    bufferlist bl;   // reply payload decoded by finish()
+    object_copy_cursor_t *cursor;
+    uint64_t *out_size;
+    utime_t *out_mtime;
+    std::map<std::string,bufferlist> *out_attrs;
+    bufferlist *out_data;
+    std::map<std::string,bufferlist> *out_omap;
+    int *prval;
+
+    C_ObjectOperation_copyget(object_copy_cursor_t *cursor_,
+                             uint64_t *size_,
+                             utime_t *mtime_,
+                             std::map<std::string,bufferlist> *attrs_,
+                             bufferlist *data_,
+                             std::map<std::string,bufferlist> *omap_,
+                             int *rval_)
+      : cursor(cursor_),
+        out_size(size_),
+        out_mtime(mtime_),
+        out_attrs(attrs_),
+        out_data(data_),
+        out_omap(omap_),
+        prval(rval_) {}
+
+    /// On r < 0 nothing is touched.  A decode failure sets *prval to -EIO.
+    void finish(int r) {
+      if (r < 0)
+        return;
+      try {
+        bufferlist::iterator it = bl.begin();
+
+        // size
+        uint64_t obj_size;
+        ::decode(obj_size, it);
+        if (out_size)
+          *out_size = obj_size;
+
+        // mtime
+        utime_t obj_mtime;
+        ::decode(obj_mtime, it);
+        if (out_mtime)
+          *out_mtime = obj_mtime;
+
+        // attrs: merged into the caller's map, or decoded and dropped
+        if (!out_attrs) {
+          std::map<std::string,bufferlist> discard;
+          ::decode(discard, it);
+        } else {
+          ::decode_noclear(*out_attrs, it);
+        }
+
+        // data: appended to what the caller already holds
+        bufferlist chunk;
+        ::decode(chunk, it);
+        if (out_data)
+          out_data->claim_append(chunk);
+
+        // omap: merged into the caller's map, or decoded and dropped
+        if (!out_omap) {
+          std::map<std::string,bufferlist> discard;
+          ::decode(discard, it);
+        } else {
+          ::decode_noclear(*out_omap, it);
+        }
+
+        // updated position for the next request
+        ::decode(*cursor, it);
+      } catch (buffer::error& e) {
+        if (prval)
+          *prval = -EIO;
+      }
+    }
+  };
+
+  /// Append a COPY_GET op.  The cursor and max are encoded into the op's
+  /// input; on completion a C_ObjectOperation_copyget handler decodes the
+  /// reply into the out_* locations and updates *cursor.
+  void copy_get(object_copy_cursor_t *cursor,
+               uint64_t max,
+               uint64_t *out_size,
+               utime_t *out_mtime,
+               std::map<std::string,bufferlist> *out_attrs,
+               bufferlist *out_data,
+               std::map<std::string,bufferlist> *out_omap,
+               int *prval) {
+    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
+    osd_op.op.copy_get.max = max;
+    ::encode(*cursor, osd_op.indata);
+    ::encode(max, osd_op.indata);
+
+    // slot of the op that was just added
+    unsigned idx = ops.size() - 1;
+    out_rval[idx] = prval;
+
+    C_ObjectOperation_copyget *handler =
+      new C_ObjectOperation_copyget(cursor, out_size, out_mtime, out_attrs, out_data, out_omap, prval);
+    out_bl[idx] = &handler->bl;
+    out_handler[idx] = handler;
+  }
+
   void omap_get_header(bufferlist *bl, int *prval) {
     add_op(CEPH_OSD_OP_OMAPGETHEADER);
     unsigned p = ops.size() - 1;
index 213da6fccccc231efa830e6827497d4a091dabaf..a6f7cfb7883dfe92d21040ae94d9be3e299758d5 100644 (file)
@@ -53,6 +53,7 @@ TYPE(pg_log_t)
 TYPE(pg_missing_t::item)
 TYPE(pg_missing_t)
 TYPE(pg_ls_response_t)
+TYPE(object_copy_cursor_t)
 TYPE(pg_create_t)
 TYPE(watch_info_t)
 TYPE(object_info_t)