From 8a75086d5ea8a7422c16a719a66f5d23dc91e01c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 14 Dec 2010 10:35:40 -0800 Subject: [PATCH] objecter: clean up linger interface Put LingerOp on heap. Use xlist to attach to PGs. Add in/out bufferlists. Signed-off-by: Sage Weil --- src/librados.cc | 5 +-- src/osdc/Objecter.cc | 91 ++++++++++++++++---------------------------- src/osdc/Objecter.h | 35 ++++++++++------- 3 files changed, 57 insertions(+), 74 deletions(-) diff --git a/src/librados.cc b/src/librados.cc index 3aa3f2a345476..55ad0e5bafc2f 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -1600,9 +1600,8 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_ pool.assert_ver = 0; } rd->watch(*cookie, ver, 1); - uint64_t linger_id; - objecter->linger(oid, oloc, *rd, pool.snap_seq, NULL, 0, onack, NULL, &objver, &linger_id); - wc->linger_id = linger_id; + bufferlist bl; + wc->linger_id = objecter->linger(oid, oloc, *rd, pool.snap_seq, bl, NULL, 0, onack, NULL, &objver); lock.Unlock(); mylock.Lock(); diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index efece005271fb..238f61a1b2e60 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -58,76 +58,52 @@ void Objecter::shutdown() { } -tid_t Objecter::resend_linger(LingerOpInfo& info, Context *onack, Context *onfinish, eversion_t *objver) +tid_t Objecter::resend_linger(LingerOp *info, Context *onack, Context *onfinish, eversion_t *objver) { - vector ops = info.ops; // need to pass a copy to ops - Op *o = new Op(info.oid, info.oloc, ops, info.flags | CEPH_OSD_FLAG_READ, onack, onfinish, objver, true); - o->snapid = info.snap; - return op_submit(o, info.linger_id); + vector ops = info->ops; // need to pass a copy to ops + Op *o = new Op(info->oid, info->oloc, ops, info->flags | CEPH_OSD_FLAG_READ, onack, onfinish, objver, true); + o->snapid = info->snap; + return op_submit(o, info); } -tid_t Objecter::resend_linger(uint64_t linger_id, Context *onack, Context *onfinish, eversion_t *objver) +uint64_t Objecter::register_linger(LingerOp *info) { Mutex::Locker locker(linger_info_mutex); - map::iterator iter = op_linger_info.find(linger_id); - if (iter != op_linger_info.end()) { - return resend_linger(iter->second, onack, onfinish, objver); - } else { - dout(0) << "WARNING: resend_linger(): could not find linger_id" << linger_id << dendl; // should that happen? - } - return -1; -} - -uint64_t Objecter::register_linger(LingerOpInfo& info, uint64_t linger_id) -{ - Mutex::Locker locker(linger_info_mutex); - if (!linger_id) - linger_id = ++max_linger_id; - - info.linger_id = linger_id; - - op_linger_info[linger_id] = info; - - return linger_id; + info->linger_id = ++max_linger_id; + op_linger_info[info->linger_id] = info; + return info->linger_id; } void Objecter::unregister_linger(uint64_t linger_id) { Mutex::Locker locker(linger_info_mutex); - map::iterator iter = op_linger_info.find(linger_id); + map::iterator iter = op_linger_info.find(linger_id); if (iter != op_linger_info.end()) { - LingerOpInfo& info = iter->second; - pg_t pgid = osdmap->object_locator_to_pg(info.oid, info.oloc); - - // find - PG &pg = get_pg(pgid); - map::iterator pg_iter = pg.linger_ops.find(linger_id); - if (pg_iter != pg.linger_ops.end()) - pg.linger_ops.erase(pg_iter); - + LingerOp *info = iter->second; + info->pg_item.remove_myself(); op_linger_info.erase(iter); + delete info; } } tid_t Objecter::linger(const object_t& oid, const object_locator_t& oloc, ObjectOperation& op, - snapid_t snap, bufferlist *pbl, int flags, + snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags, Context *onack, Context *onfinish, - eversion_t *objver, - uint64_t *linger_id) + eversion_t *objver) { - uint64_t lid; - LingerOpInfo info; - info.oid = oid; - info.oloc = oloc; - info.snap = snap; - info.flags = flags; - info.ops = op.ops; - lid = register_linger(info, 0); - if (linger_id) - *linger_id = lid; - return resend_linger(info, onack, onfinish, objver); + LingerOp *info = new LingerOp; + info->oid = oid; + info->oloc = oloc; + info->snap = snap; + info->flags = flags; + info->ops = op.ops; + info->inbl = inbl; + info->poutbl = poutbl; + uint64_t lid = register_linger(info); + resend_linger(info, onack, onfinish, objver); + return lid; } void Objecter::dispatch(Message *m) @@ -382,11 +358,10 @@ void Objecter::kick_requests(set& changed_pgs) // resubmit ops! set tids; tids.swap( pg.active_tids ); - map::iterator liter; - for (liter = pg.linger_ops.begin(); liter != pg.linger_ops.end(); ++liter) { - resend_linger(liter->first, NULL, NULL, NULL); - } - dout(0) << "pg.linger_ops.empty()=" << pg.linger_ops.empty() << dendl; + + // resend lingers + for (xlist::iterator j = pg.linger_ops.begin(); !j.end(); ++j) + resend_linger(*j, NULL, NULL, NULL); if (pg.linger_ops.empty()) close_pg( pgid ); // will pbly reopen, unless it's just commits we're missing @@ -488,7 +463,7 @@ void Objecter::resend_mon_ops() // read | write --------------------------- -tid_t Objecter::op_submit(Op *op, uint64_t linger_id) +tid_t Objecter::op_submit(Op *op, LingerOp *linger_op) { // throttle. before we look at any state, because // take_op_budget() may drop our lock while it blocks. @@ -500,8 +475,8 @@ tid_t Objecter::op_submit(Op *op, uint64_t linger_id) // find PG &pg = get_pg(op->pgid); - if (linger_id) - pg.linger_ops[linger_id] = true; + if (linger_op) + pg.linger_ops.push_back(&linger_op->pg_item); // pick tid op->tid = ++last_tid; diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index a9376e8d1a732..02caa109d1aa6 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -416,12 +416,14 @@ public: * track pending ops by pg * ...so we can cope with failures, map changes */ + class LingerOp; + class PG { public: vector acting; set active_tids; // active ops utime_t last; - map linger_ops; + xlist linger_ops; PG() {} @@ -439,7 +441,7 @@ public: hash_map pg_map; - struct LingerOpInfo { + struct LingerOp { uint64_t linger_id; object_t oid; object_locator_t oloc; @@ -448,11 +450,19 @@ public: snapid_t snap; int flags; vector ops; + bufferlist inbl; + bufferlist *poutbl; PG *pg; - LingerOpInfo() : linger_id(0), off(0), len(0), flags(0), pg(NULL) {} + xlist::item pg_item; + + LingerOp() : linger_id(0), off(0), len(0), flags(0), poutbl(NULL), pg(NULL), pg_item(this) {} + + // no copy! + const LingerOp &operator=(const LingerOp& r); + LingerOp(const LingerOp& o); }; - map op_linger_info; + map op_linger_info; Mutex linger_info_mutex; @@ -530,7 +540,12 @@ public: private: // low-level - tid_t op_submit(Op *op, uint64_t linger_id = 0); + tid_t op_submit(Op *op, LingerOp *linger_op = NULL); + + tid_t resend_linger(LingerOp *info, Context *onack, Context *onfinish, eversion_t *objver); + uint64_t register_linger(LingerOp *info); +public: // FIXME + void unregister_linger(uint64_t linger_id); // public interface public: @@ -568,17 +583,11 @@ private: o->outbl = pbl; return op_submit(o); } - - tid_t resend_linger(LingerOpInfo& info, Context *onack, Context *onfinish, eversion_t *objver); - tid_t resend_linger(uint64_t linger_id, Context *onack, Context *onfinish, eversion_t *objver); - uint64_t register_linger(LingerOpInfo& info, uint64_t linger_id = 0); - void unregister_linger(uint64_t linger_id); - tid_t linger(const object_t& oid, const object_locator_t& oloc, ObjectOperation& op, - snapid_t snap, bufferlist *pbl, int flags, + snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags, Context *onack, Context *onfinish, - eversion_t *objver, uint64_t *linger_id); + eversion_t *objver); int init_ops(vector& ops, int ops_count, ObjectOperation *extra_ops) { -- 2.39.5