]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/ReplicatedPG: stage object chunks to replicas during COPY_FROM
authorSage Weil <sage@inktank.com>
Thu, 5 Sep 2013 00:09:52 +0000 (17:09 -0700)
committerSage Weil <sage@inktank.com>
Tue, 17 Sep 2013 18:06:27 +0000 (11:06 -0700)
As we get each chunk of data during the COPY_FROM operation, write it out
to a temporary object on the replicas.  When we get all the pieces, move
it into place.

Signed-off-by: Sage Weil <sage@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/test/librados/misc.cc

index ae1c38e4aef5f915b9ed28e33b2abd4a81a14815..2318aba6f7a13bb27782c4034ab04929fe91088b 100644 (file)
@@ -3435,7 +3435,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        }
        ::encode(out_omap, osd_op.outdata);
 
-       dout(20) << " cursor.is_complete=" << cursor.is_complete() << dendl;
+       dout(20) << " cursor.is_complete=" << cursor.is_complete()
+                << " " << out_attrs.size() << " attrs"
+                << " " << bl.length() << " bytes"
+                << " " << out_omap.size() << " keys"
+                << dendl;
        ::encode(cursor, osd_op.outdata);
        result = 0;
       }
@@ -3477,20 +3481,29 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          } else {
            t.remove(coll, soid);
          }
-         t.write(coll, soid, 0, cop->data.length(), cop->data);
-         for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p)
-           t.setattr(coll, soid, string("_") + p->first, p->second);
-         t.omap_setkeys(coll, soid, cop->omap);
+
+         if (cop->temp_cursor.is_initial()) {
+           // write directly to final object
+           cop->temp_coll = coll;
+           cop->temp_oid = soid;
+           _write_copy_chunk(cop, &t);
+         } else {
+           // finish writing to temp object, then move into place
+           _write_copy_chunk(cop, &t);
+           t.collection_move_rename(cop->temp_coll, cop->temp_oid, coll, soid);
+           temp_contents.erase(cop->temp_oid);
+           ctx->old_temp_oid = cop->temp_oid;
+         }
 
          interval_set<uint64_t> ch;
          if (oi.size > 0)
            ch.insert(0, oi.size);
          ctx->modified_ranges.union_of(ch);
 
-         if (cop->data.length() != oi.size) {
+         if (cop->cursor.data_offset != oi.size) {
            ctx->delta_stats.num_bytes -= oi.size;
-           oi.size = cop->data.length();
            ctx->delta_stats.num_bytes += oi.size;
+           oi.size = cop->cursor.data_offset;
          }
          ctx->delta_stats.num_wr++;
          ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(cop->data.length(), 10);
@@ -4182,8 +4195,27 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
   }
   assert(cop->rval >= 0);
 
-  // FIXME: this is accumulating the entire object in memory.
   if (!cop->cursor.is_complete()) {
+    // write out what we have so far
+    vector<OSDOp> ops;
+    tid_t rep_tid = osd->get_tid();
+    osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
+    OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &ctx->obc->obs, ctx->obc->ssc, this);
+    tctx->mtime = ceph_clock_now(g_ceph_context);
+    RepGather *repop = new_repop(tctx, ctx->obc, rep_tid);
+
+    if (cop->temp_cursor.is_initial()) {
+      cop->temp_coll = get_temp_coll(&tctx->local_t);
+      cop->temp_oid = generate_temp_object();
+      temp_contents.insert(cop->temp_oid);
+      repop->ctx->new_temp_oid = cop->temp_oid;
+    }
+
+    _write_copy_chunk(cop, &tctx->op_t);
+
+    issue_repop(repop, repop->ctx->mtime);
+    eval_repop(repop);
+
     dout(10) << __func__ << " fetching more" << dendl;
     _copy_some(ctx, cop);
     return;
@@ -4197,6 +4229,30 @@ void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
   ctx->copy_op.reset();
 }
 
+void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t)
+{
+  dout(20) << __func__ << " " << cop
+          << " " << cop->attrs.size() << " attrs"
+          << " " << cop->data.length() << " bytes"
+          << " " << cop->omap.size() << " keys"
+          << dendl;
+  if (!cop->temp_cursor.attr_complete) {
+    t->touch(cop->temp_coll, cop->temp_oid);
+    for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p)
+      t->setattr(cop->temp_coll, cop->temp_oid, string("_") + p->first, p->second);
+    cop->attrs.clear();
+  }
+  if (!cop->temp_cursor.data_complete) {
+    t->write(cop->temp_coll, cop->temp_oid, cop->temp_cursor.data_offset, cop->data.length(), cop->data);
+    cop->data.clear();
+  }
+  if (!cop->temp_cursor.omap_complete) {
+    t->omap_setkeys(cop->temp_coll, cop->temp_oid, cop->omap);
+    cop->omap.clear();
+  }
+  cop->temp_cursor = cop->cursor;
+}
+
 void ReplicatedPG::cancel_copy(CopyOpRef cop)
 {
   OpContext *ctx = cop->ctx;
@@ -4565,7 +4621,8 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
     }
 
     // ship resulting transaction, log entries, and pg_stats
-    if (peer == backfill_target && soid >= backfill_pos) {
+    if (peer == backfill_target && soid >= backfill_pos &&
+       soid.pool == (int64_t)info.pgid.pool()) {  // only skip normal (not temp pool=-1) objects
       dout(10) << "issue_repop shipping empty opt to osd." << peer << ", object beyond backfill_pos "
               << backfill_pos << ", last_backfill is " << pinfo.last_backfill << dendl;
       ObjectStore::Transaction t;
@@ -4573,6 +4630,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
     } else {
       ::encode(repop->ctx->op_t, wr->get_data());
     }
+
     ::encode(repop->ctx->log, wr->logbl);
 
     if (backfill_target >= 0 && backfill_target == peer)
index f80be1b93919efeaeeff0d52a1addd410b4d5e51..ab874351274e256f55cc33c083f9bb8961905543 100644 (file)
@@ -109,6 +109,10 @@ public:
     map<string,bufferlist> omap;
     int rval;
 
+    coll_t temp_coll;
+    hobject_t temp_oid;
+    object_copy_cursor_t temp_cursor;
+
     CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v)
       : ctx(c), src(s), oloc(l), version(v),
        objecter_tid(0),
@@ -788,6 +792,7 @@ protected:
   int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version,
                 CopyOpRef *pcop);
   void process_copy_chunk(hobject_t oid, tid_t tid, int r);
+  void _write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t);
   void _copy_some(OpContext *ctx, CopyOpRef cop);
   void cancel_copy(CopyOpRef cop);
   void requeue_cancel_copy_ops(bool requeue=true);
index af17847aeabbdc34573a24d74903856da279c2a7..24cb431261a5595cb9fa086898835b2ab66ea1f7 100644 (file)
@@ -592,7 +592,7 @@ TEST(LibRadosMisc, CopyPP) {
   ASSERT_TRUE(x.contents_equal(x2));
 
   // do a big object
-  bl.append(buffer::create(8000000));
+  bl.append(buffer::create(g_conf->osd_copyfrom_max_chunk * 3));
   bl.zero();
   bl.append("tail");
   blc = bl;