if (mode.is_rmw_mode())
apply_repop(repop);
- // (logical) local ack.
- // (if alone and delayed, this will apply the update.)
- int whoami = osd->get_nodeid();
- assert(repop->waitfor_ack.count(whoami));
- repop->waitfor_ack.erase(whoami);
eval_repop(repop);
repop->put();
// ========================================================================
// rep op gather
+class C_OSD_OpApplied : public Context {
+public:
+ ReplicatedPG *pg;
+ ReplicatedPG::RepGather *repop;
+
+ C_OSD_OpApplied(ReplicatedPG *p, ReplicatedPG::RepGather *rg) :
+ pg(p), repop(rg) {
+ repop->get();
+ pg->get(); // we're copying the pointer
+ }
+ void finish(int r) {
+ pg->lock();
+ pg->op_applied(repop);
+ repop->put();
+ pg->unlock();
+ pg->put();
+ }
+};
+
class C_OSD_OpCommit : public Context {
public:
ReplicatedPG *pg;
}
};
-/** op_commit
- * transaction commit on the acker.
- */
-void ReplicatedPG::op_ondisk(RepGather *repop)
-{
- if (repop->aborted) {
- dout(10) << "op_ondisk " << *repop << " -- aborted" << dendl;
- } else if (repop->waitfor_disk.count(osd->get_nodeid()) == 0) {
- dout(10) << "op_ondisk " << *repop << " -- already marked ondisk" << dendl;
- } else {
- dout(10) << "op_ondisk " << *repop << dendl;
- repop->waitfor_disk.erase(osd->get_nodeid());
- repop->waitfor_nvram.erase(osd->get_nodeid());
- last_complete_ondisk = repop->pg_local_last_complete;
- eval_repop(repop);
- }
-}
-
-
void ReplicatedPG::apply_repop(RepGather *repop)
{
dout(10) << "apply_repop applying update on " << *repop << dendl;
dout(-10) << "apply_repop apply transaction return " << r << " on " << *repop << dendl;
assert(0);
}
-
+
+ op_applied(repop);
+}
+
+void ReplicatedPG::op_applied(RepGather *repop)
+{
+ dout(10) << "op_applied " << *repop << dendl;
+
// discard my reference to the buffer
repop->ctx->op->get_data().clear();
- tls.clear();
repop->ctx->op_t.clear_data();
repop->applied = true;
+
+ // (logical) local ack.
+ int whoami = osd->get_nodeid();
+ assert(repop->waitfor_ack.count(whoami));
+ repop->waitfor_ack.erase(whoami);
if (repop->ctx->clone_obc) {
put_object_context(repop->ctx->clone_obc);
}
dout(10) << "apply_repop mode was " << mode << dendl;
- mode.finish_write();
+ mode.write_applied();
dout(10) << "apply_repop mode now " << mode << " (finish_write)" << dendl;
put_object_context(repop->obc);
}
break;
}
-
}
+void ReplicatedPG::op_ondisk(RepGather *repop)
+{
+ if (repop->aborted) {
+ dout(10) << "op_ondisk " << *repop << " -- aborted" << dendl;
+ } else if (repop->waitfor_disk.count(osd->get_nodeid()) == 0) {
+ dout(10) << "op_ondisk " << *repop << " -- already marked ondisk" << dendl;
+ } else {
+ dout(10) << "op_ondisk " << *repop << dendl;
+ repop->waitfor_disk.erase(osd->get_nodeid());
+ repop->waitfor_nvram.erase(osd->get_nodeid());
+ last_complete_ondisk = repop->pg_local_last_complete;
+ eval_repop(repop);
+ }
+}
+
+
+
void ReplicatedPG::eval_repop(RepGather *repop)
{
dout(10) << "eval_repop " << *repop << dendl;
// apply?
if (!repop->applied &&
mode.is_delayed_mode() &&
- repop->waitfor_ack.empty()) // all replicas have acked
+ repop->waitfor_ack.size() == 1) // all other replicas have acked
apply_repop(repop);
// disk?
RepGather *repop = new RepGather(ctx, obc, noop, rep_tid, info.last_complete);
dout(10) << "new_repop mode was " << mode << dendl;
- mode.start_write();
+ mode.write_start();
obc->get(); // we take a ref
dout(10) << "new_repop mode now " << mode << " (start_write)" << dendl;
return state == RMW || state == RMW_FLUSHING;
}
- void start_write() {
+ void write_start() {
num_wr++;
assert(state == DELAYED || state == RMW);
}
- void finish_write() {
+ void write_applied() {
assert(num_wr > 0);
--num_wr;
if (num_wr == 0) {
wake = true;
}
}
+ void write_commit() {
+ }
};
map<tid_t, RepGather*> repop_map;
void apply_repop(RepGather *repop);
+ void op_applied(RepGather *repop);
+ void op_ondisk(RepGather *repop);
void eval_repop(RepGather*);
void issue_repop(RepGather *repop, int dest, utime_t now,
bool old_exists, __u64 old_size, eversion_t old_version);
int result, int ack_type,
int fromosd, eversion_t pg_complete_thru=eversion_t(0,0));
+ friend class C_OSD_OpCommit;
+ friend class C_OSD_OpApplied;
// projected object info
map<sobject_t, ObjectContext*> object_contexts;
// low level ops
- void op_ondisk(RepGather *repop);
void _make_clone(ObjectStore::Transaction& t,
const sobject_t& head, const sobject_t& coid,
int prepare_transaction(OpContext *ctx);
void log_op(vector<Log::Entry>& log, eversion_t trim_to, ObjectStore::Transaction& t);
- friend class C_OSD_OpCommit;
friend class C_OSD_RepModifyCommit;
// pg on-disk content