RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
- ldout(cct, 15) << "send_linger " << info->linger_id << dendl;
- vector<OSDOp> opv = info->ops; // need to pass a copy to ops
- Context *onack = (!info->registered && info->on_reg_ack) ?
- new C_Linger_Ack(this, info) : NULL;
+ vector<OSDOp> opv;
+ Context *onack = NULL;
+ if (info->registered) {
+ ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect" << dendl;
+ onack = new C_Linger_Reconnect(this, info);
+ opv.push_back(OSDOp());
+ opv.back().op.op = CEPH_OSD_OP_WATCH;
+ opv.back().op.watch.cookie = info->cookie;
+ opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
+ } else {
+ ldout(cct, 15) << "send_linger " << info->linger_id << " register" << dendl;
+ onack = new C_Linger_Register(this, info);
+ opv = info->ops;
+ }
Context *oncommit = new C_Linger_Commit(this, info);
Op *o = new Op(info->target.base_oid, info->target.base_oloc,
opv, info->target.flags | CEPH_OSD_FLAG_READ,
logger->inc(l_osdc_linger_send);
}
-void Objecter::_linger_ack(LingerOp *info, int r)
+void Objecter::_linger_register(LingerOp *info, int r)
{
- ldout(cct, 10) << "_linger_ack " << info->linger_id << dendl;
+ ldout(cct, 10) << "_linger_register " << info->linger_id << dendl;
if (info->on_reg_ack) {
info->on_reg_ack->complete(r);
info->on_reg_ack = NULL;
info->pobjver = NULL;
}
+void Objecter::_linger_reconnect(LingerOp *info, int r)
+{
+ ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
+ << " (last_error " << info->last_error << ")" << dendl;
+}
+
void Objecter::unregister_linger(uint64_t linger_id)
{
RWLock::WLocker wl(rwlock);
info->mtime = mtime;
info->target.flags = flags | CEPH_OSD_FLAG_WRITE;
info->ops = op.ops;
+ info->cookie = cookie;
info->inbl = inbl;
info->poutbl = NULL;
info->pobjver = objver;
bufferlist *poutbl;
version_t *pobjver;
+ uint64_t cookie; ///< non-zero if this is a watch
+ int last_error; ///< error from last failed ping|reconnect, if any
bool registered;
bool canceled;
Context *on_reg_ack, *on_reg_commit;
target(object_t(), object_locator_t(), 0),
snap(CEPH_NOSNAP),
poutbl(NULL), pobjver(NULL),
+ cookie(0),
+ last_error(0),
registered(false),
canceled(false),
on_reg_ack(NULL), on_reg_commit(NULL),
~LingerOp() {}
};
- struct C_Linger_Ack : public Context {
+ struct C_Linger_Register : public Context {
Objecter *objecter;
LingerOp *info;
- C_Linger_Ack(Objecter *o, LingerOp *l) : objecter(o), info(l) {
+ C_Linger_Register(Objecter *o, LingerOp *l) : objecter(o), info(l) {
info->get();
}
- ~C_Linger_Ack() {
+ ~C_Linger_Register() {
info->put();
}
void finish(int r) {
- objecter->_linger_ack(info, r);
+ objecter->_linger_register(info, r);
}
};
}
};
+ struct C_Linger_Reconnect : public Context {
+ Objecter *objecter;
+ LingerOp *info;
+ C_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {
+ info->get();
+ }
+ ~C_Linger_Reconnect() {
+ info->put();
+ }
+ void finish(int r) {
+ objecter->_linger_reconnect(info, r);
+ }
+ };
+
struct C_Linger_Map_Latest : public Context {
Objecter *objecter;
uint64_t linger_id;
void _linger_submit(LingerOp *info);
void _send_linger(LingerOp *info);
- void _linger_ack(LingerOp *info, int r);
+ void _linger_register(LingerOp *info, int r);
void _linger_commit(LingerOp *info, int r);
+ void _linger_reconnect(LingerOp *info, int r);
void _check_op_pool_dne(Op *op, bool session_locked);
void _send_op_map_check(Op *op);