int err;
C_DoWatchError(Objecter::LingerOp *i, int r) : info(i), err(r) {
info->get();
+ info->queued_async();
}
void finish(int r) {
info->watch_context->handle_error(info->linger_id, err);
+ info->finished_async();
info->put();
}
};
int Objecter::linger_check(LingerOp *info)
{
RWLock::WLocker wl(rwlock);
- utime_t age = ceph_clock_now(NULL) - info->watch_valid_thru;
+ Mutex::Locker l(info->watch_lock);
+
+ utime_t stamp = info->watch_valid_thru;
+ if (!info->watch_pending_async.empty())
+ stamp = MIN(info->watch_valid_thru, info->watch_pending_async.front());
+ utime_t age = ceph_clock_now(NULL) - stamp;
+
ldout(cct, 10) << __func__ << " " << info->linger_id
<< " err " << info->last_error
<< " age " << age << dendl;
C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
: objecter(o), info(i), msg(m) {
info->get();
+ info->queued_async();
msg->get();
}
void finish(int r) {
}
out:
+ info->finished_async();
info->put();
m->put();
}
Mutex watch_lock;
Cond watch_cond;
+ // queue of pending async operations, with the timestamp of
+ // when they were queued.
+ list<utime_t> watch_pending_async;
+
uint32_t register_gen;
bool registered;
bool canceled;
ceph_tid_t ping_tid;
epoch_t map_dne_bound;
+ void queued_async() {
+ Mutex::Locker l(watch_lock);
+ watch_pending_async.push_back(ceph_clock_now(NULL));
+ }
+ void finished_async() {
+ Mutex::Locker l(watch_lock);
+ assert(!watch_pending_async.empty());
+ watch_pending_async.pop_front();
+ }
+
LingerOp() : linger_id(0),
target(object_t(), object_locator_t(), 0),
snap(CEPH_NOSNAP),