l_osdc_linger_active,
l_osdc_linger_send,
l_osdc_linger_resend,
+ l_osdc_linger_ping,
l_osdc_poolop_active,
l_osdc_poolop_send,
pcb.add_u64(l_osdc_linger_active, "linger_active");
pcb.add_u64_counter(l_osdc_linger_send, "linger_send");
pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend");
+ pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping");
pcb.add_u64(l_osdc_poolop_active, "poolop_active");
pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send");
}
}
+void Objecter::_send_linger_ping(LingerOp *info)
+{
+ assert(rwlock.is_locked());
+ assert(info->session->lock.is_locked());
+
+ utime_t now = ceph_clock_now(NULL);
+ ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now << dendl;
+
+ RWLock::Context lc(rwlock, RWLock::Context::TakenForRead);
+
+ vector<OSDOp> opv(1);
+ opv[0].op.op = CEPH_OSD_OP_WATCH;
+ opv[0].op.watch.cookie = info->cookie;
+ opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
+ C_Linger_Ping *onack = new C_Linger_Ping(this, info);
+ Op *o = new Op(info->target.base_oid, info->target.base_oloc,
+ opv, info->target.flags | CEPH_OSD_FLAG_READ,
+ onack, NULL, NULL);
+ o->target = info->target;
+ o->should_resend = false;
+ _send_op_account(o);
+ MOSDOp *m = _prepare_osd_op(o);
+ o->tid = last_tid.inc();
+ _session_op_assign(info->session, o);
+ _send_op(o, m);
+ info->ping_tid = o->tid;
+
+ onack->sent = now;
+ logger->inc(l_osdc_linger_ping);
+}
+
+void Objecter::_linger_ping(LingerOp *info, int r, utime_t sent)
+{
+ ldout(cct, 10) << __func__ << " " << info->linger_id
+ << " sent " << sent << " = " << r
+ << " (last_error " << info->last_error << ")" << dendl;
+ info->watch_lock.Lock();
+ if (r == 0) {
+ info->watch_valid_thru = sent;
+ } else if (r < 0) {
+ info->last_error = r;
+ if (info->on_error)
+ info->on_error->complete(r);
+ }
+ info->watch_cond.SignalAll();
+ info->watch_lock.Unlock();
+}
+
void Objecter::unregister_linger(uint64_t linger_id)
{
RWLock::WLocker wl(rwlock);
assert(op->session);
ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first << " (osd." << op->session->osd << ")" << dendl;
toping.insert(op->session);
+ _send_linger_ping(op);
}
for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
p != s->command_ops.end();
version_t *pobjver;
uint64_t cookie; ///< non-zero if this is a watch
+ utime_t watch_valid_thru; ///< send time for last acked ping
int last_error; ///< error from last failed ping|reconnect, if any
Mutex watch_lock;
Cond watch_cond;
OSDSession *session;
ceph_tid_t register_tid;
+ ceph_tid_t ping_tid;
epoch_t map_dne_bound;
LingerOp() : linger_id(0),
on_error(NULL),
session(NULL),
register_tid(0),
+ ping_tid(0),
map_dne_bound(0) {}
// no copy!
}
};
+ struct C_Linger_Ping : public Context {
+ Objecter *objecter;
+ LingerOp *info;
+ utime_t sent;
+ C_Linger_Ping(Objecter *o, LingerOp *l) : objecter(o), info(l) {
+ info->get();
+ }
+ ~C_Linger_Ping() {
+ info->put();
+ }
+ void finish(int r) {
+ objecter->_linger_ping(info, r, sent);
+ }
+ };
+
struct C_Linger_Map_Latest : public Context {
Objecter *objecter;
uint64_t linger_id;
void _linger_register(LingerOp *info, int r);
void _linger_commit(LingerOp *info, int r);
void _linger_reconnect(LingerOp *info, int r);
+ void _send_linger_ping(LingerOp *info);
+ void _linger_ping(LingerOp *info, int r, utime_t sent);
void _check_op_pool_dne(Op *op, bool session_locked);
void _send_op_map_check(Op *op);