]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objecter: clean up linger interface
authorSage Weil <sage@newdream.net>
Tue, 14 Dec 2010 18:35:40 +0000 (10:35 -0800)
committerSage Weil <sage@newdream.net>
Tue, 14 Dec 2010 18:35:40 +0000 (10:35 -0800)
Put LingerOp on heap.  Use xlist to attach to PGs.  Add in/out bufferlists.

Signed-off-by: Sage Weil <sage@newdream.net>
src/librados.cc
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 3aa3f2a34547657bdf396c02711f4d484ee6a3fa..55ad0e5bafc2f4eb77fc495a06f0e4a0fc6a4d40 100644 (file)
@@ -1600,9 +1600,8 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_
     pool.assert_ver = 0;
   }
   rd->watch(*cookie, ver, 1);
-  uint64_t linger_id;
-  objecter->linger(oid, oloc, *rd, pool.snap_seq, NULL, 0, onack, NULL, &objver, &linger_id);
-  wc->linger_id = linger_id;
+  bufferlist bl;
+  wc->linger_id = objecter->linger(oid, oloc, *rd, pool.snap_seq, bl, NULL, 0, onack, NULL, &objver);
   lock.Unlock();
 
   mylock.Lock();
index efece005271fb30fb44dd7bfebd1b780e6833e28..238f61a1b2e60d66e873705fdb940bd0ca5ed7e0 100644 (file)
@@ -58,76 +58,52 @@ void Objecter::shutdown()
 {
 }
 
-tid_t Objecter::resend_linger(LingerOpInfo& info, Context *onack, Context *onfinish, eversion_t *objver)
+tid_t Objecter::resend_linger(LingerOp *info, Context *onack, Context *onfinish, eversion_t *objver)
 {
-  vector<OSDOp> ops = info.ops; // need to pass a copy to ops
-  Op *o = new Op(info.oid, info.oloc, ops, info.flags | CEPH_OSD_FLAG_READ, onack, onfinish, objver, true);
-  o->snapid = info.snap;
-  return op_submit(o, info.linger_id);
+  vector<OSDOp> ops = info->ops; // need to pass a copy to ops
+  Op *o = new Op(info->oid, info->oloc, ops, info->flags | CEPH_OSD_FLAG_READ, onack, onfinish, objver, true);
+  o->snapid = info->snap;
+  return op_submit(o, info);
 }
 
-tid_t Objecter::resend_linger(uint64_t linger_id, Context *onack, Context *onfinish, eversion_t *objver)
+uint64_t Objecter::register_linger(LingerOp *info)
 {
   Mutex::Locker locker(linger_info_mutex);
-  map<uint64_t, LingerOpInfo>::iterator iter = op_linger_info.find(linger_id);
-  if (iter != op_linger_info.end()) {
-    return resend_linger(iter->second, onack, onfinish, objver);
-  } else {
-    dout(0) << "WARNING: resend_linger(): could not find linger_id" << linger_id << dendl; // should that happen?
-  }
-  return -1;
-}
-
-uint64_t Objecter::register_linger(LingerOpInfo& info, uint64_t linger_id)
-{
-  Mutex::Locker locker(linger_info_mutex);
-  if (!linger_id)
-    linger_id = ++max_linger_id;
-
-  info.linger_id = linger_id;
-  op_linger_info[linger_id] = info;
-
-  return linger_id;
+  info->linger_id = ++max_linger_id;
+  op_linger_info[info->linger_id] = info;
+  return info->linger_id;
 }
 
 
 void Objecter::unregister_linger(uint64_t linger_id)
 {
   Mutex::Locker locker(linger_info_mutex);
-  map<uint64_t, LingerOpInfo>::iterator iter = op_linger_info.find(linger_id);
+  map<uint64_t, LingerOp*>::iterator iter = op_linger_info.find(linger_id);
   if (iter != op_linger_info.end()) {
-    LingerOpInfo& info = iter->second;
-    pg_t pgid = osdmap->object_locator_to_pg(info.oid, info.oloc);
-
-    // find
-    PG &pg = get_pg(pgid);
-    map<uint64_t, bool>::iterator pg_iter = pg.linger_ops.find(linger_id);
-    if (pg_iter != pg.linger_ops.end())
-      pg.linger_ops.erase(pg_iter);
-
+    LingerOp *info = iter->second;
+    info->pg_item.remove_myself();
     op_linger_info.erase(iter);
+    delete info;
   }
 }
 
 tid_t Objecter::linger(const object_t& oid, const object_locator_t& oloc, 
                       ObjectOperation& op,
-                      snapid_t snap, bufferlist *pbl, int flags,
+                      snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags,
                       Context *onack, Context *onfinish,
-                      eversion_t *objver,
-                       uint64_t *linger_id)
+                      eversion_t *objver)
 {
-  uint64_t lid;
-  LingerOpInfo info;
-  info.oid = oid;
-  info.oloc = oloc;
-  info.snap = snap;
-  info.flags = flags;
-  info.ops = op.ops;
-  lid = register_linger(info, 0);
-  if (linger_id)
-    *linger_id = lid;
-  return resend_linger(info, onack, onfinish, objver);
+  LingerOp *info = new LingerOp;
+  info->oid = oid;
+  info->oloc = oloc;
+  info->snap = snap;
+  info->flags = flags;
+  info->ops = op.ops;
+  info->inbl = inbl;
+  info->poutbl = poutbl;
+  uint64_t lid = register_linger(info);
+  resend_linger(info, onack, onfinish, objver);
+  return lid;
 }
 
 void Objecter::dispatch(Message *m)
@@ -382,11 +358,10 @@ void Objecter::kick_requests(set<pg_t>& changed_pgs)
     // resubmit ops!
     set<tid_t> tids;
     tids.swap( pg.active_tids );
-    map<tid_t, bool>::iterator liter;
-    for (liter = pg.linger_ops.begin(); liter != pg.linger_ops.end(); ++liter) {
-      resend_linger(liter->first, NULL, NULL, NULL);
-    }
-    dout(0) << "pg.linger_ops.empty()=" << pg.linger_ops.empty() << dendl;
+
+    // resend lingers
+    for (xlist<LingerOp*>::iterator j = pg.linger_ops.begin(); !j.end(); ++j)
+      resend_linger(*j, NULL, NULL, NULL);
 
     if (pg.linger_ops.empty())
       close_pg( pgid );  // will pbly reopen, unless it's just commits we're missing
@@ -488,7 +463,7 @@ void Objecter::resend_mon_ops()
 
 // read | write ---------------------------
 
-tid_t Objecter::op_submit(Op *op, uint64_t linger_id)
+tid_t Objecter::op_submit(Op *op, LingerOp *linger_op)
 {
   // throttle.  before we look at any state, because
   // take_op_budget() may drop our lock while it blocks.
@@ -500,8 +475,8 @@ tid_t Objecter::op_submit(Op *op, uint64_t linger_id)
   // find
   PG &pg = get_pg(op->pgid);
 
-  if (linger_id)
-    pg.linger_ops[linger_id] = true;
+  if (linger_op)
+    pg.linger_ops.push_back(&linger_op->pg_item);
     
   // pick tid
   op->tid = ++last_tid;
index a9376e8d1a7320abe50360f6f466135ac008460e..02caa109d1aa683376e99a7fd8094b4ea19eefd3 100644 (file)
@@ -416,12 +416,14 @@ public:
    * track pending ops by pg
    *  ...so we can cope with failures, map changes
    */
+  class LingerOp;
+
   class PG {
   public:
     vector<int> acting;
     set<tid_t>  active_tids; // active ops
     utime_t last;
-    map<uint64_t, bool> linger_ops;
+    xlist<LingerOp*> linger_ops;
 
     PG() {}
     
@@ -439,7 +441,7 @@ public:
 
   hash_map<pg_t,PG> pg_map;
 
-  struct LingerOpInfo {
+  struct LingerOp {
     uint64_t linger_id;
     object_t oid;
     object_locator_t oloc;
@@ -448,11 +450,19 @@ public:
     snapid_t snap;
     int flags;
     vector<OSDOp> ops;
+    bufferlist inbl;
+    bufferlist *poutbl;
     PG *pg;
-    LingerOpInfo() : linger_id(0), off(0), len(0), flags(0), pg(NULL) {}
+    xlist<LingerOp*>::item pg_item;
+
+    LingerOp() : linger_id(0), off(0), len(0), flags(0), poutbl(NULL), pg(NULL), pg_item(this) {}
+
+    // no copy!
+    const LingerOp &operator=(const LingerOp& r);
+    LingerOp(const LingerOp& o);
   };
 
-  map<uint64_t, LingerOpInfo>  op_linger_info;
+  map<uint64_t, LingerOp*>  op_linger_info;
   Mutex linger_info_mutex;
 
   
@@ -530,7 +540,12 @@ public:
 
 private:
   // low-level
-  tid_t op_submit(Op *op, uint64_t linger_id = 0);
+  tid_t op_submit(Op *op, LingerOp *linger_op = NULL);
+
+  tid_t resend_linger(LingerOp *info, Context *onack, Context *onfinish, eversion_t *objver);
+  uint64_t register_linger(LingerOp *info);
+public: // FIXME
+  void unregister_linger(uint64_t linger_id);
 
   // public interface
  public:
@@ -568,17 +583,11 @@ private:
     o->outbl = pbl;
     return op_submit(o);
   }
-
-  tid_t resend_linger(LingerOpInfo& info, Context *onack, Context *onfinish, eversion_t *objver);
-  tid_t resend_linger(uint64_t linger_id, Context *onack, Context *onfinish, eversion_t *objver);
-  uint64_t register_linger(LingerOpInfo& info, uint64_t linger_id = 0);
-  void unregister_linger(uint64_t linger_id);
-
   tid_t linger(const object_t& oid, const object_locator_t& oloc, 
               ObjectOperation& op,
-              snapid_t snap, bufferlist *pbl, int flags,
+              snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags,
                Context *onack, Context *onfinish,
-               eversion_t *objver, uint64_t *linger_id);
+               eversion_t *objver);
 
 
   int init_ops(vector<OSDOp>& ops, int ops_count, ObjectOperation *extra_ops) {