t.setattr(info.pgid.to_coll(), coid, OI_ATTR, bv);
}
-void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl, osd_reqid_t reqid, pg_stat_t& stats,
+void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, vector<Log::Entry>& log, osd_reqid_t reqid, pg_stat_t& stats,
sobject_t soid, loff_t old_size, object_info_t& oi,
eversion_t& at_version, SnapContext& snapc)
{
dout(10) << "cloning v " << oi.version
<< " to " << coid << " v " << at_version
<< " snaps=" << snaps << dendl;
- Log::Entry cloneentry(PG::Log::Entry::CLONE, coid, at_version, oi.version, reqid, oi.mtime);
- ::encode(snaps, cloneentry.snaps);
- add_log_entry(cloneentry, logbl);
+ log.push_back(Log::Entry(PG::Log::Entry::CLONE, coid, at_version, oi.version, reqid, oi.mtime));
+ ::encode(snaps, log.back().snaps);
at_version.version++;
}
void ReplicatedPG::prepare_transaction(OpContext *ctx, bool& exists, __u64& size, eversion_t trim_to)
{
- bufferlist log_bl;
eversion_t log_version = ctx->at_version;
assert(!ctx->ops.empty());
vector<ceph_osd_op>& ops = ctx->ops;
object_info_t *poi = ctx->poi;
- // apply ops
- bool did_snap = false;
+
+ // prepare the actual mutation
bufferlist::iterator bp = ctx->data.begin();
- for (unsigned i=0; i<ops.size(); i++) {
- // clone?
- if (!did_snap && soid.snap &&
- !ceph_osd_op_type_lock(ops[i].op)) { // is a (non-lock) modification
- prepare_clone(ctx->t, log_bl, ctx->reqid, info.stats, soid, size, *poi,
- ctx->at_version, ctx->snapc);
- did_snap = true;
- }
- prepare_simple_op(ctx->t, ctx->reqid, info.stats, soid, size, exists, *poi,
+ for (unsigned i=0; i<ops.size(); i++)
+ prepare_simple_op(ctx->op_t, ctx->reqid, info.stats, soid, size, exists, *poi,
ops, i, bp, ctx->snapc);
- }
- // finish.
+ // FIXME FIXME
+ if (ctx->op_t.empty())
+ return;
+
+ // clone?
+ if (soid.snap)
+ prepare_clone(ctx->clone_t, ctx->log, ctx->reqid, info.stats, soid, size, *poi,
+ ctx->at_version, ctx->snapc);
+
+ // finish and log the op.
poi->version = ctx->at_version;
if (exists) {
poi->version = ctx->at_version;
bufferlist bv(sizeof(*poi));
::encode(*poi, bv);
- ctx->t.setattr(info.pgid.to_coll(), soid, OI_ATTR, bv);
+ ctx->op_t.setattr(info.pgid.to_coll(), soid, OI_ATTR, bv);
}
// append to log
int logopcode = Log::Entry::MODIFY;
if (!exists)
logopcode = Log::Entry::DELETE;
- Log::Entry logentry(logopcode, soid, ctx->at_version, old_version, ctx->reqid, ctx->mtime);
- add_log_entry(logentry, log_bl);
+ ctx->log.push_back(Log::Entry(logopcode, soid, ctx->at_version, old_version, ctx->reqid, ctx->mtime));
+
+ // update the local pg, pg log
+ write_info(ctx->local_t);
- // write pg info, log to disk
- write_info(ctx->t);
- append_log(ctx->t, log_bl, log_version, trim_to);
+ bufferlist log_bl;
+ for (vector<Log::Entry>::iterator p = ctx->log.begin();
+ p != ctx->log.end();
+ p++)
+ add_log_entry(*p, log_bl);
+ append_log(ctx->local_t, log_bl, log_version, trim_to);
}
assert(!repop->applied);
Context *oncommit = new C_OSD_ModifyCommit(this, repop);
- unsigned r = osd->store->apply_transaction(repop->ctx->t, oncommit);
+
+ list<ObjectStore::Transaction*> tls;
+ tls.push_back(&repop->ctx->clone_t);
+ tls.push_back(&repop->ctx->op_t);
+ tls.push_back(&repop->ctx->local_t);
+ unsigned r = osd->store->apply_transactions(tls, oncommit);
if (r)
dout(-10) << "apply_repop apply transaction return " << r << " on " << *repop << dendl;
// discard my reference to the buffer
repop->ctx->op->get_data().clear();
+ tls.clear();
+ repop->ctx->op_t.clear_data();
repop->applied = true;
}
-void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)
+void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now,
+ bool old_exists, __u64 old_size, eversion_t old_version)
{
const sobject_t& soid = repop->ctx->poi->soid;
dout(7) << " issue_repop rep_tid " << repop->rep_tid
osd->osdmap->get_epoch(),
repop->rep_tid, repop->ctx->at_version);
wr->mtime = repop->ctx->mtime;
- wr->old_exists = repop->obc->exists;
- wr->old_size = repop->obc->size;
- wr->old_version = repop->obc->oi.version;
+ wr->old_exists = old_exists;
+ wr->old_size = old_size;
+ wr->old_version = old_version;
wr->snapset = repop->obc->oi.snapset;
wr->snapc = repop->ctx->snapc;
wr->get_data() = repop->ctx->op->get_data(); // _copy_ bufferlist
// head?
if (snapid > hobc->oi.snapset.seq) {
- dout(10) << "get_object_context " << head
+ dout(10) << "find_object_context " << head
<< " want " << snapid << " > snapset seq " << hobc->oi.snapset.seq
<< " -- HIT" << dendl;
*pobc = hobc;
if (log.top.version - log.bottom.version > info.stats.num_objects)
trim_to = peers_complete_thru;
+ // note some basic context for op replication that prepare_transaction may clobber
+ bool old_exists = obc->exists;
+ __u64 old_size = obc->size;
+ eversion_t old_version = obc->oi.version;
+
// we are acker.
if (!noop) {
// log and update later.
}
for (unsigned i=1; i<acting.size(); i++)
- issue_repop(repop, acting[i], now);
+ issue_repop(repop, acting[i], now, old_exists, old_size, old_version);
// keep peer_info up to date
for (unsigned i=1; i<acting.size(); i++) {
utime_t mtime;
SnapContext snapc; // writer snap context
eversion_t at_version; // pg's current version pointer
- ObjectStore::Transaction t;
+
+ ObjectStore::Transaction op_t, clone_t, local_t;
+ vector<PG::Log::Entry> log;
int data_off; // FIXME: we may want to kill this msgr hint off at some point!
void apply_repop(RepGather *repop);
void eval_repop(RepGather*);
- void issue_repop(RepGather *repop, int dest, utime_t now);
+ void issue_repop(RepGather *repop, int dest, utime_t now,
+ bool old_exists, __u64 old_size, eversion_t old_version);
RepGather *new_repop(OpContext *ctx, ObjectContext *obc, bool noop, tid_t rep_tid);
void repop_ack(RepGather *repop,
int result, int ack_type,
void _make_clone(ObjectStore::Transaction& t,
sobject_t head, sobject_t coid,
eversion_t ov, eversion_t v, osd_reqid_t& reqid, utime_t mtime, vector<snapid_t>& snaps);
- void prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl, osd_reqid_t reqid, pg_stat_t& st,
+ void prepare_clone(ObjectStore::Transaction& t, vector<Log::Entry>& log, osd_reqid_t reqid, pg_stat_t& st,
sobject_t poid, loff_t old_size, object_info_t& oi,
eversion_t& at_version, SnapContext& snapc);
void add_interval_usage(interval_set<__u64>& s, pg_stat_t& st);