prepare_assert_ops(&wr);
wr.watch(*cookie, CEPH_OSD_WATCH_OP_WATCH);
bufferlist bl;
- wc->linger_id = objecter->linger_mutate(oid, oloc, wr,
- snapc, ceph_clock_now(NULL), bl,
- *cookie, 0,
- NULL, onfinish, &wc->on_error,
- &objver);
+ wc->linger_op = objecter->linger_watch(oid, oloc, wr,
+ snapc, ceph_clock_now(NULL), bl,
+ *cookie, 0,
+ NULL, onfinish, &wc->on_error,
+ &objver);
lock->Unlock();
mylock.Lock();
// Issue RADOS op
C_SaferCond onack;
version_t objver;
- wc->linger_id = objecter->linger_read(oid, oloc, rd, snap_seq, inbl, NULL, 0,
- &onack, &objver);
+ wc->linger_op = objecter->linger_notify(oid, oloc, rd, snap_seq, inbl, NULL, 0,
+ &onack, &objver);
lock->Unlock();
- ldout(client->cct, 10) << __func__ << " issued linger op " << wc->linger_id << dendl;
+ ldout(client->cct, 10) << __func__ << " issued linger op " << wc->linger_op << dendl;
int r_issue = onack.wait();
- ldout(client->cct, 10) << __func__ << " linger op " << wc->linger_id << " acked (" << r_issue << ")" << dendl;
+ ldout(client->cct, 10) << __func__ << " linger op " << wc->linger_op << " acked (" << r_issue << ")" << dendl;
if (r_issue == 0) {
- ldout(client->cct, 10) << __func__ << "waiting for watch_notify message for linger op " << wc->linger_id << dendl;
+ ldout(client->cct, 10) << __func__ << "waiting for watch_notify message for linger op " << wc->linger_op << dendl;
mylock_all.Lock();
while (!done_all)
cond_all.Wait(mylock_all);
mylock_all.Unlock();
}
- ldout(client->cct, 10) << __func__ << " completed notify (linger op " << wc->linger_id << "), unregistering" << dendl;
+ ldout(client->cct, 10) << __func__ << " completed notify (linger op " << wc->linger_op << "), unregistering" << dendl;
lock->Lock();
client->unregister_watch_notify_callback(cookie, NULL); // destroys wc
info->watch_lock.Unlock();
}
-int Objecter::linger_check(uint64_t linger_id)
+int Objecter::linger_check(LingerOp *info)
{
RWLock::WLocker wl(rwlock);
- map<uint64_t, LingerOp*>::iterator iter = linger_ops.find(linger_id);
- if (iter == linger_ops.end()) {
- ldout(cct, 10) << __func__ << " " << linger_id << " dne" << dendl;
- return -EBADF;
- }
-
- LingerOp *info = iter->second;
utime_t age = ceph_clock_now(NULL) - info->watch_valid_thru;
- ldout(cct, 10) << __func__ << " " << linger_id
+ ldout(cct, 10) << __func__ << " " << info->linger_id
<< " err " << info->last_error
<< " age " << age << dendl;
if (info->last_error)
return age.to_msec();
}
-void Objecter::unregister_linger(uint64_t linger_id)
+void Objecter::linger_cancel(LingerOp *info)
{
RWLock::WLocker wl(rwlock);
- _unregister_linger(linger_id);
+ _linger_cancel(info);
+ info->put();
}
-void Objecter::_unregister_linger(uint64_t linger_id)
+void Objecter::_linger_cancel(LingerOp *info)
{
assert(rwlock.is_wlocked());
- ldout(cct, 20) << __func__ << " linger_id=" << linger_id << dendl;
-
- map<uint64_t, LingerOp*>::iterator iter = linger_ops.find(linger_id);
- if (iter != linger_ops.end()) {
- LingerOp *info = iter->second;
+ ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
+ if (!info->canceled) {
OSDSession *s = info->session;
s->lock.get_write();
_session_linger_op_remove(s, info);
s->lock.unlock();
- linger_ops.erase(iter);
+ linger_ops.erase(info->linger_id);
info->canceled = true;
info->put();
}
}
-ceph_tid_t Objecter::linger_mutate(const object_t& oid, const object_locator_t& oloc,
- ObjectOperation& op,
- const SnapContext& snapc, utime_t mtime,
- bufferlist& inbl, uint64_t cookie, int flags,
- Context *onack, Context *oncommit,
- Context *onerror,
- version_t *objver)
+Objecter::LingerOp *Objecter::linger_watch(const object_t& oid,
+ const object_locator_t& oloc,
+ ObjectOperation& op,
+ const SnapContext& snapc, utime_t mtime,
+ bufferlist& inbl, uint64_t cookie, int flags,
+ Context *onack, Context *oncommit,
+ Context *onerror,
+ version_t *objver)
{
LingerOp *info = new LingerOp;
info->target.base_oid = oid;
RWLock::WLocker wl(rwlock);
_linger_submit(info);
logger->inc(l_osdc_linger_active);
- return info->linger_id;
+
+ info->get(); // for the caller
+ return info;
}
-ceph_tid_t Objecter::linger_read(const object_t& oid, const object_locator_t& oloc,
- ObjectOperation& op,
- snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags,
- Context *onfinish,
- version_t *objver)
+Objecter::LingerOp *Objecter::linger_notify(const object_t& oid,
+ const object_locator_t& oloc,
+ ObjectOperation& op,
+ snapid_t snap, bufferlist& inbl,
+ bufferlist *poutbl, int flags,
+ Context *onfinish,
+ version_t *objver)
{
LingerOp *info = new LingerOp;
info->target.base_oid = oid;
RWLock::WLocker wl(rwlock);
_linger_submit(info);
logger->inc(l_osdc_linger_active);
- return info->linger_id;
+ info->get(); // for the caller
+ return info;
}
void Objecter::_linger_submit(LingerOp *info)
{
assert(rwlock.is_wlocked());
- list<uint64_t> unregister_lingers;
+ list<LingerOp*> unregister_lingers;
RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
case RECALC_OP_TARGET_POOL_DNE:
_check_linger_pool_dne(op, &unregister);
if (unregister) {
- ldout(cct, 10) << " need to unregister linger op " << op->linger_id << dendl;
- unregister_lingers.push_back(op->linger_id);
+ ldout(cct, 10) << " need to unregister linger op "
+ << op->linger_id << dendl;
+ op->get();
+ unregister_lingers.push_back(op);
}
break;
}
s->lock.unlock();
- for (list<uint64_t>::iterator iter = unregister_lingers.begin(); iter != unregister_lingers.end(); ++iter) {
- _unregister_linger(*iter);
+ for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
+ iter != unregister_lingers.end();
+ ++iter) {
+ _linger_cancel(*iter);
+ (*iter)->put();
}
}
objecter->_check_linger_pool_dne(op, &unregister);
if (unregister) {
- objecter->_unregister_linger(op->linger_id);
+ objecter->_linger_cancel(op);
}
op->put();
}
return op_submit(o, ctx_budget);
}
- ceph_tid_t linger_mutate(const object_t& oid, const object_locator_t& oloc,
- ObjectOperation& op,
- const SnapContext& snapc, utime_t mtime,
- bufferlist& inbl, uint64_t cookie, int flags,
- Context *onack, Context *onfinish, Context *onerror,
- version_t *objver);
- ceph_tid_t linger_read(const object_t& oid, const object_locator_t& oloc,
- ObjectOperation& op,
- snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags,
- Context *onack,
- version_t *objver);
- int linger_check(uint64_t linger_id);
- void unregister_linger(uint64_t linger_id);
- void _unregister_linger(uint64_t linger_id);
+
+ // caller owns a ref
+ LingerOp *linger_watch(const object_t& oid, const object_locator_t& oloc,
+ ObjectOperation& op,
+ const SnapContext& snapc, utime_t mtime,
+ bufferlist& inbl, uint64_t cookie, int flags,
+ Context *onack, Context *onfinish, Context *onerror,
+ version_t *objver);
+ LingerOp *linger_notify(const object_t& oid, const object_locator_t& oloc,
+ ObjectOperation& op,
+ snapid_t snap, bufferlist& inbl,
+ bufferlist *poutbl, int flags,
+ Context *onack,
+ version_t *objver);
+ int linger_check(LingerOp *info);
+ void linger_cancel(LingerOp *info); // releases a reference
+ void _linger_cancel(LingerOp *info);
/**
* set up initial ops in the op vector, and allocate a final op slot.