p->second->ondisk_read_unlock();
}
+ if (result == -EINPROGRESS) {
+ // come back later.
+ return;
+ }
+
if (result == -EAGAIN) {
// clean up after the ctx
delete ctx;
if (result < 0)
break;
cursor.attr_complete = true;
+ dout(20) << " got attrs" << dendl;
}
::encode(out_attrs, osd_op.outdata);
bufferlist bl;
if (left > 0 && !cursor.data_complete) {
if (cursor.data_offset < oi.size) {
- result = osd->store->read(coll, oi.soid, cursor.data_offset, out_max, bl);
+ result = osd->store->read(coll, oi.soid, cursor.data_offset, left, bl);
if (result < 0)
return result;
assert(result <= left);
left -= result;
cursor.data_offset += result;
}
- if (cursor.data_offset == oi.size)
+ if (cursor.data_offset == oi.size) {
cursor.data_complete = true;
+ dout(20) << " got data" << dendl;
+ }
}
::encode(bl, osd_op.outdata);
cursor.omap_offset = iter->key();
} else {
cursor.omap_complete = true;
+ dout(20) << " got omap" << dendl;
}
}
::encode(out_omap, osd_op.outdata);
+ dout(20) << " cursor.is_complete=" << cursor.is_complete() << dendl;
::encode(cursor, osd_op.outdata);
result = 0;
}
break;
+ case CEPH_OSD_OP_COPY_FROM:
+ ++ctx->num_write;
+ {
+ object_t src_name;
+ object_locator_t src_oloc;
+ snapid_t src_snapid = (uint64_t)op.copy_from.snapid;
+ version_t src_version = op.copy_from.src_version;
+ try {
+ ::decode(src_name, bp);
+ ::decode(src_oloc, bp);
+ }
+ catch (buffer::error& e) {
+ result = -EINVAL;
+ goto fail;
+ }
+ pg_t raw_pg;
+ get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg);
+ hobject_t src(src_name, src_oloc.key, src_snapid,
+ raw_pg.ps(), raw_pg.pool(),
+ src_oloc.nspace);
+ if (!ctx->copy_op) {
+ // start
+ result = start_copy(ctx, src, src_oloc, src_version, &ctx->copy_op);
+ if (result < 0)
+ goto fail;
+ result = -EINPROGRESS;
+ } else {
+ // finish
+ CopyOpRef cop = ctx->copy_op;
+
+ if (!obs.exists) {
+ ctx->delta_stats.num_objects++;
+ obs.exists = true;
+ } else {
+ t.remove(coll, soid);
+ }
+ t.write(coll, soid, 0, cop->data.length(), cop->data);
+ for (map<string,bufferlist>::iterator p = cop->attrs.begin(); p != cop->attrs.end(); ++p)
+ t.setattr(coll, soid, string("_") + p->first, p->second);
+ t.omap_setkeys(coll, soid, cop->omap);
+
+ interval_set<uint64_t> ch;
+ if (oi.size > 0)
+ ch.insert(0, oi.size);
+ ctx->modified_ranges.union_of(ch);
+
+ if (cop->data.length() != oi.size) {
+ ctx->delta_stats.num_bytes -= oi.size;
+ oi.size = cop->data.length();
+ ctx->delta_stats.num_bytes += oi.size;
+ }
+ ctx->delta_stats.num_wr++;
+ ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(cop->data.length(), 10);
+ }
+ }
+ break;
default:
dout(1) << "unrecognized osd op " << op.op
return result;
}
+// ========================================================================
+// copyfrom
+
+struct C_Copyfrom : public Context {
+ ReplicatedPGRef pg;
+ hobject_t oid;
+ epoch_t last_peering_reset;
+ tid_t tid;
+ C_Copyfrom(ReplicatedPG *p, hobject_t o, epoch_t lpr)
+ : pg(p), oid(o), last_peering_reset(lpr),
+ tid(0)
+ {}
+ void finish(int r) {
+ pg->lock();
+ if (last_peering_reset == pg->get_last_peering_reset()) {
+ pg->process_copy_chunk(oid, tid, r);
+ }
+ pg->unlock();
+ }
+};
+
+int ReplicatedPG::start_copy(OpContext *ctx,
+ hobject_t src, object_locator_t oloc, version_t version,
+ CopyOpRef *pcop)
+{
+ const hobject_t& dest = ctx->obs->oi.soid;
+ dout(10) << __func__ << " " << dest << " ctx " << ctx
+ << " from " << src << " " << oloc << " v" << version
+ << dendl;
+
+ // cancel a previous in-progress copy?
+ if (copy_ops.count(dest)) {
+ // FIXME: if the src etc match, we could avoid restarting from the
+ // beginning.
+ CopyOpRef cop = copy_ops[dest];
+ cancel_copy(cop);
+ }
+
+ CopyOpRef cop(new CopyOp(ctx, src, oloc, version));
+ copy_ops[dest] = cop;
+ ctx->copy_op = cop;
+ ++ctx->obc->copyfrom_readside;
+
+ _copy_some(ctx, cop);
+
+ return 0;
+}
+
+void ReplicatedPG::_copy_some(OpContext *ctx, CopyOpRef cop)
+{
+ dout(10) << __func__ << " " << ctx << " " << cop << dendl;
+ ObjectOperation op;
+ op.assert_version(cop->version);
+ op.copy_get(&cop->cursor, g_conf->osd_copyfrom_max_chunk,
+ &cop->size, &cop->mtime, &cop->attrs,
+ &cop->data, &cop->omap,
+ &cop->rval);
+
+ C_Copyfrom *fin = new C_Copyfrom(this, ctx->obs->oi.soid,
+ get_last_peering_reset());
+ osd->objecter_lock.Lock();
+ tid_t tid = osd->objecter->read(cop->src.oid, cop->oloc, op,
+ cop->src.snap, NULL, 0,
+ new C_OnFinisher(fin,
+ &osd->objecter_finisher),
+ NULL);
+ fin->tid = tid;
+ cop->objecter_tid = tid;
+ osd->objecter_lock.Unlock();
+}
+
+void ReplicatedPG::process_copy_chunk(hobject_t oid, tid_t tid, int r)
+{
+ dout(10) << __func__ << " tid " << tid << " " << cpp_strerror(r) << dendl;
+ map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid);
+ if (p == copy_ops.end()) {
+ dout(10) << __func__ << " no copy_op found" << dendl;
+ return;
+ }
+ CopyOpRef cop = p->second;
+ if (tid != cop->objecter_tid) {
+ dout(10) << __func__ << " tid " << tid << " != cop " << cop
+ << " tid " << cop->objecter_tid << dendl;
+ return;
+ }
+ OpContext *ctx = cop->ctx;
+ cop->objecter_tid = 0;
+ if (r < 0) {
+ copy_ops.erase(ctx->obc->obs.oi.soid);
+ --ctx->obc->copyfrom_readside;
+ reply_ctx(ctx, r);
+ return;
+ }
+ assert(cop->rval >= 0);
+
+ // FIXME: this is accumulating the entire object in memory.
+
+ if (!cop->cursor.is_complete()) {
+ dout(10) << __func__ << " fetching more" << dendl;
+ _copy_some(ctx, cop);
+ return;
+ }
+
+ dout(20) << __func__ << " complete; committing" << dendl;
+ execute_ctx(ctx);
+
+ copy_ops.erase(ctx->obc->obs.oi.soid);
+ --ctx->obc->copyfrom_readside;
+ ctx->copy_op.reset();
+}
+
+void ReplicatedPG::cancel_copy(CopyOpRef cop)
+{
+ OpContext *ctx = cop->ctx;
+ dout(10) << __func__ << " " << ctx->obc->obs.oi.soid << " ctx " << ctx
+ << " from " << cop->src << " " << cop->oloc << " v" << cop->version
+ << dendl;
+
+ // cancel objecter op, if we can
+ if (cop->objecter_tid) {
+ Mutex::Locker l(osd->objecter_lock);
+ osd->objecter->op_cancel(cop->objecter_tid);
+ }
+
+ copy_ops.erase(ctx->obc->obs.oi.soid);
+ --ctx->obc->copyfrom_readside;
+ ctx->copy_op.reset();
+
+ delete ctx;
+}
+
+void ReplicatedPG::requeue_cancel_copy_ops(bool requeue)
+{
+ dout(10) << __func__ << dendl;
+ for (map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin();
+ p != copy_ops.end();
+ copy_ops.erase(p++)) {
+ // requeue initiating copy *and* any subsequent waiters
+ CopyOpRef cop = p->second;
+ if (requeue) {
+ cop->waiting.push_front(cop->ctx->op);
+ requeue_ops(cop->waiting);
+ }
+ cancel_copy(cop);
+ }
+}
// ========================================================================
deleting = true;
unreg_next_scrub();
+ requeue_cancel_copy_ops(false);
apply_and_flush_repops(false);
context_registry_on_change();
context_registry_on_change();
+ requeue_cancel_copy_ops(is_primary());
+
// requeue object waiters
if (is_primary()) {
requeue_ops(waiting_for_backfill_pos);
class ReplicatedPG : public PG {
friend class OSD;
friend class Watch;
-public:
+
+public:
+
+ /*
+ * state associated with a copy operation
+ */
+ struct OpContext;
+
+ struct CopyOp {
+ OpContext *ctx;
+ hobject_t src;
+ object_locator_t oloc;
+ version_t version;
+
+ tid_t objecter_tid;
+
+ list<OpRequestRef> waiting;
+
+ object_copy_cursor_t cursor;
+ uint64_t size;
+ utime_t mtime;
+ map<string,bufferlist> attrs;
+ bufferlist data;
+ map<string,bufferlist> omap;
+ int rval;
+
+ CopyOp(OpContext *c, hobject_t s, object_locator_t l, version_t v)
+ : ctx(c), src(s), oloc(l), version(v),
+ objecter_tid(0),
+ size(0),
+ rval(-1)
+ {}
+ };
+ typedef boost::shared_ptr<CopyOp> CopyOpRef;
/*
* Capture all object state associated with an in-progress read or write.
int num_read; ///< count read ops
int num_write; ///< count update ops
+ CopyOpRef copy_op;
+
OpContext(const OpContext& other);
const OpContext& operator=(const OpContext& other);
void log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat);
+ // -- copyfrom --
+ map<hobject_t, CopyOpRef> copy_ops;
+
+ int start_copy(OpContext *ctx, hobject_t src, object_locator_t oloc, version_t version,
+ CopyOpRef *pcop);
+ void process_copy_chunk(hobject_t oid, tid_t tid, int r);
+ void _copy_some(OpContext *ctx, CopyOpRef cop);
+ void cancel_copy(CopyOpRef cop);
+ void requeue_cancel_copy_ops(bool requeue=true);
+
+ friend class C_Copyfrom;
// -- scrub --
virtual void _scrub(ScrubMap& map);
IoCtx ioctx;
ASSERT_EQ(0, cluster.ioctx_create(pool_name.c_str(), ioctx));
- char buf[64];
- memset(buf, 0xcc, sizeof(buf));
- bufferlist bl;
- bl.append(buf, sizeof(buf));
-
- ASSERT_EQ(0, ioctx.write_full("foo", bl));
+ bufferlist bl, x;
+ bl.append("hi there");
+ x.append("bar");
+ // small object
+ bufferlist blc = bl;
+ bufferlist xc = x;
+ ASSERT_EQ(0, ioctx.write_full("foo", blc));
+ ASSERT_EQ(0, ioctx.setxattr("foo", "myattr", xc));
ObjectWriteOperation op;
- op.copyfrom("foo", ioctx, ioctx.get_last_version());
-
- ASSERT_EQ(0, ioctx.operate("bar", &op));
+ op.copy_from("foo", ioctx, ioctx.get_last_version());
+ ASSERT_EQ(0, ioctx.operate("foo.copy", &op));
+
+ bufferlist bl2, x2;
+ ASSERT_EQ((int)bl.length(), ioctx.read("foo.copy", bl2, 10000, 0));
+ ASSERT_TRUE(bl.contents_equal(bl2));
+ ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2));
+ ASSERT_TRUE(x.contents_equal(x2));
+
+ // do a big object
+ bl.append(buffer::create(8000000));
+ bl.zero();
+ bl.append("tail");
+ blc = bl;
+ xc = x;
+ ASSERT_EQ(0, ioctx.write_full("big", blc));
+ ASSERT_EQ(0, ioctx.setxattr("big", "myattr", xc));
+
+ ObjectWriteOperation op2;
+ op.copy_from("big", ioctx, ioctx.get_last_version());
+ ASSERT_EQ(0, ioctx.operate("big.copy", &op));
+
+ bl2.clear();
+ ASSERT_EQ((int)bl.length(), ioctx.read("big.copy", bl2, bl.length(), 0));
+ ASSERT_TRUE(bl.contents_equal(bl2));
+ ASSERT_EQ((int)x.length(), ioctx.getxattr("foo.copy", "myattr", x2));
+ ASSERT_TRUE(x.contents_equal(x2));
ioctx.close();
ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));