struct WatchInfo : public Objecter::WatchContext {
librados::IoCtxImpl *ioctx;
- uint64_t user_cookie;
object_t oid;
librados::WatchCtx *ctx;
librados::WatchCtx2 *ctx2;
- WatchInfo(librados::IoCtxImpl *io, uint64_t uc, object_t o,
+ WatchInfo(librados::IoCtxImpl *io, object_t o,
librados::WatchCtx *c, librados::WatchCtx2 *c2)
- : ioctx(io), user_cookie(uc), oid(o), ctx(c), ctx2(c2) {}
+ : ioctx(io), oid(o), ctx(c), ctx2(c2) {}
void handle_notify(uint64_t notify_id,
uint64_t cookie,
uint64_t notifier_id,
bufferlist& bl) {
ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
- << " cookie " << user_cookie
+ << " cookie " << cookie
<< " notifier_id " << notifier_id
<< " len " << bl.length()
<< dendl;
if (ctx2)
- ctx2->handle_notify(notify_id, user_cookie, notifier_id, bl);
+ ctx2->handle_notify(notify_id, cookie, notifier_id, bl);
if (ctx) {
ctx->notify(0, 0, bl);
// send ACK back to OSD if using legacy protocol
bufferlist empty;
- ioctx->notify_ack(oid, notify_id, user_cookie, empty);
+ ioctx->notify_ack(oid, notify_id, cookie, empty);
}
}
void handle_failed_notify(uint64_t notify_id,
uint64_t cookie,
uint64_t notifier_id) {
ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
- << " cookie " << user_cookie
+ << " cookie " << cookie
<< " notifier_id " << notifier_id
<< dendl;
if (ctx2)
- ctx2->handle_failed_notify(notify_id, user_cookie, notifier_id);
+ ctx2->handle_failed_notify(notify_id, cookie, notifier_id);
}
void handle_error(uint64_t cookie, int err) {
- ldout(ioctx->client->cct, 10) << __func__ << " cookie " << user_cookie
+ ldout(ioctx->client->cct, 10) << __func__ << " cookie " << cookie
<< " err " << err
<< dendl;
if (ctx2)
- ctx2->handle_error(user_cookie, err);
+ ctx2->handle_error(cookie, err);
}
};
lock->Lock();
Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
- *handle = reinterpret_cast<uint64_t>(linger_op);
+ *handle = linger_op->get_cookie();
linger_op->watch_context = new WatchInfo(this,
- reinterpret_cast<uint64_t>(linger_op),
oid, ctx, ctx2);
prepare_assert_ops(&wr);
- wr.watch(linger_op->linger_id, CEPH_OSD_WATCH_OP_WATCH);
+ wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH);
bufferlist bl;
objecter->linger_watch(linger_op, wr,
snapc, ceph_clock_now(NULL), bl,
{
Mutex::Locker l(*lock);
::ObjectOperation rd;
- Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
prepare_assert_ops(&rd);
- rd.notify_ack(notify_id, linger_op->linger_id, bl);
+ rd.notify_ack(notify_id, cookie, bl);
objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
return 0;
}
lock->Lock();
::ObjectOperation wr;
prepare_assert_ops(&wr);
- wr.watch(linger_op->linger_id, CEPH_OSD_WATCH_OP_UNWATCH);
+ wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
objecter->mutate(linger_op->target.base_oid, oloc, wr,
snapc, ceph_clock_now(client->cct), 0, NULL, &onfinish, &ver);
objecter->linger_cancel(linger_op);
// Construct RADOS op
::ObjectOperation rd;
prepare_assert_ops(&rd);
- rd.notify(linger_op->linger_id, inbl);
+ rd.notify(linger_op->get_cookie(), inbl);
// Issue RADOS op
C_SaferCond onack;
_session_linger_op_remove(homeless_session, lop);
}
linger_ops.erase(lop->linger_id);
+ linger_ops_set.erase(lop);
lop->put();
}
ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect" << dendl;
opv.push_back(OSDOp());
opv.back().op.op = CEPH_OSD_OP_WATCH;
- opv.back().op.watch.cookie = info->linger_id;
+ opv.back().op.watch.cookie = info->get_cookie();
opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
opv.back().op.watch.gen = ++info->register_gen;
oncommit = new C_Linger_Reconnect(this, info);
vector<OSDOp> opv(1);
opv[0].op.op = CEPH_OSD_OP_WATCH;
- opv[0].op.watch.cookie = info->linger_id;
+ opv[0].op.watch.cookie = info->get_cookie();
opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
opv[0].op.watch.gen = info->register_gen;
C_Linger_Ping *onack = new C_Linger_Ping(this, info);
s->lock.unlock();
linger_ops.erase(info->linger_id);
+ linger_ops_set.erase(info);
+ assert(linger_ops.size() == linger_ops_set.size());
+
info->canceled = true;
info->put();
// Acquire linger ID
info->linger_id = ++max_linger_id;
ldout(cct, 10) << __func__ << " info " << info
- << " linger_id " << info->linger_id << dendl;
+ << " linger_id " << info->linger_id
+ << " cookie " << info->get_cookie()
+ << dendl;
linger_ops[info->linger_id] = info;
+ linger_ops_set.insert(info);
+ assert(linger_ops.size() == linger_ops_set.size());
info->get(); // for the caller
return info;
return;
}
- map<uint64_t,LingerOp*>::iterator p = linger_ops.find(m->cookie);
- if (p == linger_ops.end()) {
+ LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
+ if (linger_ops_set.count(info) == 0) {
ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
return;
}
-
- LingerOp *info = p->second;
finisher->queue(new C_DoWatchNotify(this, info, m));
}