]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc: Add pool awareness to the ObjectCacher, to prevent unfortunate collisions.
authorGreg Farnum <gregf@hq.newdream.net>
Mon, 25 Oct 2010 21:18:02 +0000 (14:18 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Mon, 25 Oct 2010 23:50:07 +0000 (16:50 -0700)
src/osdc/ObjectCacher.cc
src/osdc/ObjectCacher.h

index 3e8145b95da3bf3e39a3a7428c8785d264980f32..6654774ae7c41d206595e4179c0389968ec9aa6d 100644 (file)
@@ -427,7 +427,7 @@ void ObjectCacher::close_object(Object *ob)
   assert(ob->can_close());
   
   // ok!
-  objects.erase(ob->get_soid());
+  objects[ob->layout.ol_pgid.pool].erase(ob->get_soid());
   delete ob;
 }
 
@@ -441,7 +441,8 @@ void ObjectCacher::bh_read(BufferHead *bh)
   mark_rx(bh);
 
   // finisher
-  C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_soid(), bh->start(), bh->length());
+  C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->layout.ol_pgid.pool,
+                                            bh->ob->get_soid(), bh->start(), bh->length());
 
   ObjectSet *oset = bh->ob->oset;
 
@@ -453,7 +454,7 @@ void ObjectCacher::bh_read(BufferHead *bh)
                 onfinish);
 }
 
-void ObjectCacher::bh_read_finish(sobject_t oid, loff_t start, uint64_t length, bufferlist &bl)
+void ObjectCacher::bh_read_finish(int poolid, sobject_t oid, loff_t start, uint64_t length, bufferlist &bl)
 {
   //lock.Lock();
   dout(7) << "bh_read_finish " 
@@ -470,10 +471,10 @@ void ObjectCacher::bh_read_finish(sobject_t oid, loff_t start, uint64_t length,
     bl.push_back(bp);
   }
   
-  if (objects.count(oid) == 0) {
+  if (objects[poolid].count(oid) == 0) {
     dout(7) << "bh_read_finish no object cache" << dendl;
   } else {
-    Object *ob = objects[oid];
+    Object *ob = objects[poolid][oid];
     
     // apply to bh's!
     loff_t opos = start;
@@ -534,8 +535,10 @@ void ObjectCacher::bh_write(BufferHead *bh)
   dout(7) << "bh_write " << *bh << dendl;
   
   // finishers
-  C_WriteAck *onack = new C_WriteAck(this, bh->ob->get_soid(), bh->start(), bh->length());
-  C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->get_soid(), bh->start(), bh->length());
+  C_WriteAck *onack = new C_WriteAck(this, bh->ob->layout.ol_pgid.pool,
+                                     bh->ob->get_soid(), bh->start(), bh->length());
+  C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->layout.ol_pgid.pool,
+                                              bh->ob->get_soid(), bh->start(), bh->length());
 
   ObjectSet *oset = bh->ob->oset;
 
@@ -557,19 +560,19 @@ void ObjectCacher::bh_write(BufferHead *bh)
   mark_tx(bh);
 }
 
-void ObjectCacher::lock_ack(list<sobject_t>& oids, tid_t tid)
+void ObjectCacher::lock_ack(int poolid, list<sobject_t>& oids, tid_t tid)
 {
   for (list<sobject_t>::iterator i = oids.begin();
        i != oids.end();
        i++) {
     sobject_t oid = *i;
 
-    if (objects.count(oid) == 0) {
+    if (objects[poolid].count(oid) == 0) {
       dout(7) << "lock_ack no object cache" << dendl;
       assert(0);
     } 
     
-    Object *ob = objects[oid];
+    Object *ob = objects[poolid][oid];
 
     list<Context*> ls;
 
@@ -619,7 +622,7 @@ void ObjectCacher::lock_ack(list<sobject_t>& oids, tid_t tid)
   }
 }
 
-void ObjectCacher::bh_write_ack(sobject_t oid, loff_t start, uint64_t length, tid_t tid)
+void ObjectCacher::bh_write_ack(int poolid, sobject_t oid, loff_t start, uint64_t length, tid_t tid)
 {
   //lock.Lock();
   
@@ -628,11 +631,11 @@ void ObjectCacher::bh_write_ack(sobject_t oid, loff_t start, uint64_t length, ti
           << " tid " << tid
           << " " << start << "~" << length
           << dendl;
-  if (objects.count(oid) == 0) {
+  if (objects[poolid].count(oid) == 0) {
     dout(7) << "bh_write_ack no object cache" << dendl;
     assert(0);
   } else {
-    Object *ob = objects[oid];
+    Object *ob = objects[poolid][oid];
     
     // apply to bh's!
     for (map<loff_t, BufferHead*>::iterator p = ob->data.lower_bound(start);
@@ -686,7 +689,7 @@ void ObjectCacher::bh_write_ack(sobject_t oid, loff_t start, uint64_t length, ti
   //lock.Unlock();
 }
 
-void ObjectCacher::bh_write_commit(sobject_t oid, loff_t start, uint64_t length, tid_t tid)
+void ObjectCacher::bh_write_commit(int poolid, sobject_t oid, loff_t start, uint64_t length, tid_t tid)
 {
   //lock.Lock();
   
@@ -696,11 +699,11 @@ void ObjectCacher::bh_write_commit(sobject_t oid, loff_t start, uint64_t length,
           << " tid " << tid
           << " " << start << "~" << length
           << dendl;
-  if (objects.count(oid) == 0) {
+  if (objects[poolid].count(oid) == 0) {
     dout(7) << "bh_write_commit no object cache" << dendl;
     //assert(0);
   } else {
-    Object *ob = objects[oid];
+    Object *ob = objects[poolid][oid];
     
     // update last_commit.
     ob->last_commit_tid = tid;
@@ -1146,8 +1149,8 @@ int ObjectCacher::atomic_sync_readx(OSDRead *rd, ObjectSet *oset, Mutex& lock)
          ex_it != extents.end();
          ex_it++) {
       sobject_t soid(ex_it->oid, snapid);
-      assert(objects.count(soid));
-      Object *o = objects[soid];
+      assert(objects[oset->poolid].count(soid));
+      Object *o = objects[oset->poolid][soid];
       rdunlock(o);
     }
   }
@@ -1168,7 +1171,7 @@ int ObjectCacher::atomic_sync_writex(OSDWrite *wr, ObjectSet *oset, Mutex& lock)
     // make sure we aren't already locking/locked...
     sobject_t oid(wr->extents.front().oid, CEPH_NOSNAP);
     Object *o = 0;
-    if (objects.count(oid))
+    if (objects[oset->poolid].count(oid))
       o = get_object(oid, oset, wr->extents.front().layout);
     if (!o || 
         (o->lock_state != Object::LOCK_WRLOCK &&
@@ -1222,8 +1225,8 @@ int ObjectCacher::atomic_sync_writex(OSDWrite *wr, ObjectSet *oset, Mutex& lock)
        ex_it != extents.end();
        ex_it++) {
     sobject_t soid(ex_it->oid, CEPH_NOSNAP);
-    assert(objects.count(soid));
-    Object *o = objects[soid];
+    assert(objects[oset->poolid].count(soid));
+    Object *o = objects[oset->poolid][soid];
     
     wrunlock(o);
   }
@@ -1245,8 +1248,9 @@ void ObjectCacher::rdlock(Object *o)
     
     o->lock_state = Object::LOCK_RDLOCKING;
     
-    C_LockAck *ack = new C_LockAck(this, o->get_soid());
-    C_WriteCommit *commit = new C_WriteCommit(this, o->get_soid(), 0, 0);
+    C_LockAck *ack = new C_LockAck(this, o->layout.ol_pgid.pool, o->get_soid());
+    C_WriteCommit *commit = new C_WriteCommit(this, o->layout.ol_pgid.pool,
+                                              o->get_soid(), 0, 0);
     
     commit->tid = 
       ack->tid = 
@@ -1289,8 +1293,9 @@ void ObjectCacher::wrlock(Object *o)
       op = CEPH_OSD_OP_WRLOCK;
     }
     
-    C_LockAck *ack = new C_LockAck(this, o->get_soid());
-    C_WriteCommit *commit = new C_WriteCommit(this, o->get_soid(), 0, 0);
+    C_LockAck *ack = new C_LockAck(this, o->layout.ol_pgid.pool, o->get_soid());
+    C_WriteCommit *commit = new C_WriteCommit(this, o->layout.ol_pgid.pool,
+                                              o->get_soid(), 0, 0);
     
     commit->tid = 
       ack->tid = 
@@ -1334,8 +1339,9 @@ void ObjectCacher::rdunlock(Object *o)
 
   o->lock_state = Object::LOCK_RDUNLOCKING;
 
-  C_LockAck *lockack = new C_LockAck(this, o->get_soid());
-  C_WriteCommit *commit = new C_WriteCommit(this, o->get_soid(), 0, 0);
+  C_LockAck *lockack = new C_LockAck(this, o->layout.ol_pgid.pool, o->get_soid());
+  C_WriteCommit *commit = new C_WriteCommit(this, o->layout.ol_pgid.pool,
+                                            o->get_soid(), 0, 0);
   commit->tid = 
     lockack->tid = 
     o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), CEPH_OSD_OP_RDUNLOCK, 0, lockack, commit);
@@ -1366,8 +1372,9 @@ void ObjectCacher::wrunlock(Object *o)
     o->lock_state = Object::LOCK_WRUNLOCKING;
   }
 
-  C_LockAck *lockack = new C_LockAck(this, o->get_soid());
-  C_WriteCommit *commit = new C_WriteCommit(this, o->get_soid(), 0, 0);
+  C_LockAck *lockack = new C_LockAck(this, o->layout.ol_pgid.pool, o->get_soid());
+  C_WriteCommit *commit = new C_WriteCommit(this, o->layout.ol_pgid.pool,
+                                            o->get_soid(), 0, 0);
   commit->tid = 
     lockack->tid = 
     o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), op, 0, lockack, commit);
@@ -1634,21 +1641,25 @@ uint64_t ObjectCacher::release_all()
   dout(10) << "release_all" << dendl;
   uint64_t unclean = 0;
   
-  hash_map<sobject_t, Object*>::iterator p = objects.begin();
-  while (p != objects.end()) {
-    hash_map<sobject_t, Object*>::iterator n = p;
-    n++;
-
-    Object *ob = p->second;
-
-    loff_t o_unclean = release(ob);
-    unclean += o_unclean;
-
-    if (o_unclean) 
-      dout(10) << "release_all " << *ob 
-               << " has " << o_unclean << " bytes left"
-               << dendl;
+  vector<hash_map<sobject_t, Object*> >::iterator i = objects.begin();
+  while (i != objects.end()) {
+    hash_map<sobject_t, Object*>::iterator p = i->begin();
+    while (p != i->end()) {
+      hash_map<sobject_t, Object*>::iterator n = p;
+      n++;
+
+      Object *ob = p->second;
+
+      loff_t o_unclean = release(ob);
+      unclean += o_unclean;
+
+      if (o_unclean)
+        dout(10) << "release_all " << *ob
+        << " has " << o_unclean << " bytes left"
+        << dendl;
     p = n;
+    }
+    ++i;
   }
 
   if (unclean) {
@@ -1675,9 +1686,9 @@ void ObjectCacher::truncate_set(ObjectSet *oset, vector<ObjectExtent>& exls)
        ++p) {
     ObjectExtent &ex = *p;
     sobject_t soid(ex.oid, CEPH_NOSNAP);
-    if (objects.count(soid) == 0)
+    if (objects[oset->poolid].count(soid) == 0)
       continue;
-    Object *ob = objects[soid];
+    Object *ob = objects[oset->poolid][soid];
     
     // purge or truncate?
     if (ex.offset == 0) {
@@ -1746,33 +1757,36 @@ void ObjectCacher::verify_stats() const
   dout(10) << "verify_stats" << dendl;
 
   loff_t clean = 0, dirty = 0, rx = 0, tx = 0, missing = 0;
-
-  for (hash_map<sobject_t, Object*>::const_iterator p = objects.begin();
-       p != objects.end();
-       p++) {
-    Object *ob = p->second;
-    for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
-        q != ob->data.end();
-        q++) {
-      BufferHead *bh = q->second;
-      switch (bh->get_state()) {
-      case BufferHead::STATE_MISSING:
-       missing += bh->length();
-       break;
-      case BufferHead::STATE_CLEAN:
-       clean += bh->length();
-       break;
-      case BufferHead::STATE_DIRTY: 
-       dirty += bh->length(); 
-       break;
-      case BufferHead::STATE_TX: 
-       tx += bh->length(); 
-       break;
-      case BufferHead::STATE_RX:
-       rx += bh->length();
-       break;
-      default:
-       assert(0);
+  for (vector<hash_map<sobject_t, Object*> >::const_iterator i = objects.begin();
+      i != objects.end();
+      ++i) {
+    for (hash_map<sobject_t, Object*>::const_iterator p = i->begin();
+        p != i->end();
+        ++p) {
+      Object *ob = p->second;
+      for (map<loff_t, BufferHead*>::const_iterator q = ob->data.begin();
+          q != ob->data.end();
+          q++) {
+        BufferHead *bh = q->second;
+        switch (bh->get_state()) {
+        case BufferHead::STATE_MISSING:
+          missing += bh->length();
+          break;
+        case BufferHead::STATE_CLEAN:
+          clean += bh->length();
+          break;
+        case BufferHead::STATE_DIRTY:
+          dirty += bh->length();
+          break;
+        case BufferHead::STATE_TX:
+          tx += bh->length();
+          break;
+        case BufferHead::STATE_RX:
+          rx += bh->length();
+          break;
+        default:
+          assert(0);
+        }
       }
     }
   }
index 5178eab3ef1d992efb79bbf6f027563a8d42f85d..563bf07908064e08d57e72adaa5a078514ab0d6b 100644 (file)
@@ -132,13 +132,12 @@ class ObjectCacher {
     // ObjectCacher::Object fields
     ObjectCacher *oc;
     sobject_t oid;
+    friend class ObjectSet;
+
   public:
     ObjectSet *oset;
     xlist<Object*>::item set_item;
-  private:
     ceph_object_layout layout;
-    
-    friend class ObjectSet;
 
   public:
     map<loff_t, BufferHead*>     data;
@@ -251,12 +250,14 @@ class ObjectCacher {
     inodeno_t ino;
     uint64_t truncate_seq, truncate_size;
 
+    int poolid;
     xlist<Object*> objects;
     xlist<Object*> uncommitted;
 
     int dirty_tx;
 
-    ObjectSet(void *p, inodeno_t i) : parent(p), ino(i), truncate_seq(0), truncate_size(0), dirty_tx(0) {}
+    ObjectSet(void *p, int _poolid, inodeno_t i) : parent(p), ino(i), truncate_seq(0),
+                                      truncate_size(0), poolid(_poolid), dirty_tx(0) {}
   };
 
 
@@ -272,7 +273,7 @@ class ObjectCacher {
   flush_set_callback_t flush_set_callback, commit_set_callback;
   void *flush_set_callback_arg;
 
-  hash_map<sobject_t, Object*> objects;
+  vector<hash_map<sobject_t, Object*> > objects; // indexed by pool_id
 
   set<BufferHead*>    dirty_bh;
   LRU   lru_dirty, lru_rest;
@@ -294,19 +295,24 @@ class ObjectCacher {
   // objects
   Object *get_object_maybe(sobject_t oid, ceph_object_layout &l) {
     // have it?
-    if (objects.count(oid))
-      return objects[oid];
+    if ((l.ol_pgid.pool < objects.size()) &&
+        (objects[l.ol_pgid.pool].count(oid)))
+      return objects[l.ol_pgid.pool][oid];
     return NULL;
   }
 
   Object *get_object(sobject_t oid, ObjectSet *oset, ceph_object_layout &l) {
     // have it?
-    if (objects.count(oid))
-      return objects[oid];
+    if (l.ol_pgid.pool < objects.size()) {
+      if (objects[l.ol_pgid.pool].count(oid))
+        return objects[l.ol_pgid.pool][oid];
+    } else {
+      objects.resize(l.ol_pgid.pool+1);
+    }
 
     // create it.
     Object *o = new Object(this, oid, oset, l);
-    objects[oid] = o;
+    objects[l.ol_pgid.pool][oid] = o;
     return o;
   }
   void close_object(Object *ob);
@@ -450,59 +456,66 @@ class ObjectCacher {
   void wrunlock(Object *o);
 
  public:
-  void bh_read_finish(sobject_t oid, loff_t offset, uint64_t length, bufferlist &bl);
-  void bh_write_ack(sobject_t oid, loff_t offset, uint64_t length, tid_t t);
-  void bh_write_commit(sobject_t oid, loff_t offset, uint64_t length, tid_t t);
-  void lock_ack(list<sobject_t>& oids, tid_t tid);
+  void bh_read_finish(int poolid, sobject_t oid, loff_t offset, uint64_t length, bufferlist &bl);
+  void bh_write_ack(int poolid, sobject_t oid, loff_t offset, uint64_t length, tid_t t);
+  void bh_write_commit(int poolid, sobject_t oid, loff_t offset, uint64_t length, tid_t t);
+  void lock_ack(int poolid, list<sobject_t>& oids, tid_t tid);
 
   class C_ReadFinish : public Context {
     ObjectCacher *oc;
+    int poolid;
     sobject_t oid;
     loff_t start;
     uint64_t length;
   public:
     bufferlist bl;
-    C_ReadFinish(ObjectCacher *c, sobject_t o, loff_t s, uint64_t l) : oc(c), oid(o), start(s), length(l) {}
+    C_ReadFinish(ObjectCacher *c, int _poolid, sobject_t o, loff_t s, uint64_t l) :
+      oc(c), poolid(_poolid), oid(o), start(s), length(l) {}
     void finish(int r) {
-      oc->bh_read_finish(oid, start, length, bl);
+      oc->bh_read_finish(poolid, oid, start, length, bl);
     }
   };
 
   class C_WriteAck : public Context {
     ObjectCacher *oc;
+    int poolid;
     sobject_t oid;
     loff_t start;
     uint64_t length;
   public:
     tid_t tid;
-    C_WriteAck(ObjectCacher *c, sobject_t o, loff_t s, uint64_t l) : oc(c), oid(o), start(s), length(l) {}
+    C_WriteAck(ObjectCacher *c, int _poolid, sobject_t o, loff_t s, uint64_t l) :
+      oc(c), poolid(_poolid), oid(o), start(s), length(l) {}
     void finish(int r) {
-      oc->bh_write_ack(oid, start, length, tid);
+      oc->bh_write_ack(poolid, oid, start, length, tid);
     }
   };
   class C_WriteCommit : public Context {
     ObjectCacher *oc;
+    int poolid;
     sobject_t oid;
     loff_t start;
     uint64_t length;
   public:
     tid_t tid;
-    C_WriteCommit(ObjectCacher *c, sobject_t o, loff_t s, uint64_t l) : oc(c), oid(o), start(s), length(l) {}
+    C_WriteCommit(ObjectCacher *c, int _poolid, sobject_t o, loff_t s, uint64_t l) :
+      oc(c), poolid(_poolid), oid(o), start(s), length(l) {}
     void finish(int r) {
-      oc->bh_write_commit(oid, start, length, tid);
+      oc->bh_write_commit(poolid, oid, start, length, tid);
     }
   };
 
   class C_LockAck : public Context {
     ObjectCacher *oc;
   public:
+    int poolid;
     list<sobject_t> oids;
     tid_t tid;
-    C_LockAck(ObjectCacher *c, sobject_t o) : oc(c) {
+    C_LockAck(ObjectCacher *c, int _poolid, sobject_t o) : oc(c), poolid(_poolid) {
       oids.push_back(o);
     }
     void finish(int r) {
-      oc->lock_ack(oids, tid);
+      oc->lock_ack(poolid, oids, tid);
     }
   };