}
const sobject_t& soid = obc->soid;
- OpContext *ctx = new OpContext(op, op->get_reqid(), op->ops, op->get_data(), &obc->oi);
+ OpContext *ctx = new OpContext(op, op->get_reqid(), op->ops, op->get_data(),
+ obc->state, &obc->oi);
bool noop = false;
if (ctx->ops.empty()) {
void ReplicatedPG::_make_clone(ObjectStore::Transaction& t,
sobject_t head, pobject_t coid,
- eversion_t ov, eversion_t v, osd_reqid_t& reqid, utime_t mtime, vector<snapid_t>& snaps)
+ object_info_t *poi)
{
- object_info_t pi(coid);
- pi.version = v;
- pi.prior_version = ov;
- pi.last_reqid = reqid;
- pi.mtime = mtime;
- pi.snaps.swap(snaps);
bufferlist bv;
- ::encode(pi, bv);
+ ::encode(*poi, bv);
t.clone(info.pgid.to_coll(), head, coid);
t.setattr(info.pgid.to_coll(), coid, OI_ATTR, bv);
}
-void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, vector<Log::Entry>& log, osd_reqid_t reqid, pg_stat_t& stats,
- sobject_t soid, loff_t old_size, object_info_t& oi,
- eversion_t& at_version, SnapContext& snapc)
+void ReplicatedPG::prepare_clone(OpContext *ctx, loff_t old_size, eversion_t old_version,
+ utime_t old_mtime, osd_reqid_t old_last_reqid)
{
+ object_info_t& oi = *ctx->poi;
+ const sobject_t& soid = oi.soid;
+ SnapContext& snapc = ctx->snapc;
+ ObjectStore::Transaction& t = ctx->clone_t;
+
// clone?
assert(soid.snap == CEPH_NOSNAP);
dout(20) << "snapset=" << oi.snapset << " snapc=" << snapc << dendl;;
snaps[i] = snapc.snaps[i];
// prepare clone
- _make_clone(t, soid, coid, oi.version, at_version, reqid, oi.mtime, snaps);
+ ctx->clone_obc = new ObjectContext;
+ ctx->clone_obc->state = ctx->mode; // take state from head obc's
+ ctx->clone_obc->soid = coid;
+ ctx->clone_obc->oi.soid = coid;
+ ctx->clone_obc->oi.version = ctx->at_version;
+ ctx->clone_obc->oi.prior_version = old_version;
+ ctx->clone_obc->oi.last_reqid = old_last_reqid;
+ ctx->clone_obc->oi.mtime = old_mtime;
+ ctx->clone_obc->oi.snaps = snaps;
+
+ ctx->clone_obc->force_start_write();
+ if (is_primary())
+ register_object_context(ctx->clone_obc);
+
+ _make_clone(t, soid, coid, &ctx->clone_obc->oi);
// add to snap bound collections
coll_t fc = make_snap_collection(t, snaps[0]);
t.collection_add(lc, info.pgid.to_coll(), coid);
}
- stats.num_objects++;
- stats.num_object_clones++;
+ info.stats.num_objects++;
+ info.stats.num_object_clones++;
oi.snapset.clones.push_back(coid.snap);
oi.snapset.clone_size[coid.snap] = old_size;
oi.snapset.clone_overlap[coid.snap].insert(0, old_size);
// log clone
dout(10) << "cloning v " << oi.version
- << " to " << coid << " v " << at_version
+ << " to " << coid << " v " << ctx->at_version
<< " snaps=" << snaps << dendl;
- log.push_back(Log::Entry(PG::Log::Entry::CLONE, coid, at_version, oi.version, reqid, oi.mtime));
- ::encode(snaps, log.back().snaps);
+ ctx->log.push_back(Log::Entry(PG::Log::Entry::CLONE, coid, ctx->at_version, old_version, ctx->reqid, oi.mtime));
+ ::encode(snaps, ctx->log.back().snaps);
- at_version.version++;
+ ctx->at_version.version++;
}
// update snapset with latest snap context
{
assert(!ctx->ops.empty());
- eversion_t old_version = ctx->poi->version;
-
- const sobject_t& soid = ctx->poi->soid;
object_info_t *poi = ctx->poi;
+ const sobject_t& soid = poi->soid;
+
+ // we'll need this to log
+ eversion_t old_version = poi->version;
+
+ // set these values aside, in case we need to clone
+ __u64 old_size = size;
+ utime_t old_mtime = poi->mtime;
+ osd_reqid_t old_last_reqid = poi->last_reqid;
// prepare the actual mutation
bufferlist::iterator bp = ctx->indata.begin();
int result = do_osd_ops(ctx, ctx->ops, bp, ctx->outdata, exists, size);
if (result < 0 || ctx->op_t.empty())
- return result;
+ return result; // error, or read op.
// clone?
if (soid.snap)
- prepare_clone(ctx->clone_t, ctx->log, ctx->reqid, info.stats, soid, size, *poi,
- ctx->at_version, ctx->snapc);
+ prepare_clone(ctx, old_size, old_version, old_mtime, old_last_reqid);
// finish and log the op.
poi->version = ctx->at_version;
repop->applied = true;
+ if (repop->ctx->clone_obc) {
+ repop->ctx->clone_obc->finish_write();
+ put_object_context(repop->ctx->clone_obc);
+ repop->ctx->clone_obc = 0;
+ }
+
repop->obc->finish_write();
put_object_context(repop->obc);
oi.version = op->old_version;
oi.snapset = op->snapset;
- OpContext ctx(op, op->reqid, op->ops, op->get_data(), &oi);
+ OpContext ctx(op, op->reqid, op->ops, op->get_data(), ObjectContext::RMW, &oi);
if (!op->noop) {
ctx.mtime = op->mtime;
dout(10) << "recover_primary cloning " << head << " v" << latest->prior_version
<< " to " << soid << " v" << latest->version
<< " snaps " << latest->snaps << dendl;
- vector<snapid_t> snaps;
- ::decode(snaps, latest->snaps);
ObjectStore::Transaction t;
- _make_clone(t, head, soid, latest->prior_version, latest->version, latest->reqid, latest->mtime, snaps);
+
+ ObjectContext *headobc = get_object_context(head);
+
+ object_info_t oi(soid);
+ oi.version = latest->version;
+ oi.prior_version = latest->prior_version;
+ oi.last_reqid = headobc->oi.last_reqid;
+ oi.mtime = headobc->oi.mtime;
+ ::decode(oi.snaps, latest->snaps);
+ _make_clone(t, head, soid, &oi);
+
+ put_object_context(headobc);
+
osd->store->apply_transaction(t);
missing.got(latest->soid, latest->version);
missing_loc.erase(latest->soid);
* replicas ack.
*/
struct ObjectContext {
- sobject_t soid;
- int ref;
- bool registered;
-
- void get() { ++ref; }
-
- enum {
+ typedef enum {
IDLE,
DELAYED,
RMW,
DELAYED_FLUSHING,
RMW_FLUSHING
- } state;
-
+ } state_t;
static const char *get_state_name(int s) {
switch (s) {
case IDLE: return "idle";
}
}
+ sobject_t soid;
+ int ref;
+ bool registered;
+
+ state_t state;
+
int num_wr, num_rmw;
entity_inst_t client;
list<Message*> waiting;
__u64 size;
object_info_t oi;
+
+
+ void get() { ++ref; }
bool is_delayed_mode() {
return state == DELAYED || state == DELAYED_FLUSHING;
num_wr++;
assert(state == DELAYED || state == RMW);
}
+ void force_start_write() {
+ num_wr++;
+ }
void finish_write() {
assert(num_wr > 0);
--num_wr;
bufferlist& indata;
bufferlist outdata;
+ ObjectContext::state_t mode; // DELAYED or RMW (or _FLUSHING variant?)
object_info_t *poi;
utime_t mtime;
ObjectStore::Transaction op_t, clone_t, local_t;
vector<PG::Log::Entry> log;
+ ObjectContext *clone_obc; // if we created a clone
+
int data_off; // FIXME: we may want to kill this msgr hint off at some point!
OpContext(Message *_op, osd_reqid_t _reqid, vector<ceph_osd_op>& _ops, bufferlist& _data,
- object_info_t *_poi) :
- op(_op), reqid(_reqid), ops(_ops), indata(_data), poi(_poi),
- data_off(0) {}
+ ObjectContext::state_t _mode, object_info_t *_poi) :
+ op(_op), reqid(_reqid), ops(_ops), indata(_data), mode(_mode), poi(_poi),
+ clone_obc(0), data_off(0) {}
+ ~OpContext() {
+ assert(!clone_obc);
+ }
};
/*
void _make_clone(ObjectStore::Transaction& t,
sobject_t head, sobject_t coid,
- eversion_t ov, eversion_t v, osd_reqid_t& reqid, utime_t mtime, vector<snapid_t>& snaps);
- void prepare_clone(ObjectStore::Transaction& t, vector<Log::Entry>& log, osd_reqid_t reqid, pg_stat_t& st,
- sobject_t poid, loff_t old_size, object_info_t& oi,
- eversion_t& at_version, SnapContext& snapc);
+ object_info_t *poi);
+ void prepare_clone(OpContext *ctx, loff_t old_size,
+ eversion_t old_version, utime_t old_mtime, osd_reqid_t old_last_reqid);
void add_interval_usage(interval_set<__u64>& s, pg_stat_t& st);
int prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
sobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,