}
::encode(out_omap, osd_op.outdata);
- dout(20) << " cursor.is_complete=" << cursor.is_complete() << dendl;
+ dout(20) << " cursor.is_complete=" << cursor.is_complete()
+ << " " << out_attrs.size() << " attrs"
+ << " " << bl.length() << " bytes"
+ << " " << out_omap.size() << " keys"
+ << dendl;
::encode(cursor, osd_op.outdata);
result = 0;
}
} else {
t.remove(coll, soid);
}
- t.write(coll, soid, 0, cop->data.length(), cop->data);
- for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p)
- t.setattr(coll, soid, string("_") + p->first, p->second);
- t.omap_setkeys(coll, soid, cop->omap);
+
+ if (cop->temp_cursor.is_initial()) {
+ // write directly to final object
+ cop->temp_coll = coll;
+ cop->temp_oid = soid;
+ _write_copy_chunk(cop, &t);
+ } else {
+ // finish writing to temp object, then move into place
+ _write_copy_chunk(cop, &t);
+ t.collection_move_rename(cop->temp_coll, cop->temp_oid, coll, soid);
+ temp_contents.erase(cop->temp_oid);
+ ctx->old_temp_oid = cop->temp_oid;
+ }
interval_set<uint64_t> ch;
if (oi.size > 0)
ch.insert(0, oi.size);
ctx->modified_ranges.union_of(ch);
- if (cop->data.length() != oi.size) {
+ if (cop->cursor.data_offset != oi.size) {
ctx->delta_stats.num_bytes -= oi.size;
- oi.size = cop->data.length();
ctx->delta_stats.num_bytes += oi.size;
+ oi.size = cop->cursor.data_offset;
}
ctx->delta_stats.num_wr++;
ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(cop->data.length(), 10);
}
assert(cop->rval >= 0);
- // FIXME: this is accumulating the entire object in memory.
if (!cop->cursor.is_complete()) {
+ // write out what we have so far
+ vector<OSDOp> ops;
+ tid_t rep_tid = osd->get_tid();
+ osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
+ OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &ctx->obc->obs, ctx->obc->ssc, this);
+ tctx->mtime = ceph_clock_now(g_ceph_context);
+ RepGather *repop = new_repop(tctx, ctx->obc, rep_tid);
+
+ if (cop->temp_cursor.is_initial()) {
+ cop->temp_coll = get_temp_coll(&tctx->local_t);
+ cop->temp_oid = generate_temp_object();
+ temp_contents.insert(cop->temp_oid);
+ repop->ctx->new_temp_oid = cop->temp_oid;
+ }
+
+ _write_copy_chunk(cop, &tctx->op_t);
+
+ issue_repop(repop, repop->ctx->mtime);
+ eval_repop(repop);
+
dout(10) << __func__ << " fetching more" << dendl;
_copy_some(ctx, cop);
return;
ctx->copy_op.reset();
}
+void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t)
+{
+ dout(20) << __func__ << " " << cop
+ << " " << cop->attrs.size() << " attrs"
+ << " " << cop->data.length() << " bytes"
+ << " " << cop->omap.size() << " keys"
+ << dendl;
+ if (!cop->temp_cursor.attr_complete) {
+ t->touch(cop->temp_coll, cop->temp_oid);
+ for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p)
+ t->setattr(cop->temp_coll, cop->temp_oid, string("_") + p->first, p->second);
+ cop->attrs.clear();
+ }
+ if (!cop->temp_cursor.data_complete) {
+ t->write(cop->temp_coll, cop->temp_oid, cop->temp_cursor.data_offset, cop->data.length(), cop->data);
+ cop->data.clear();
+ }
+ if (!cop->temp_cursor.omap_complete) {
+ t->omap_setkeys(cop->temp_coll, cop->temp_oid, cop->omap);
+ cop->omap.clear();
+ }
+ cop->temp_cursor = cop->cursor;
+}
+
void ReplicatedPG::cancel_copy(CopyOpRef cop)
{
OpContext *ctx = cop->ctx;
}
// ship resulting transaction, log entries, and pg_stats
- if (peer == backfill_target && soid >= backfill_pos) {
+ if (peer == backfill_target && soid >= backfill_pos &&
+ soid.pool == (int64_t)info.pgid.pool()) { // only skip normal (not temp pool=-1) objects
dout(10) << "issue_repop shipping empty opt to osd." << peer << ", object beyond backfill_pos "
<< backfill_pos << ", last_backfill is " << pinfo.last_backfill << dendl;
ObjectStore::Transaction t;
} else {
::encode(repop->ctx->op_t, wr->get_data());
}
+
::encode(repop->ctx->log, wr->logbl);
if (backfill_target >= 0 && backfill_target == peer)