const sobject_t& soid = obc->obs.oi.soid;
OpContext *ctx = new OpContext(op, op->get_reqid(), op->ops,
&obc->obs, this);
- bool noop = false;
ctx->obc = obc;
if (op->may_write()) {
// version
ctx->at_version = log.head;
- if (!noop) {
- ctx->at_version.epoch = osd->osdmap->get_epoch();
- ctx->at_version.version++;
- assert(ctx->at_version > info.last_update);
- assert(ctx->at_version > log.head);
- }
+
+ ctx->at_version.epoch = osd->osdmap->get_epoch();
+ ctx->at_version.version++;
+ assert(ctx->at_version > info.last_update);
+ assert(ctx->at_version > log.head);
ctx->mtime = op->get_mtime();
eversion_t old_version = obc->obs.oi.version;
// we are acker.
- if (!noop) {
-
- if (op->may_read()) {
- dout(10) << " taking ondisk_read_lock" << dendl;
- obc->ondisk_read_lock();
- }
- int result = prepare_transaction(ctx);
- if (op->may_read()) {
- dout(10) << " dropping ondisk_read_lock" << dendl;
- obc->ondisk_read_unlock();
- }
-
- if (result == -EAGAIN) // must have referenced non-existent class
- return;
+ if (op->may_read()) {
+ dout(10) << " taking ondisk_read_lock" << dendl;
+ obc->ondisk_read_lock();
+ }
+ int result = prepare_transaction(ctx);
+ if (op->may_read()) {
+ dout(10) << " dropping ondisk_read_lock" << dendl;
+ obc->ondisk_read_unlock();
+ }
- // prepare the reply
- ctx->reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), 0);
- ctx->reply->set_data(ctx->outdata);
- ctx->reply->get_header().data_off = ctx->data_off;
- ctx->reply->set_result(result);
+ if (result == -EAGAIN) // must have referenced non-existent class
+ return;
- if (result >= 0)
- ctx->reply->set_version(ctx->reply_version);
+ // prepare the reply
+ ctx->reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), 0);
+ ctx->reply->set_data(ctx->outdata);
+ ctx->reply->get_header().data_off = ctx->data_off;
+ ctx->reply->set_result(result);
- // read or error?
- if (ctx->op_t.empty() || result < 0) {
- log_op_stats(ctx);
+ if (result >= 0)
+ ctx->reply->set_version(ctx->reply_version);
- MOSDOpReply *reply = ctx->reply;
- ctx->reply = NULL;
- reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- osd->client_messenger->send_message(reply, op->get_connection());
- op->put();
- delete ctx;
- put_object_context(obc);
- return;
- }
+ // read or error?
+ if (ctx->op_t.empty() || result < 0) {
+ log_op_stats(ctx);
+
+ MOSDOpReply *reply = ctx->reply;
+ ctx->reply = NULL;
+ reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ osd->client_messenger->send_message(reply, op->get_connection());
+ op->put();
+ delete ctx;
+ put_object_context(obc);
+ return;
+ }
- assert(op->may_write());
+ assert(op->may_write());
- // trim log?
- calc_trim_to();
+ // trim log?
+ calc_trim_to();
- log_op(ctx->log, pg_trim_to, ctx->local_t);
- }
+ log_op(ctx->log, pg_trim_to, ctx->local_t);
// continuing on to write path, make sure object context is registered
assert(obc->registered);
// issue replica writes
tid_t rep_tid = osd->get_tid();
- RepGather *repop = new_repop(ctx, obc, noop, rep_tid);
+ RepGather *repop = new_repop(ctx, obc, rep_tid);
// note: repop now owns ctx AND ctx->op
issue_repop(repop, now, old_last_update, old_exists, old_size, old_version);
uint64_t old_size = obc->obs.oi.size;
eversion_t old_version = obc->obs.oi.version;
- RepGather *repop = new_repop(ctx, obc, false, rep_tid);
+ RepGather *repop = new_repop(ctx, obc, rep_tid);
ObjectStore::Transaction *t = &ctx->op_t;
}
if (ssc->snapset.head_exists && // head exists
- snapc.snaps.size() && // there are snaps
+ snapc.snaps.size() && // there are snaps
snapc.snaps[0] > ssc->snapset.seq) { // existing object is old
// clone
sobject_t coid = soid;
// forward the write/update/whatever
MOSDSubOp *wr = new MOSDSubOp(repop->ctx->reqid, info.pgid, soid,
- repop->noop, acks_wanted,
+ false, acks_wanted,
osd->osdmap->get_epoch(),
repop->rep_tid, repop->ctx->at_version);
send_message(wr, osd->osdmap->get_cluster_inst(peer));
// keep peer_info up to date
- if (!repop->noop) {
- Info &in = peer_info[peer];
+ Info &in = peer_info[peer];
+ in.last_update = ctx->at_version;
+ if (in.last_complete == old_last_update)
in.last_update = ctx->at_version;
- if (in.last_complete == old_last_update)
- in.last_update = ctx->at_version;
- }
}
}
ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContext *obc,
- bool noop, tid_t rep_tid)
+ tid_t rep_tid)
{
if (ctx->op)
dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op << dendl;
else
dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl;
- RepGather *repop = new RepGather(ctx, obc, noop, rep_tid, info.last_complete);
+ RepGather *repop = new RepGather(ctx, obc, rep_tid, info.last_complete);
dout(10) << "new_repop mode was " << mode << dendl;
mode.write_start();
ObjectContext *obc;
tid_t rep_tid;
- bool noop;
bool applying, applied, aborted;
list<ObjectStore::Transaction*> tls;
- RepGather(OpContext *c, ObjectContext *pi, bool noop_, tid_t rt,
+ RepGather(OpContext *c, ObjectContext *pi, tid_t rt,
eversion_t lc) :
queue_item(this),
nref(1),
ctx(c), obc(pi),
rep_tid(rt),
- noop(noop_),
applying(false), applied(false), aborted(false),
sent_ack(false),
//sent_nvram(false),
void eval_repop(RepGather*);
void issue_repop(RepGather *repop, utime_t now,
eversion_t old_last_update, bool old_exists, uint64_t old_size, eversion_t old_version);
- RepGather *new_repop(OpContext *ctx, ObjectContext *obc, bool noop, tid_t rep_tid);
+ RepGather *new_repop(OpContext *ctx, ObjectContext *obc, tid_t rep_tid);
void remove_repop(RepGather *repop);
void repop_ack(RepGather *repop,
int result, int ack_type,