From b60a9abf561c14d2f72ea034fe7a6b5526637399 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 14 Dec 2010 10:47:19 -0800 Subject: [PATCH] objecter: fix up linger ack/commit to trigger first time only We only want the user-provided ack/commit callbacks to trigger the first time we register the lingering op. Same goes for the eversion_t *objver. Signed-off-by: Sage Weil --- src/osdc/Objecter.cc | 46 ++++++++++++++++++++++++++++++++++++++++---- src/osdc/Objecter.h | 36 ++++++++++++++++++++++++++++++---- 2 files changed, 74 insertions(+), 8 deletions(-) diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 238f61a1b2e60..eafd6e878c4d4 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -58,14 +58,49 @@ void Objecter::shutdown() { } -tid_t Objecter::resend_linger(LingerOp *info, Context *onack, Context *onfinish, eversion_t *objver) +tid_t Objecter::resend_linger(LingerOp *info) { vector ops = info->ops; // need to pass a copy to ops - Op *o = new Op(info->oid, info->oloc, ops, info->flags | CEPH_OSD_FLAG_READ, onack, onfinish, objver, true); + Context *onack = NULL; + Context *oncommit = NULL; + if (!info->registered) { + if (info->on_reg_ack) + onack = new C_Linger_Ack(this, info); + if (info->on_reg_commit) + oncommit = new C_Linger_Commit(this, info); + } + Op *o = new Op(info->oid, info->oloc, ops, info->flags | CEPH_OSD_FLAG_READ, + onack, oncommit, + info->pobjver, true); o->snapid = info->snap; return op_submit(o, info); } +void Objecter::_linger_ack(LingerOp *info, int r) +{ + dout(10) << "_linger_ack " << info->linger_id << dendl; + if (info->on_reg_ack) { + info->on_reg_ack->finish(r); + delete info->on_reg_ack; + info->on_reg_ack = NULL; + } +} + +void Objecter::_linger_commit(LingerOp *info, int r) +{ + dout(10) << "_linger_commit " << info->linger_id << dendl; + if (info->on_reg_commit) { + info->on_reg_commit->finish(r); + delete info->on_reg_commit; + info->on_reg_commit = NULL; + } + + // only tell the user the first time we do this + info->registered = true; + info->pobjver = NULL; +} + + uint64_t Objecter::register_linger(LingerOp *info) { Mutex::Locker locker(linger_info_mutex); @@ -101,8 +136,11 @@ tid_t Objecter::linger(const object_t& oid, const object_locator_t& oloc, info->ops = op.ops; info->inbl = inbl; info->poutbl = poutbl; + info->pobjver = objver; + info->on_reg_ack = onack; + info->on_reg_commit = onfinish; uint64_t lid = register_linger(info); - resend_linger(info, onack, onfinish, objver); + resend_linger(info); return lid; } @@ -361,7 +399,7 @@ void Objecter::kick_requests(set& changed_pgs) // resend lingers for (xlist::iterator j = pg.linger_ops.begin(); !j.end(); ++j) - resend_linger(*j, NULL, NULL, NULL); + resend_linger(*j); if (pg.linger_ops.empty()) close_pg( pgid ); // will pbly reopen, unless it's just commits we're missing diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 02caa109d1aa6..9cfdda35a7a04 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -441,21 +441,29 @@ public: hash_map pg_map; + // --- + // lingering ops + struct LingerOp { uint64_t linger_id; object_t oid; object_locator_t oloc; - uint64_t off; - uint64_t len; snapid_t snap; int flags; vector ops; bufferlist inbl; bufferlist *poutbl; + eversion_t *pobjver; + + bool registered; + Context *on_reg_ack, *on_reg_commit; + PG *pg; xlist::item pg_item; - LingerOp() : linger_id(0), off(0), len(0), flags(0), poutbl(NULL), pg(NULL), pg_item(this) {} + LingerOp() : linger_id(0), flags(0), poutbl(NULL), pobjver(NULL), + registered(false), on_reg_ack(NULL), on_reg_commit(NULL), + pg(NULL), pg_item(this) {} // no copy! const LingerOp &operator=(const LingerOp& r); @@ -465,6 +473,24 @@ public: map op_linger_info; Mutex linger_info_mutex; + struct C_Linger_Ack : public Context { + Objecter *objecter; + LingerOp *info; + C_Linger_Ack(Objecter *o, LingerOp *l) : objecter(o), info(l) {} + void finish(int r) { + objecter->_linger_ack(info, r); + } + }; + + struct C_Linger_Commit : public Context { + Objecter *objecter; + LingerOp *info; + C_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {} + void finish(int r) { + objecter->_linger_commit(info, r); + } + }; + PG &get_pg(pg_t pgid); void close_pg(pg_t pgid) { @@ -542,7 +568,9 @@ private: // low-level tid_t op_submit(Op *op, LingerOp *linger_op = NULL); - tid_t resend_linger(LingerOp *info, Context *onack, Context *onfinish, eversion_t *objver); + tid_t resend_linger(LingerOp *info); + void _linger_ack(LingerOp *info, int r); + void _linger_commit(LingerOp *info, int r); uint64_t register_linger(LingerOp *info); public: // FIXME void unregister_linger(uint64_t linger_id); -- 2.39.5