o->snapid = info->snap;
// do not resend this; we will send a new op to reregister
- o->should_resend = false;
+ o->should_resend = !info->is_watch;
if (info->session) {
int r = recalc_op_target(o);
if (info->register_tid) {
// repeat send. cancel old registeration op, if any.
- if (ops.count(info->register_tid)) {
+ if (info->is_watch && ops.count(info->register_tid)) {
Op *o = ops[info->register_tid];
cancel_op(o);
}
}
}
+/**
+ * Note that this is meant to handle a watch OR a notify, but not both in the same ObjectOperation.
+ * This is because watches need to be resent with a new tid on map changes, while notifies
+ * need to resend using the old tid.
+ */
tid_t Objecter::linger(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags,
info->snap = snap;
info->flags = flags;
info->ops = op.ops;
+ bool saw_notify = false;
+ for (vector<OSDOp>::const_iterator it = info->ops.begin();
+ it != info->ops.end(); ++it) {
+ if (it->op.op == CEPH_OSD_OP_WATCH)
+ info->is_watch = true;
+ if (it->op.op == CEPH_OSD_OP_NOTIFY)
+ saw_notify = true;
+ if (info->is_watch)
+ assert(it->op.op != CEPH_OSD_OP_NOTIFY);
+ if (saw_notify)
+ assert(it->op.op != CEPH_OSD_OP_WATCH);
+ }
info->inbl = inbl;
info->poutbl = poutbl;
info->pobjver = objver;
tid_t register_tid;
epoch_t map_dne_bound;
+ bool is_watch;
LingerOp() : linger_id(0), flags(0), poutbl(NULL), pobjver(NULL),
registered(false),
on_reg_ack(NULL), on_reg_commit(NULL),
session(NULL), session_item(this),
register_tid(0),
- map_dne_bound(0) {}
+ map_dne_bound(0), is_watch(false) {}
// no copy!
const LingerOp &operator=(const LingerOp& r);