OPTION(osd_debug_drop_pg_create_probability, OPT_DOUBLE, 0)
OPTION(osd_debug_drop_pg_create_duration, OPT_INT, 1)
OPTION(osd_debug_drop_op_probability, OPT_DOUBLE, 0) // probability of stalling/dropping a client op
+OPTION(osd_debug_op_order, OPT_BOOL, false)
OPTION(osd_op_history_size, OPT_U32, 20) // Max number of completed ops to track
OPTION(osd_op_history_duration, OPT_U32, 600) // Oldest completed op to track
OPTION(osd_target_transaction_size, OPT_INT, 30) // to adjust various transactions that batch smaller items
// continuing on to write path, make sure object context is registered
assert(obc->registered);
+ // verify that we are doing this in order?
+ if (g_conf->osd_debug_op_order && m->get_source().is_client()) {
+ map<client_t,tid_t>& cm = debug_op_order[obc->obs.oi.soid];
+ tid_t t = m->get_tid();
+ client_t n = m->get_source().num();
+ map<client_t,tid_t>::iterator p = cm.find(n);
+ if (p == cm.end()) {
+ dout(20) << " op order client." << n << " tid " << t << " (first)" << dendl;
+ cm[n] = t;
+ } else {
+ dout(20) << " op order client." << n << " tid " << t << " last was " << p->second << dendl;
+ if (p->second > t) {
+ derr << "bad op order, already applied " << p->second << " > this " << t << dendl;;
+ assert(0 == "out of order op");
+ }
+ p->second = t;
+ }
+ }
+
// issue replica writes
tid_t rep_tid = osd->get_tid();
RepGather *repop = new_repop(ctx, obc, rep_tid); // new repop claims our obc, src_obc refs
// clear snap_trimmer state
snap_trimmer_machine.process_event(Reset());
+
+ debug_op_order.clear();
}
void ReplicatedPG::on_role_change()
map<hobject_t, ObjectContext*> object_contexts;
map<object_t, SnapSetContext*> snapset_contexts;
+ // debug order that client ops are applied
+ map<hobject_t, map<client_t, tid_t> > debug_op_order;
+
void populate_obc_watchers(ObjectContext *obc);
public:
void handle_watch_timeout(WatchRef watch);