// ========================================================================
// READS
-int ReplicatedPG::do_read_ops(ReadOpContext *ctx,
+int ReplicatedPG::do_read_ops(OpContext *ctx,
bufferlist::iterator& bp, bufferlist& data)
{
int result = 0;
void ReplicatedPG::op_read(MOSDOp *op, ObjectContext *obc)
{
const sobject_t& soid = obc->soid;
- ReadOpContext ctx(op, op->ops, &obc->oi);
+ OpContext ctx(op, op->get_reqid(), op->ops, op->get_data(), &obc->oi);
dout(10) << "op_read " << soid << " " << ctx.ops << dendl;
- bufferlist::iterator bp = op->get_data().begin();
+ bufferlist::iterator bp = ctx.data.begin();
bufferlist data;
int result = 0;
return 0;
}
-void ReplicatedPG::prepare_transaction(WriteOpContext *ctx, bool& exists, __u64& size, eversion_t trim_to)
+void ReplicatedPG::prepare_transaction(OpContext *ctx, bool& exists, __u64& size, eversion_t trim_to)
{
bufferlist log_bl;
eversion_t log_version = ctx->at_version;
eversion_t old_version = ctx->poi->version;
- sobject_t& soid = ctx->soid;
+ const sobject_t& soid = ctx->poi->soid;
vector<ceph_osd_op>& ops = ctx->ops;
object_info_t *poi = ctx->poi;
update_stats();
// any completion stuff to do here?
- sobject_t& soid = repop->ctx->soid;
+ const sobject_t& soid = repop->ctx->poi->soid;
ceph_osd_op& first = repop->ctx->ops[0];
switch (first.op) {
void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)
{
- sobject_t& soid = repop->ctx->soid;
+ const sobject_t& soid = repop->ctx->poi->soid;
dout(7) << " issue_repop rep_tid " << repop->rep_tid
<< " o " << soid
<< " to osd" << dest
osd->messenger->send_message(wr, osd->osdmap->get_inst(dest));
}
-ReplicatedPG::RepGather *ReplicatedPG::new_repop(WriteOpContext *ctx, ObjectContext *obc,
+ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContext *obc,
bool noop, tid_t rep_tid)
{
dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op << dendl;
RepGather *repop = new RepGather(ctx, obc, noop, rep_tid, info.last_complete);
+ obc->start_write();
+ obc->get(); // we take a ref
+
// initialize gather sets
for (unsigned i=0; i<acting.size(); i++) {
int osd = acting[i];
{
int whoami = osd->get_nodeid();
- WriteOpContext *ctx = new WriteOpContext(op, op->ops, op->get_data(),
- sobject_t(op->get_oid(), CEPH_NOSNAP),
- op->get_reqid(), op->get_mtime());
- sobject_t& soid = ctx->soid;
+ OpContext *ctx = new OpContext(op, op->get_reqid(), op->ops, op->get_data(), &obc->oi);
+
+ const sobject_t& soid = ctx->poi->soid;
// balance-reads set?
#if 0
}
#endif
- // get existing object info
- obc->get();
- ctx->poi = &obc->oi;
-
// --- locking ---
// wrlock?
opname = ceph_osd_op_name(ctx->ops[0].op);
+ ctx->mtime = op->get_mtime();
+
// version
ctx->at_version = log.top;
if (!noop) {
// note my stats
utime_t now = g_clock.now();
- obc->start_write();
-
// issue replica writes
tid_t rep_tid = osd->get_tid();
RepGather *repop = new_repop(ctx, obc, noop, rep_tid);
oi.version = op->old_version;
oi.snapset = op->snapset;
- WriteOpContext ctx(op, op->ops, op->get_data(), op->poid, op->reqid, op->mtime);
- ctx.poi = &oi;
+ OpContext ctx(op, op->reqid, op->ops, op->get_data(), &oi);
+ ctx.mtime = op->mtime;
ctx.at_version = op->version;
ctx.snapc = op->snapc;
/*
- * Capture all object state associated with an in-progress read.
+ * Capture all object state associated with an in-progress read or write.
*/
- struct ReadOpContext {
- MOSDOp *op;
- vector<ceph_osd_op>& ops;
-
- object_info_t *poi;
- int data_off; // FIXME: we may want to kill this msgr hint off at some point!
-
- ReadOpContext(MOSDOp *_op, vector<ceph_osd_op>& _ops, object_info_t *_poi) :
- op(_op), ops(_ops), poi(_poi), data_off(0) {}
- };
-
- /*
- * Capture all state associated with a write operation being processed
- * on the current OSD.
- */
- struct WriteOpContext {
+ struct OpContext {
Message *op;
+ osd_reqid_t reqid;
vector<ceph_osd_op>& ops;
bufferlist& data;
- sobject_t soid;
- osd_reqid_t reqid;
- utime_t mtime;
-
- SnapContext snapc; // writer snap context
object_info_t *poi;
+ utime_t mtime;
+ SnapContext snapc; // writer snap context
eversion_t at_version; // pg's current version pointer
ObjectStore::Transaction t;
- WriteOpContext(Message *_op, vector<ceph_osd_op>& _ops, bufferlist& _data,
- sobject_t _soid, osd_reqid_t _reqid, utime_t _mtime) :
- op(_op), ops(_ops), data(_data), soid(_soid), reqid(_reqid), mtime(_mtime),
- poi(0) {}
+ int data_off; // FIXME: we may want to kill this msgr hint off at some point!
+
+ OpContext(Message *_op, osd_reqid_t _reqid, vector<ceph_osd_op>& _ops, bufferlist& _data,
+ object_info_t *_poi) :
+ op(_op), reqid(_reqid), ops(_ops), data(_data), poi(_poi),
+ data_off(0) {}
};
/*
xlist<RepGather*>::item queue_item;
int nref;
- WriteOpContext *ctx;
+ OpContext *ctx;
ObjectContext *obc;
tid_t rep_tid;
eversion_t pg_local_last_complete;
map<int,eversion_t> pg_complete_thru;
- RepGather(WriteOpContext *c, ObjectContext *pi, bool noop_, tid_t rt,
+ RepGather(OpContext *c, ObjectContext *pi, bool noop_, tid_t rt,
eversion_t lc) :
queue_item(this),
nref(1),
void apply_repop(RepGather *repop);
void eval_repop(RepGather*);
void issue_repop(RepGather *repop, int dest, utime_t now);
- RepGather *new_repop(WriteOpContext *ctx, ObjectContext *obc, bool noop, tid_t rep_tid);
+ RepGather *new_repop(OpContext *ctx, ObjectContext *obc, bool noop, tid_t rep_tid);
void repop_ack(RepGather *repop,
int result, int ack_type,
int fromosd, eversion_t pg_complete_thru=eversion_t(0,0));
int prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
sobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
vector<ceph_osd_op>& ops, int opn, bufferlist::iterator& bp, SnapContext& snapc);
- void prepare_transaction(WriteOpContext *ctx, bool& exists, __u64& size, eversion_t trim_to);
+ void prepare_transaction(OpContext *ctx, bool& exists, __u64& size, eversion_t trim_to);
friend class C_OSD_ModifyCommit;
friend class C_OSD_RepModifyCommit;
void op_read(MOSDOp *op, ObjectContext *obc);
void op_modify(MOSDOp *op, ObjectContext *obc);
- int do_read_ops(ReadOpContext *ctx, bufferlist::iterator& bp, bufferlist& data);
+ int do_read_ops(OpContext *ctx, bufferlist::iterator& bp, bufferlist& data);
void sub_op_modify(MOSDSubOp *op);
void sub_op_modify_reply(MOSDSubOpReply *reply);