WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
wc->watch_ctx = ctx;
wc->watch_ctx2 = ctx2;
+ wc->linger_op = objecter->linger_register(oid, oloc, 0);
client->register_watch_notify_callback(wc, cookie);
prepare_assert_ops(&wr);
wr.watch(*cookie, CEPH_OSD_WATCH_OP_WATCH);
bufferlist bl;
- wc->linger_op = objecter->linger_watch(oid, oloc, wr,
- snapc, ceph_clock_now(NULL), bl,
- *cookie, 0,
- NULL, onfinish, &wc->on_error,
- &objver);
+ objecter->linger_watch(wc->linger_op, wr,
+ snapc, ceph_clock_now(NULL), bl,
+ *cookie,
+ NULL, onfinish, &wc->on_error,
+ &objver);
lock->Unlock();
mylock.Lock();
// Acquire cookie
uint64_t cookie;
+ wc->linger_op = objecter->linger_register(oid, oloc, 0);
client->register_watch_notify_callback(wc, &cookie);
uint32_t prot_ver = 1;
uint32_t timeout = notify_timeout;
// Issue RADOS op
C_SaferCond onack;
version_t objver;
- wc->linger_op = objecter->linger_notify(oid, oloc, rd, snap_seq, inbl, NULL, 0,
- &onack, &objver);
+ objecter->linger_notify(wc->linger_op,
+ rd, snap_seq, inbl, NULL,
+ &onack, &objver);
lock->Unlock();
ldout(client->cct, 10) << __func__ << " issued linger op " << wc->linger_op << dendl;
}
}
-Objecter::LingerOp *Objecter::linger_watch(const object_t& oid,
- const object_locator_t& oloc,
- ObjectOperation& op,
- const SnapContext& snapc, utime_t mtime,
- bufferlist& inbl, uint64_t cookie, int flags,
- Context *onack, Context *oncommit,
- Context *onerror,
- version_t *objver)
+
+
+Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
+ const object_locator_t& oloc,
+ int flags)
{
LingerOp *info = new LingerOp;
info->target.base_oid = oid;
info->target.base_oloc = oloc;
if (info->target.base_oloc.key == oid)
info->target.base_oloc.key.clear();
+ info->target.flags = flags;
+ info->watch_valid_thru = ceph_clock_now(NULL);
+
+ RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+
+ // Acquire linger ID
+ info->linger_id = ++max_linger_id;
+ ldout(cct, 10) << __func__ << " info " << info
+ << " linger_id " << info->linger_id << dendl;
+ linger_ops[info->linger_id] = info;
+
+ info->get(); // for the caller
+ return info;
+}
+
+ceph_tid_t Objecter::linger_watch(LingerOp *info,
+ ObjectOperation& op,
+ const SnapContext& snapc, utime_t mtime,
+ bufferlist& inbl, uint64_t cookie,
+ Context *onack, Context *oncommit,
+ Context *onerror,
+ version_t *objver)
+{
info->snapc = snapc;
info->mtime = mtime;
- info->target.flags = flags | CEPH_OSD_FLAG_WRITE;
+ info->target.flags |= CEPH_OSD_FLAG_WRITE;
info->ops = op.ops;
info->cookie = cookie;
info->inbl = inbl;
info->on_reg_ack = onack;
info->on_reg_commit = oncommit;
info->on_error = onerror;
- info->watch_valid_thru = ceph_clock_now(NULL);
RWLock::WLocker wl(rwlock);
_linger_submit(info);
logger->inc(l_osdc_linger_active);
- info->get(); // for the caller
- return info;
+ return info->linger_id;
}
-Objecter::LingerOp *Objecter::linger_notify(const object_t& oid,
- const object_locator_t& oloc,
- ObjectOperation& op,
- snapid_t snap, bufferlist& inbl,
- bufferlist *poutbl, int flags,
- Context *onfinish,
- version_t *objver)
+ceph_tid_t Objecter::linger_notify(LingerOp *info,
+ ObjectOperation& op,
+ snapid_t snap, bufferlist& inbl,
+ bufferlist *poutbl,
+ Context *onfinish,
+ version_t *objver)
{
- LingerOp *info = new LingerOp;
- info->target.base_oid = oid;
- info->target.base_oloc = oloc;
- if (info->target.base_oloc.key == oid)
- info->target.base_oloc.key.clear();
info->snap = snap;
- info->target.flags = flags | CEPH_OSD_FLAG_READ;
+ info->target.flags |= CEPH_OSD_FLAG_READ;
info->ops = op.ops;
info->inbl = inbl;
info->poutbl = poutbl;
RWLock::WLocker wl(rwlock);
_linger_submit(info);
logger->inc(l_osdc_linger_active);
- info->get(); // for the caller
- return info;
+
+ return info->linger_id;
}
void Objecter::_linger_submit(LingerOp *info)
assert(rwlock.is_wlocked());
RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
- // Acquire linger ID
- info->linger_id = ++max_linger_id;
- ldout(cct, 10) << __func__ << " info " << info
- << " linger_id " << info->linger_id << dendl;
- linger_ops[info->linger_id] = info;
+ assert(info->linger_id);
// Populate Op::target
OSDSession *s = NULL;
}
// caller owns a ref
- LingerOp *linger_watch(const object_t& oid, const object_locator_t& oloc,
- ObjectOperation& op,
- const SnapContext& snapc, utime_t mtime,
- bufferlist& inbl, uint64_t cookie, int flags,
- Context *onack, Context *onfinish, Context *onerror,
- version_t *objver);
- LingerOp *linger_notify(const object_t& oid, const object_locator_t& oloc,
+ LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
+ int flags);
+ ceph_tid_t linger_watch(LingerOp *info,
ObjectOperation& op,
- snapid_t snap, bufferlist& inbl,
- bufferlist *poutbl, int flags,
- Context *onack,
+ const SnapContext& snapc, utime_t mtime,
+ bufferlist& inbl, uint64_t cookie,
+ Context *onack, Context *onfinish, Context *onerror,
version_t *objver);
+ ceph_tid_t linger_notify(LingerOp *info,
+ ObjectOperation& op,
+ snapid_t snap, bufferlist& inbl,
+ bufferlist *poutbl,
+ Context *onack,
+ version_t *objver);
int linger_check(LingerOp *info);
void linger_cancel(LingerOp *info); // releases a reference
void _linger_cancel(LingerOp *info);