osd: initial COPY_FROM (not viable for large objects)
author Sage Weil <sage@inktank.com>
Wed, 28 Aug 2013 22:04:16 +0000 (15:04 -0700)
committer Sage Weil <sage@inktank.com>
Tue, 3 Sep 2013 22:48:30 +0000 (15:48 -0700)
Initial pass at the COPY_FROM implementation.  This uses COPY_GET to read an
object from another OSD and write it locally.  It chunks the read but
accumulates it all in memory and commits it at once, so it is only suitable
for smaller objects.
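
As a minimal sketch of the call pattern this enables (mirroring the updated
CopyPP test below; object names and the helper are illustrative, not part of
this change), a client drives the copy with a single write op against the
destination object:

    #include <rados/librados.hpp>

    // Write "src", then copy it to "dst" in the same pool: the OSD serving
    // "dst" pulls the source object via COPY_GET and writes it locally.
    // get_last_version() supplies the source version the copy asserts on.
    int write_and_copy(librados::IoCtx& ioctx, librados::bufferlist& data)
    {
      int r = ioctx.write_full("src", data);
      if (r < 0)
        return r;
      librados::ObjectWriteOperation op;
      op.copy_from("src", ioctx, ioctx.get_last_version());
      return ioctx.operate("dst", &op);   // 0 on success
    }

Each COPY_GET request is capped by the new osd_copyfrom_max_chunk option
(8 MB by default), but every chunk is appended to an in-memory bufferlist
until the whole object has arrived.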

Signed-off-by: Sage Weil <sage@inktank.com>
src/common/config_opts.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.h
src/test/librados/misc.cc

diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 328f7f4b94d079c9549bfe81b08b8f1963b6483d..2fa72d4ce0f003d7831b19ba39954077f42fea5a 100644 (file)
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -444,6 +444,7 @@ OPTION(osd_recovery_delay_start, OPT_FLOAT, 0)
 OPTION(osd_recovery_max_active, OPT_INT, 15)
 OPTION(osd_recovery_max_single_start, OPT_INT, 5)
 OPTION(osd_recovery_max_chunk, OPT_U64, 8<<20)  // max size of push chunk
+OPTION(osd_copyfrom_max_chunk, OPT_U64, 8<<20)   // max size of a COPYFROM chunk
 OPTION(osd_push_per_object_cost, OPT_U64, 1000)  // push cost per object
 OPTION(osd_max_push_cost, OPT_U64, 8<<20)  // max size of push message
 OPTION(osd_max_push_objects, OPT_U64, 10)  // max objects in single push op
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 0027edda077a32ac45e95cdc48992eb13e799443..2c96180b13a93cb48d69f553503644a5a3989b25 100644 (file)
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -1000,6 +1000,11 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
     p->second->ondisk_read_unlock();
   }
 
+  if (result == -EINPROGRESS) {
+    // come back later; an in-progress COPY_FROM will re-run execute_ctx()
+    return;
+  }
+
   if (result == -EAGAIN) {
     // clean up after the ctx
     delete ctx;
@@ -3386,6 +3391,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          if (result < 0)
            break;
          cursor.attr_complete = true;
+         dout(20) << " got attrs" << dendl;
        }
        ::encode(out_attrs, osd_op.outdata);
 
@@ -3395,15 +3401,17 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        bufferlist bl;
        if (left > 0 && !cursor.data_complete) {
          if (cursor.data_offset < oi.size) {
-           result = osd->store->read(coll, oi.soid, cursor.data_offset, out_max, bl);
+           result = osd->store->read(coll, oi.soid, cursor.data_offset, left, bl);
            if (result < 0)
              return result;
            assert(result <= left);
            left -= result;
            cursor.data_offset += result;
          }
-         if (cursor.data_offset == oi.size)
+         if (cursor.data_offset == oi.size) {
            cursor.data_complete = true;
+           dout(20) << " got data" << dendl;
+         }
        }
        ::encode(bl, osd_op.outdata);
 
@@ -3423,15 +3431,73 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            cursor.omap_offset = iter->key();
          } else {
            cursor.omap_complete = true;
+           dout(20) << " got omap" << dendl;
          }
        }
        ::encode(out_omap, osd_op.outdata);
 
+       dout(20) << " cursor.is_complete=" << cursor.is_complete() << dendl;
        ::encode(cursor, osd_op.outdata);
        result = 0;
       }
       break;
 
+    case CEPH_OSD_OP_COPY_FROM:
+      ++ctx->num_write;
+      {
+       object_t src_name;
+       object_locator_t src_oloc;
+       snapid_t src_snapid = (uint64_t)op.copy_from.snapid;
+       version_t src_version = op.copy_from.src_version;
+       try {
+         ::decode(src_name, bp);
+         ::decode(src_oloc, bp);
+       }
+       catch (buffer::error& e) {
+         result = -EINVAL;
+         goto fail;
+       }
+       pg_t raw_pg;
+       get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg);
+       hobject_t src(src_name, src_oloc.key, src_snapid,
+                     raw_pg.ps(), raw_pg.pool(),
+                     src_oloc.nspace);
+       if (!ctx->copy_op) {
+         // start
+         result = start_copy(ctx, src, src_oloc, src_version, &ctx->copy_op);
+         if (result < 0)
+           goto fail;
+         result = -EINPROGRESS;
+       } else {
+         // finish
+         CopyOpRef cop = ctx->copy_op;
+
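+         // the whole source object is now buffered in cop; apply the data,
+         // xattrs, and omap to the destination in a single transaction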
+         if (!obs.exists) {
+           ctx->delta_stats.num_objects++;
+           obs.exists = true;
+         } else {
+           t.remove(coll, soid);
+         }
+         t.write(coll, soid, 0, cop->data.length(), cop->data);
+         for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p)
+           t.setattr(coll, soid, string("_") + p->first, p->second);
+         t.omap_setkeys(coll, soid, cop->omap);
+
+         interval_set<uint64_t> ch;
+         if (oi.size > 0)
+           ch.insert(0, oi.size);
+         ctx->modified_ranges.union_of(ch);
+
+         if (cop->data.length() != oi.size) {
+           ctx->delta_stats.num_bytes -= oi.size;
+           oi.size = cop->data.length();
+           ctx->delta_stats.num_bytes += oi.size;
+         }
+         ctx->delta_stats.num_wr++;
+         ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(cop->data.length(), 10);
+       }
+      }
+      break;
 
     default:
       dout(1) << "unrecognized osd op " << op.op
@@ -4013,6 +4079,152 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
   return result;
 }
 
+// ========================================================================
+// copyfrom
+
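+// Objecter completion for one COPY_GET chunk: retake the PG lock and, unless
+// a peering reset has intervened, hand the reply back to the PG.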
+struct C_Copyfrom : public Context {
+  ReplicatedPGRef pg;
+  hobject_t oid;
+  epoch_t last_peering_reset;
+  tid_t tid;
+  C_Copyfrom(ReplicatedPG *p, hobject_t o, epoch_t lpr)
+    : pg(p), oid(o), last_peering_reset(lpr),
+      tid(0)
+  {}
+  void finish(int r) {
+    pg->lock();
+    if (last_peering_reset == pg->get_last_peering_reset()) {
+      pg->process_copy_chunk(oid, tid, r);
+    }
+    pg->unlock();
+  }
+};
+
+int ReplicatedPG::start_copy(OpContext *ctx,
+                            hobject_t src, object_locator_t oloc, version_t version,
+                            CopyOpRef *pcop)
+{
+  const hobject_t& dest = ctx->obs->oi.soid;
+  dout(10) << __func__ << " " << dest << " ctx " << ctx
+          << " from " << src << " " << oloc << " v" << version
+          << dendl;
+
+  // cancel a previous in-progress copy?
+  if (copy_ops.count(dest)) {
+    // FIXME: if the src etc match, we could avoid restarting from the
+    // beginning.
+    CopyOpRef cop = copy_ops[dest];
+    cancel_copy(cop);
+  }
+
+  CopyOpRef cop(new CopyOp(ctx, src, oloc, version));
+  copy_ops[dest] = cop;
+  ctx->copy_op = cop;
+  ++ctx->obc->copyfrom_readside;
+
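+  // issue the first chunk request; execute_ctx() is re-run once the copy
+  // completes (see process_copy_chunk)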
+  _copy_some(ctx, cop);
+
+  return 0;
+}
+
+void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop)
+{
+  dout(10) << __func__ << " " << ctx << " " << cop << dendl;
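+  // issue one COPY_GET to the source OSD for the next chunk of attrs, data,
+  // and omap; the data portion is capped by osd_copyfrom_max_chunk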
+  ObjectOperation op;
+  op.assert_version(cop->version);
+  op.copy_get(&cop->cursor, g_conf->osd_copyfrom_max_chunk,
+             &cop->size, &cop->mtime, &cop->attrs,
+             &cop->data, &cop->omap,
+             &cop->rval);
+
+  C_Copyfrom *fin = new C_Copyfrom(this, ctx->obs->oi.soid,
+                                  get_last_peering_reset());
+  osd->objecter_lock.Lock();
+  tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op,
+                                      cop->src.snap, NULL, 0,
+                                      new C_OnFinisher(fin,
+                                                       &osd->objecter_finisher),
+                                      NULL);
+  fin->tid = tid;
+  cop->objecter_tid = tid;
+  osd->objecter_lock.Unlock();
+}
+
+void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
+{
+  dout(10) << __func__ << " tid " << tid << " " << cpp_strerror(r) << dendl;
+  map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid);
+  if (p == copy_ops.end()) {
+    dout(10) << __func__ << " no copy_op found" << dendl;
+    return;
+  }
+  CopyOpRef cop = p->second;
+  if (tid != cop->objecter_tid) {
+    dout(10) << __func__ << " tid " << tid << " != cop " << cop
+            << " tid " << cop->objecter_tid << dendl;
+    return;
+  }
+  OpContext *ctx = cop->ctx;
+  cop->objecter_tid = 0;
+  if (r < 0) {
+    copy_ops.erase(ctx->obc->obs.oi.soid);
+    --ctx->obc->copyfrom_readside;
+    reply_ctx(ctx, r);
+    return;
+  }
+  assert(cop->rval >= 0);
+
+  // FIXME: this is accumulating the entire object in memory.
+
+  if (!cop->cursor.is_complete()) {
+    dout(10) << __func__ << " fetching more" << dendl;
+    _copy_some(ctx, cop);
+    return;
+  }
+
+  dout(20) << __func__ << " complete; committing" << dendl;
+  execute_ctx(ctx);
+
+  copy_ops.erase(ctx->obc->obs.oi.soid);
+  --ctx->obc->copyfrom_readside;
+  ctx->copy_op.reset();
+}
+
+void ReplicatedPG::cancel_copy(CopyOpRef cop)
+{
+  OpContext *ctx = cop->ctx;
+  dout(10) << __func__ << " " << ctx->obc->obs.oi.soid << " ctx " << ctx
+          << " from " << cop->src << " " << cop->oloc << " v" << cop->version
+          << dendl;
+
+  // cancel objecter op, if we can
+  if (cop->objecter_tid) {
+    Mutex::Locker l(osd->objecter_lock);
+    osd->objecter->op_cancel(cop->objecter_tid);
+  }
+
+  copy_ops.erase(ctx->obc->obs.oi.soid);
+  --ctx->obc->copyfrom_readside;
+  ctx->copy_op.reset();
+
+  delete ctx;
+}
+
+void ReplicatedPG::requeue_cancel_copy_ops(bool requeue)
+{
+  dout(10) << __func__ << dendl;
+  map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin();
+  while (p != copy_ops.end()) {
+    // requeue initiating copy *and* any subsequent waiters
+    CopyOpRef cop = (p++)->second;
+    if (requeue) {
+      cop->waiting.push_front(cop->ctx->op);
+      requeue_ops(cop->waiting);
+    }
+    cancel_copy(cop);  // erases the copy_ops entry for this object
+  }
+}
 
 
 // ========================================================================
@@ -6736,6 +6948,7 @@ void ReplicatedPG::on_shutdown()
   deleting = true;
 
   unreg_next_scrub();
+  requeue_cancel_copy_ops(false);
   apply_and_flush_repops(false);
   context_registry_on_change();
 
@@ -6786,6 +6999,8 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
 
   context_registry_on_change();
 
+  requeue_cancel_copy_ops(is_primary());
+
   // requeue object waiters
   if (is_primary()) {
     requeue_ops(waiting_for_backfill_pos);
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 5dc7d882a8b23f55bec877999cda19f5b4236ec2..254b5842ffce90050627f69ceeae51d59f416bc2 100644 (file)
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -83,7 +83,40 @@ public:
 class ReplicatedPG : public PG {
   friend class OSD;
   friend class Watch;
-public:  
+
+public:
+
+  /*
+   * state associated with a copy operation
+   */
+  struct OpContext;
+
+  struct CopyOp {
+    OpContext *ctx;
+    hobject_t src;
+    object_locator_t oloc;
+    version_t version;
+
+    tid_t objecter_tid;
+
+    list<OpRequestRef> waiting;
+
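+    // results accumulated across COPY_GET chunks; the whole object is
+    // buffered here until the copy completes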
+    object_copy_cursor_t cursor;
+    uint64_t size;
+    utime_t mtime;
+    map<string,bufferlist> attrs;
+    bufferlist data;
+    map<string,bufferlist> omap;
+    int rval;
+
+    CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v)
+      : ctx(c), src(s), oloc(l), version(v),
+       objecter_tid(0),
+       size(0),
+       rval(-1)
+    {}
+  };
+  typedef boost::shared_ptr<CopyOp> CopyOpRef;
 
   /*
    * Capture all object state associated with an in-progress read or write.
@@ -145,6 +178,8 @@ public:
     int num_read;    ///< count read ops
     int num_write;   ///< count update ops
 
+    CopyOpRef copy_op;
+
     OpContext(const OpContext& other);
     const OpContext& operator=(const OpContext& other);
 
@@ -749,6 +784,17 @@ protected:
 
   void log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat);
 
+  // -- copyfrom --
+  map<hobject_t, CopyOpRef> copy_ops;
+
+  int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version,
+                CopyOpRef *pcop);
+  void process_copy_chunk(hobject_t oid, tid_t tid, int r);
+  void _copy_some(OpContext *ctx, CopyOpRef cop);
+  void cancel_copy(CopyOpRef cop);
+  void requeue_cancel_copy_ops(bool requeue=true);
+
+  friend class C_Copyfrom;
 
   // -- scrub --
   virtual void _scrub(ScrubMap& map);
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 00e9409c98a91751eb7e47c98c141402063c9cd7..312eb81e3fd0f8db945422359443e0df3c2e4ea5 100644 (file)
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -2130,6 +2130,9 @@ public:
   Cond cond;
   int unstable_writes, readers, writers_waiting, readers_waiting;
 
+  /// in-progress copyfrom ops for this object
+  int copyfrom_readside;
+
   // set if writes for this object are blocked on another objects recovery
   ObjectContextRef blocked_by;      // object blocking our writes
   set<ObjectContextRef> blocking;   // objects whose writes we block
@@ -2141,7 +2144,8 @@ public:
     : ssc(NULL),
       destructor_callback(0),
       lock("ReplicatedPG::ObjectContext::lock"),
-      unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0) {}
+      unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0),
+      copyfrom_readside(0) {}
 
   ~ObjectContext() {
     if (destructor_callback)
diff --git a/src/test/librados/misc.cc b/src/test/librados/misc.cc
index 9fe6427d3f8302dadfc66cad392884876bcd33a1..af17847aeabbdc34573a24d74903856da279c2a7 100644 (file)
--- a/src/test/librados/misc.cc
+++ b/src/test/librados/misc.cc
@@ -571,18 +571,44 @@ TEST(LibRadosMisc, CopyPP) {
   IoCtx ioctx;
   ASSERT_EQ(0, cluster.ioctx_create(pool_name.c_str(), ioctx));
 
-  char buf[64];
-  memset(buf, 0xcc, sizeof(buf));
-  bufferlist bl;
-  bl.append(buf, sizeof(buf));
-
-  ASSERT_EQ(0, ioctx.write_full("foo", bl));
+  bufferlist bl, x;
+  bl.append("hi there");
+  x.append("bar");
 
+  // small object
+  bufferlist blc = bl;
+  bufferlist xc = x;
+  ASSERT_EQ(0, ioctx.write_full("foo", blc));
+  ASSERT_EQ(0, ioctx.setxattr("foo", "myattr", xc));
 
   ObjectWriteOperation op;
-  op.copyfrom("foo", ioctx, ioctx.get_last_version());
-
-  ASSERT_EQ(0, ioctx.operate("bar", &op));
+  op.copy_from("foo", ioctx, ioctx.get_last_version());
+  ASSERT_EQ(0, ioctx.operate("foo.copy", &op));
+
+  bufferlist bl2, x2;
+  ASSERT_EQ((int)bl.length(), ioctx.read("foo.copy", bl2, 10000, 0));
+  ASSERT_TRUE(bl.contents_equal(bl2));
+  ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2));
+  ASSERT_TRUE(x.contents_equal(x2));
+
+  // do a big object
+  bl.append(buffer::create(8000000));
+  bl.zero();
+  bl.append("tail");
+  blc = bl;
+  xc = x;
+  ASSERT_EQ(0, ioctx.write_full("big", blc));
+  ASSERT_EQ(0, ioctx.setxattr("big", "myattr", xc));
+
+  ObjectWriteOperation op2;
+  op2.copy_from("big", ioctx, ioctx.get_last_version());
+  ASSERT_EQ(0, ioctx.operate("big.copy", &op2));
+
+  bl2.clear();
+  ASSERT_EQ((int)bl.length(), ioctx.read("big.copy", bl2, bl.length(), 0));
+  ASSERT_TRUE(bl.contents_equal(bl2));
+  ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2));
+  ASSERT_TRUE(x.contents_equal(x2));
 
   ioctx.close();
   ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));