]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
objectcacher: use ObjectSet container instead of inodeno_t hash_maps
authorSage Weil <sage@newdream.net>
Thu, 11 Feb 2010 23:11:23 +0000 (15:11 -0800)
committerSage Weil <sage@newdream.net>
Thu, 11 Feb 2010 23:11:23 +0000 (15:11 -0800)
Caller provides an ObjectSet* to group objects into.
Later we can put other info here, like truncate_seq and
truncate_size.

src/client/Client.cc
src/client/Client.h
src/osdc/ObjectCacher.cc
src/osdc/ObjectCacher.h
src/osdc/Objecter.h

index a2367cb0f7aa757a925faee11813d8ca4301c7cf..1153b5f75a96ab2b6a582966836b17d25cf66275 100644 (file)
@@ -95,10 +95,10 @@ ostream& operator<<(ostream &out, Inode &in)
 }
 
 
-void client_flush_set_callback(void *p, inodeno_t ino)
+void client_flush_set_callback(void *p, ObjectCacher::ObjectSet *oset)
 {
   Client *client = (Client*)p;
-  client->flush_set_callback(ino);
+  client->flush_set_callback(oset);
 }
 
 
@@ -1528,7 +1528,7 @@ void Client::put_inode(Inode *in, int n)
     remove_all_caps(in);
 
     //cout << "put_inode deleting " << in << " " << in->ino << std::endl;
-    objectcacher->release_set(in->ino);
+    objectcacher->release_set(&in->oset);
     if (in->snapdir_parent)
       put_inode(in->snapdir_parent);
     inode_map.erase(in->vino());
@@ -1961,7 +1961,7 @@ void Client::wake_inode_waiters(int mds_num)
 void Client::_release(Inode *in, bool checkafter)
 {
   if (in->cap_refs[CEPH_CAP_FILE_CACHE]) {
-    objectcacher->release_set(in->ino);
+    objectcacher->release_set(&in->oset);
     if (checkafter)
       put_cap_ref(in, CEPH_CAP_FILE_CACHE);
     else 
@@ -1975,18 +1975,18 @@ void Client::_flush(Inode *in, Context *onfinish)
   dout(10) << "_flush " << *in << dendl;
   if (!onfinish)
     onfinish = new C_NoopContext;
-  bool safe = objectcacher->commit_set(in->ino, onfinish);
+  bool safe = objectcacher->commit_set(&in->oset, onfinish);
   if (safe && onfinish) {
     onfinish->finish(0);
     delete onfinish;
   }
 }
 
-void Client::flush_set_callback(inodeno_t ino)
+void Client::flush_set_callback(ObjectCacher::ObjectSet *oset)
 {
   //  Mutex::Locker l(client_lock);
   assert(client_lock.is_locked());   // will be called via dispatch() -> objecter -> ...
-  Inode *in = inode_map[vinodeno_t(ino,CEPH_NOSNAP)];
+  Inode *in = (Inode *)oset->parent;
   if (in)
     _flushed(in);
 }
@@ -1998,7 +1998,7 @@ void Client::_flushed(Inode *in)
 
   // release clean pages too, if we dont hold RDCACHE reference
   if (in->cap_refs[CEPH_CAP_FILE_CACHE] == 0)
-    objectcacher->release_set(in->ino);
+    objectcacher->release_set(&in->oset);
 
   put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
 }
@@ -2555,7 +2555,7 @@ void Client::handle_cap_trunc(Inode *in, MClientCaps *m)
     filer->file_to_extents(in->ino, &in->layout, 
                           m->get_size(), in->size - m->get_size(),
                           ls);
-    objectcacher->truncate_set(in->ino, ls);
+    objectcacher->truncate_set(&in->oset, ls);
   }
   
   in->reported_size = in->size = m->get_size(); 
@@ -4361,10 +4361,10 @@ int Client::_read_async(Fh *f, __u64 off, __u64 len, bufferlist *bl)
             << " min " << min
             << " (caller wants " << off << "~" << len << ")" << dendl;
     if (l > (loff_t)len) {
-      if (objectcacher->file_is_cached(in->ino, &in->layout, in->snapid, off, min))
+      if (objectcacher->file_is_cached(&in->oset, &in->layout, in->snapid, off, min))
        dout(20) << "readahead already have min" << dendl;
       else {
-       objectcacher->file_read(in->ino, &in->layout, in->snapid, off, l, NULL, 0, 0);
+       objectcacher->file_read(&in->oset, &in->layout, in->snapid, off, l, NULL, 0, 0);
        dout(20) << "readahead initiated" << dendl;
       }
     }
@@ -4377,10 +4377,10 @@ int Client::_read_async(Fh *f, __u64 off, __u64 len, bufferlist *bl)
   bool done = false;
   Context *onfinish = new C_SafeCond(&flock, &cond, &done, &rvalue);
   if (in->snapid == CEPH_NOSNAP)
-    r = objectcacher->file_read(in->ino, &in->layout, in->snapid,
+    r = objectcacher->file_read(&in->oset, &in->layout, in->snapid,
                                off, len, bl, 0, onfinish);
   else
-    r = objectcacher->file_read(in->ino, &in->layout, in->snapid,
+    r = objectcacher->file_read(&in->oset, &in->layout, in->snapid,
                                off, len, bl, 0, onfinish);
   if (r == 0) {
     while (!done) 
@@ -4563,7 +4563,7 @@ int Client::_write(Fh *f, __s64 offset, __u64 size, const char *buf)
     objectcacher->wait_for_write(size, client_lock);
     
     // async, caching, non-blocking.
-    objectcacher->file_write(in->ino, &in->layout, in->snaprealm->get_snap_context(),
+    objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
                             offset, size, bl, g_clock.now(), 0);
   } else {
     /*
index e00c23fb9da4da94ddfdcf15c7ed1fe4c2a0c34b..f25f9e338a5552a8bd5f3bb439885e526b6b3b9b 100644 (file)
@@ -62,6 +62,7 @@ using std::fstream;
 using namespace __gnu_cxx;
 
 
+#include "osdc/ObjectCacher.h"
 
 class MClientSession;
 class MClientRequest;
@@ -404,6 +405,8 @@ class Inode {
   map<int,int> open_by_mode;
   map<int,int> cap_refs;
 
+  ObjectCacher::ObjectSet oset;
+
   __u64     reported_size, wanted_max_size, requested_max_size;
 
   int       ref;      // ref count. 1 for each dentry, fh that links to me.
@@ -480,6 +483,7 @@ class Inode {
     exporting_issued(0), exporting_mds(-1), exporting_mseq(0),
     cap_item(this), flushing_cap_item(this), last_flush_tid(0),
     snaprealm(0), snaprealm_item(this), snapdir_parent(0),
+    oset((void *)this, ino),
     reported_size(0), wanted_max_size(0), requested_max_size(0),
     ref(0), ll_ref(0), 
     dir(0), dn(0),
@@ -1138,7 +1142,7 @@ protected:
   void _release(Inode *in, bool checkafter=true);
   void _flush(Inode *in, Context *onfinish=NULL);
   void _flushed(Inode *in);
-  void flush_set_callback(inodeno_t ino);
+  void flush_set_callback(ObjectCacher::ObjectSet *oset);
 
   void close_release(Inode *in);
   void close_safe(Inode *in);
index 3916410f87be8b50aaac7b7a7780e7b5bef68ccf..4e6c4df63fd0ba5a8662be5ec02933c728b7f4ea 100644 (file)
@@ -419,9 +419,6 @@ void ObjectCacher::close_object(Object *ob)
   
   // ok!
   objects.erase(ob->get_soid());
-  objects_by_ino[ob->get_ino()].erase(ob);
-  if (objects_by_ino[ob->get_ino()].empty())
-       objects_by_ino.erase(ob->get_ino());
   delete ob;
 }
 
@@ -539,9 +536,8 @@ void ObjectCacher::bh_write(BufferHead *bh)
   oncommit->tid = tid;
   bh->ob->last_write_tid = tid;
   bh->last_write_tid = tid;
-  if (commit_set_callback) {
-    uncommitted_by_ino[bh->ob->get_ino()].push_back(&bh->ob->uncommitted_item);
-  }
+  if (commit_set_callback)
+    bh->ob->oset->uncommitted.push_back(&bh->ob->uncommitted_item);
 
   mark_tx(bh);
 }
@@ -668,10 +664,8 @@ void ObjectCacher::bh_write_ack(sobject_t oid, loff_t start, __u64 length, tid_t
     }
 
     // is the entire object set now clean?
-    if (flush_set_callback && 
-       dirty_tx_by_ino[ob->get_ino()] == 0) {
-      flush_set_callback(flush_set_callback_arg, ob->get_ino());
-      dirty_tx_by_ino.erase(ob->get_ino());
+    if (flush_set_callback && ob->oset->dirty_tx == 0) {
+      flush_set_callback(flush_set_callback_arg, ob->oset);
     }
   }
   //lock.Unlock();
@@ -708,13 +702,12 @@ void ObjectCacher::bh_write_commit(sobject_t oid, loff_t start, __u64 length, ti
     if (commit_set_callback &&
        ob->last_commit_tid == ob->last_write_tid) {
       ob->uncommitted_item.remove_myself();
-      inodeno_t ino = ob->get_ino();
+      ObjectSet *oset = ob->oset;
       if (ob->can_close())
        close_object(ob);
-      if (uncommitted_by_ino[ino].empty()) {  // no uncommitted in flight
-       uncommitted_by_ino.erase(ino);
-       if (dirty_tx_by_ino[ino] == 0)        // AND nothing dirty/tx
-         commit_set_callback(flush_set_callback_arg, ino);      
+      if (oset->uncommitted.empty()) {  // no uncommitted in flight
+       if (oset->dirty_tx == 0)        // AND nothing dirty/tx
+         commit_set_callback(flush_set_callback_arg, oset);      
       }
     }
   }
@@ -782,7 +775,7 @@ void ObjectCacher::trim(loff_t max)
 
 /* public */
 
-bool ObjectCacher::is_cached(inodeno_t ino, vector<ObjectExtent>& extents, snapid_t snapid)
+bool ObjectCacher::is_cached(ObjectSet *oset, vector<ObjectExtent>& extents, snapid_t snapid)
 {
   for (vector<ObjectExtent>::iterator ex_it = extents.begin();
        ex_it != extents.end();
@@ -791,7 +784,7 @@ bool ObjectCacher::is_cached(inodeno_t ino, vector<ObjectExtent>& extents, snapi
 
     // get Object cache
     sobject_t soid(ex_it->oid, snapid);
-    Object *o = get_object_maybe(soid, ino, ex_it->layout);
+    Object *o = get_object_maybe(soid, ex_it->layout);
     if (!o)
       return false;
     if (!o->is_cached(ex_it->offset, ex_it->length))
@@ -805,7 +798,7 @@ bool ObjectCacher::is_cached(inodeno_t ino, vector<ObjectExtent>& extents, snapi
  * returns # bytes read (if in cache).  onfinish is untouched (caller must delete it)
  * returns 0 if doing async read
  */
-int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish)
+int ObjectCacher::readx(OSDRead *rd, ObjectSet *oset, Context *onfinish)
 {
   bool success = true;
   list<BufferHead*> hit_ls;
@@ -818,7 +811,7 @@ int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish)
 
     // get Object cache
     sobject_t soid(ex_it->oid, rd->snap);
-    Object *o = get_object(soid, ino, ex_it->layout);
+    Object *o = get_object(soid, oset, ex_it->layout);
     
     // map extent into bufferheads
     map<loff_t, BufferHead*> hits, missing, rx;
@@ -833,7 +826,7 @@ int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish)
         if (success && onfinish) {
           dout(10) << "readx missed, waiting on " << *bh_it->second 
                    << " off " << bh_it->first << dendl;
-         bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, ino, onfinish) );
+         bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
         }
        success = false;
       }
@@ -846,7 +839,7 @@ int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish)
         if (success && onfinish) {
           dout(10) << "readx missed, waiting on " << *bh_it->second 
                    << " off " << bh_it->first << dendl;
-         bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, ino, onfinish) );
+         bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
         }
        success = false;
       }      
@@ -947,7 +940,7 @@ int ObjectCacher::readx(OSDRead *rd, inodeno_t ino, Context *onfinish)
 }
 
 
-int ObjectCacher::writex(OSDWrite *wr, inodeno_t ino)
+int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset)
 {
   utime_t now = g_clock.now();
   
@@ -956,7 +949,7 @@ int ObjectCacher::writex(OSDWrite *wr, inodeno_t ino)
        ex_it++) {
     // get object cache
     sobject_t soid(ex_it->oid, CEPH_NOSNAP);
-    Object *o = get_object(soid, ino, ex_it->layout);
+    Object *o = get_object(soid, oset, ex_it->layout);
 
     // map it all into a single bufferhead.
     BufferHead *bh = o->map_write(wr);
@@ -1080,10 +1073,10 @@ void ObjectCacher::flusher_entry()
 
   
 // blocking.  atomic+sync.
-int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock)
+int ObjectCacher::atomic_sync_readx(OSDRead *rd, ObjectSet *oset, Mutex& lock)
 {
   dout(10) << "atomic_sync_readx " << rd
-           << " in " << ino
+           << " in " << oset
            << dendl;
 
   if (rd->extents.size() == 1) {
@@ -1114,7 +1107,7 @@ int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock)
          i != by_oid.end();
          i++) {
       sobject_t soid(i->first, rd->snap);
-      Object *o = get_object(soid, ino, i->second.layout);
+      Object *o = get_object(soid, oset, i->second.layout);
       rdlock(o);
     }
 
@@ -1125,7 +1118,7 @@ int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock)
     Mutex flock("ObjectCacher::atomic_sync_readx flock 2");
     Cond cond;
     bool done = false;
-    readx(rd, ino, new C_SafeCond(&flock, &cond, &done));
+    readx(rd, oset, new C_SafeCond(&flock, &cond, &done));
     
     // block
     while (!done) cond.Wait(lock);
@@ -1144,10 +1137,10 @@ int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock)
   return 0;
 }
 
-int ObjectCacher::atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock)
+int ObjectCacher::atomic_sync_writex(OSDWrite *wr, ObjectSet *oset, Mutex& lock)
 {
   dout(10) << "atomic_sync_writex " << wr
-           << " in " << ino
+           << " in " << oset
            << dendl;
 
   if (wr->extents.size() == 1 &&
@@ -1157,14 +1150,15 @@ int ObjectCacher::atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock)
     // make sure we aren't already locking/locked...
     sobject_t oid(wr->extents.front().oid, CEPH_NOSNAP);
     Object *o = 0;
-    if (objects.count(oid)) o = get_object(oid, ino, wr->extents.front().layout);
+    if (objects.count(oid))
+      o = get_object(oid, oset, wr->extents.front().layout);
     if (!o || 
         (o->lock_state != Object::LOCK_WRLOCK &&
          o->lock_state != Object::LOCK_WRLOCKING &&
          o->lock_state != Object::LOCK_UPGRADING)) {
       // just write synchronously.
       dout(10) << "atomic_sync_writex " << wr
-               << " in " << ino
+               << " in " << oset
                << " doing sync write"
                << dendl;
 
@@ -1193,7 +1187,7 @@ int ObjectCacher::atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock)
        i != by_oid.end();
        i++) {
     sobject_t soid(i->first, CEPH_NOSNAP);
-    Object *o = get_object(soid, ino, i->second.layout);
+    Object *o = get_object(soid, oset, i->second.layout);
     wrlock(o);
   }
   
@@ -1201,7 +1195,7 @@ int ObjectCacher::atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock)
   vector<ObjectExtent> extents = wr->extents;
 
   // do the write, into our cache
-  writex(wr, ino);
+  writex(wr, oset);
   
   // flush 
   // ...and release the locks?
@@ -1364,20 +1358,18 @@ void ObjectCacher::wrunlock(Object *o)
 // -------------------------------------------------
 
 
-bool ObjectCacher::set_is_cached(inodeno_t ino)
+bool ObjectCacher::set_is_cached(ObjectSet *oset)
 {
-  if (objects_by_ino.count(ino) == 0) 
+  if (oset->objects.empty())
     return false;
   
-  set<Object*>& s = objects_by_ino[ino];
-  for (set<Object*>::iterator i = s.begin();
-       i != s.end();
-       i++) {
-    Object *ob = *i;
-    for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
-         p != ob->data.end();
-         p++) {
-      BufferHead *bh = p->second;
+  for (xlist<Object*>::iterator p = oset->objects.begin();
+       !p.end(); ++p) {
+    Object *ob = *p;
+    for (map<loff_t,BufferHead*>::iterator q = ob->data.begin();
+         q != ob->data.end();
+         q++) {
+      BufferHead *bh = q->second;
       if (!bh->is_dirty() && !bh->is_tx()) 
         return true;
     }
@@ -1386,15 +1378,13 @@ bool ObjectCacher::set_is_cached(inodeno_t ino)
   return false;
 }
 
-bool ObjectCacher::set_is_dirty_or_committing(inodeno_t ino)
+bool ObjectCacher::set_is_dirty_or_committing(ObjectSet *oset)
 {
-  if (objects_by_ino.count(ino) == 0) 
+  if (oset->objects.empty())
     return false;
   
-  set<Object*>& s = objects_by_ino[ino];
-  for (set<Object*>::iterator i = s.begin();
-       i != s.end();
-       i++) {
+  for (xlist<Object*>::iterator i = oset->objects.begin();
+       !i.end(); ++i) {
     Object *ob = *i;
     
     for (map<loff_t,BufferHead*>::iterator p = ob->data.begin();
@@ -1453,22 +1443,20 @@ bool ObjectCacher::flush(Object *ob)
 
 // flush.  non-blocking, takes callback.
 // returns true if already flushed
-bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish)
+bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
 {
-  if (objects_by_ino.count(ino) == 0) {
-    dout(10) << "flush_set on " << ino << " dne" << dendl;
+  if (oset->objects.empty()) {
+    dout(10) << "flush_set on " << oset << " dne" << dendl;
     return true;
   }
 
-  dout(10) << "flush_set " << ino << dendl;
+  dout(10) << "flush_set " << oset << dendl;
 
   C_Gather *gather = 0; // we'll need to wait for all objects to flush!
 
-  set<Object*>& s = objects_by_ino[ino];
   bool safe = true;
-  for (set<Object*>::iterator i = s.begin();
-       i != s.end();
-       i++) {
+  for (xlist<Object*>::iterator i = oset->objects.begin();
+       !i.end(); ++i) {
     Object *ob = *i;
 
     if (!flush(ob)) {
@@ -1477,7 +1465,7 @@ bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish)
         gather = new C_Gather(onfinish);
       safe = false;
 
-      dout(10) << "flush_set " << ino << " will wait for ack tid " 
+      dout(10) << "flush_set " << oset << " will wait for ack tid " 
                << ob->last_write_tid 
                << " on " << *ob
                << dendl;
@@ -1487,7 +1475,7 @@ bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish)
   }
   
   if (safe) {
-    dout(10) << "flush_set " << ino << " has no dirty|tx bhs" << dendl;
+    dout(10) << "flush_set " << oset << " has no dirty|tx bhs" << dendl;
     return true;
   }
   return false;
@@ -1496,31 +1484,29 @@ bool ObjectCacher::flush_set(inodeno_t ino, Context *onfinish)
 
 // commit.  non-blocking, takes callback.
 // return true if already flushed.
-bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish)
+bool ObjectCacher::commit_set(ObjectSet *oset, Context *onfinish)
 {
   assert(onfinish);  // doesn't make any sense otherwise.
 
-  if (objects_by_ino.count(ino) == 0) {
-    dout(10) << "commit_set on " << ino << " dne" << dendl;
+  if (oset->objects.empty()) {
+    dout(10) << "commit_set on " << oset << " dne" << dendl;
     return true;
   }
 
-  dout(10) << "commit_set " << ino << dendl;
+  dout(10) << "commit_set " << oset << dendl;
 
   // make sure it's flushing.
-  flush_set(ino);
+  flush_set(oset);
 
   C_Gather *gather = 0; // we'll need to wait for all objects to commit
 
-  set<Object*>& s = objects_by_ino[ino];
   bool safe = true;
-  for (set<Object*>::iterator i = s.begin();
-       i != s.end();
-       i++) {
+  for (xlist<Object*>::iterator i = oset->objects.begin();
+       !i.end(); ++i) {
     Object *ob = *i;
     
     if (ob->last_write_tid > ob->last_commit_tid) {
-      dout(10) << "commit_set " << ino << " " << *ob 
+      dout(10) << "commit_set " << oset << " " << *ob 
                << " will finish on commit tid " << ob->last_write_tid
                << dendl;
       if (!gather && onfinish) gather = new C_Gather(onfinish);
@@ -1531,25 +1517,23 @@ bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish)
   }
 
   if (safe) {
-    dout(10) << "commit_set " << ino << " all committed" << dendl;
+    dout(10) << "commit_set " << oset << " all committed" << dendl;
     return true;
   }
   return false;
 }
 
-void ObjectCacher::purge_set(inodeno_t ino)
+void ObjectCacher::purge_set(ObjectSet *oset)
 {
-  if (objects_by_ino.count(ino) == 0) {
-    dout(10) << "purge_set on " << ino << " dne" << dendl;
+  if (oset->objects.empty()) {
+    dout(10) << "purge_set on " << oset << " dne" << dendl;
     return;
   }
 
-  dout(10) << "purge_set " << ino << dendl;
+  dout(10) << "purge_set " << oset << dendl;
 
-  set<Object*>& s = objects_by_ino[ino];
-  for (set<Object*>::iterator i = s.begin();
-       i != s.end();
-       i++) {
+  for (xlist<Object*>::iterator i = oset->objects.begin();
+       !i.end(); ++i) {
     Object *ob = *i;
        purge(ob);
   }
@@ -1586,36 +1570,34 @@ loff_t ObjectCacher::release(Object *ob)
   return o_unclean;
 }
 
-loff_t ObjectCacher::release_set(inodeno_t ino)
+loff_t ObjectCacher::release_set(ObjectSet *oset)
 {
   // return # bytes not clean (and thus not released).
   loff_t unclean = 0;
 
-  if (objects_by_ino.count(ino) == 0) {
-    dout(10) << "release_set on " << ino << " dne" << dendl;
+  if (oset->objects.empty()) {
+    dout(10) << "release_set on " << oset << " dne" << dendl;
     return 0;
   }
 
-  dout(10) << "release_set " << ino << dendl;
+  dout(10) << "release_set " << oset << dendl;
 
-  set<Object*> s = objects_by_ino[ino];
-  for (set<Object*>::iterator i = s.begin();
-       i != s.end();
-       i++) {
-    Object *ob = *i;
+  for (xlist<Object*>::iterator p = oset->objects.begin();
+       !p.end(); ++p) {
+    Object *ob = *p;
     
     loff_t o_unclean = release(ob);
     unclean += o_unclean;
 
     if (o_unclean) 
-      dout(10) << "release_set " << ino << " " << *ob 
+      dout(10) << "release_set " << oset << " " << *ob 
                << " has " << o_unclean << " bytes left"
                << dendl;
     
   }
 
   if (unclean) {
-    dout(10) << "release_set " << ino
+    dout(10) << "release_set " << oset
              << ", " << unclean << " bytes left" << dendl;
   }
 
@@ -1654,14 +1636,14 @@ __u64 ObjectCacher::release_all()
 
 
 
-void ObjectCacher::truncate_set(inodeno_t ino, vector<ObjectExtent>& exls)
+void ObjectCacher::truncate_set(ObjectSet *oset, vector<ObjectExtent>& exls)
 {
-  if (objects_by_ino.count(ino) == 0) {
-    dout(10) << "truncate_set on " << ino << " dne" << dendl;
+  if (oset->objects.empty()) {
+    dout(10) << "truncate_set on " << oset << " dne" << dendl;
     return;
   }
   
-  dout(10) << "truncate_set " << ino << dendl;
+  dout(10) << "truncate_set " << oset << dendl;
 
   for (vector<ObjectExtent>::iterator p = exls.begin();
        p != exls.end();
@@ -1690,21 +1672,19 @@ void ObjectCacher::truncate_set(inodeno_t ino, vector<ObjectExtent>& exls)
 }
 
 
-void ObjectCacher::kick_sync_writers(inodeno_t ino)
+void ObjectCacher::kick_sync_writers(ObjectSet *oset)
 {
-  if (objects_by_ino.count(ino) == 0) {
-    dout(10) << "kick_sync_writers on " << ino << " dne" << dendl;
+  if (oset->objects.empty()) {
+    dout(10) << "kick_sync_writers on " << oset << " dne" << dendl;
     return;
   }
 
-  dout(10) << "kick_sync_writers on " << ino << dendl;
+  dout(10) << "kick_sync_writers on " << oset << dendl;
 
   list<Context*> ls;
 
-  set<Object*>& s = objects_by_ino[ino];
-  for (set<Object*>::iterator i = s.begin();
-       i != s.end();
-       i++) {
+  for (xlist<Object*>::iterator i = oset->objects.begin();
+       !i.end(); ++i) {
     Object *ob = *i;
     
     ls.splice(ls.begin(), ob->waitfor_wr);
@@ -1713,21 +1693,19 @@ void ObjectCacher::kick_sync_writers(inodeno_t ino)
   finish_contexts(ls);
 }
 
-void ObjectCacher::kick_sync_readers(inodeno_t ino)
+void ObjectCacher::kick_sync_readers(ObjectSet *oset)
 {
-  if (objects_by_ino.count(ino) == 0) {
-    dout(10) << "kick_sync_readers on " << ino << " dne" << dendl;
+  if (oset->objects.empty()) {
+    dout(10) << "kick_sync_readers on " << oset << " dne" << dendl;
     return;
   }
 
-  dout(10) << "kick_sync_readers on " << ino << dendl;
+  dout(10) << "kick_sync_readers on " << oset << dendl;
 
   list<Context*> ls;
 
-  set<Object*>& s = objects_by_ino[ino];
-  for (set<Object*>::iterator i = s.begin();
-       i != s.end();
-       i++) {
+  for (xlist<Object*>::iterator i = oset->objects.begin();
+       !i.end(); ++i) {
     Object *ob = *i;
     
     ls.splice(ls.begin(), ob->waitfor_rd);
index 0447c1dc05a4176a65b324b824d27ee1afe75ceb..f9c93cde7724df1318a197a47b94dfed384081eb 100644 (file)
@@ -19,9 +19,10 @@ class Objecter;
 class ObjectCacher {
  public:
 
-  typedef void (*flush_set_callback_t) (void *p, inodeno_t ino);
-
   class Object;
+  class ObjectSet;
+
+  typedef void (*flush_set_callback_t) (void *p, ObjectSet *oset);
 
   // read scatter/gather  
   struct OSDRead {
@@ -131,9 +132,14 @@ class ObjectCacher {
     // ObjectCacher::Object fields
     ObjectCacher *oc;
     sobject_t oid;
-    inodeno_t ino;
+  public:
+    ObjectSet *oset;
+    xlist<Object*>::item set_item;
+  private:
     ceph_object_layout layout;
     
+    friend class ObjectSet;
+
   public:
     map<loff_t, BufferHead*>     data;
 
@@ -163,21 +169,24 @@ class ObjectCacher {
     int rdlock_ref;  // how many ppl want or are using a READ lock
 
   public:
-    Object(ObjectCacher *_oc, sobject_t o, inodeno_t i, ceph_object_layout& l) : 
+    Object(ObjectCacher *_oc, sobject_t o, ObjectSet *os, ceph_object_layout& l) : 
       oc(_oc),
-      oid(o), ino(i), layout(l),
+      oid(o), oset(os), set_item(this), layout(l),
       last_write_tid(0), last_ack_tid(0), last_commit_tid(0),
       uncommitted_item(this),
-      lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0)
-      {}
+      lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0) {
+      // add to set
+      os->objects.push_back(&set_item);
+    }
     ~Object() {
       assert(data.empty());
+      set_item.remove_myself();
     }
 
     sobject_t get_soid() { return oid; }
     object_t get_oid() { return oid.oid; }
     snapid_t get_snap() { return oid.snap; }
-    inodeno_t get_ino() { return ino; }
+    ObjectSet *get_object_set() { return oset; }
     
     ceph_object_layout& get_layout() { return layout; }
     void set_layout(ceph_object_layout& l) { layout = l; }
@@ -235,6 +244,22 @@ class ObjectCacher {
 
   };
   
+
+  struct ObjectSet {
+    void *parent;
+
+    inodeno_t ino;
+    __u64 truncate_seq, truncate_size;
+
+    xlist<Object*> objects;
+    xlist<Object*> uncommitted;
+
+    int dirty_tx;
+
+    ObjectSet(void *p, inodeno_t i) : parent(p), ino(i), truncate_seq(0), truncate_size(0), dirty_tx(0) {}
+  };
+
+
   // ******* ObjectCacher *********
   // ObjectCacher fields
  public:
@@ -248,9 +273,6 @@ class ObjectCacher {
   void *flush_set_callback_arg;
 
   hash_map<sobject_t, Object*> objects;
-  hash_map<inodeno_t, set<Object*> > objects_by_ino;
-  hash_map<inodeno_t, int> dirty_tx_by_ino;
-  hash_map<inodeno_t, xlist<Object*> > uncommitted_by_ino;
 
   set<BufferHead*>    dirty_bh;
   LRU   lru_dirty, lru_rest;
@@ -270,22 +292,21 @@ class ObjectCacher {
   
 
   // objects
-  Object *get_object_maybe(sobject_t oid, inodeno_t ino, ceph_object_layout &l) {
+  Object *get_object_maybe(sobject_t oid, ceph_object_layout &l) {
     // have it?
     if (objects.count(oid))
       return objects[oid];
     return NULL;
   }
 
-  Object *get_object(sobject_t oid, inodeno_t ino, ceph_object_layout &l) {
+  Object *get_object(sobject_t oid, ObjectSet *oset, ceph_object_layout &l) {
     // have it?
     if (objects.count(oid))
       return objects[oid];
 
     // create it.
-    Object *o = new Object(this, oid, ino, l);
+    Object *o = new Object(this, oid, oset, l);
     objects[oid] = o;
-    objects_by_ino[ino].insert(o);
     return o;
   }
   void close_object(Object *ob);
@@ -306,11 +327,11 @@ class ObjectCacher {
     case BufferHead::STATE_CLEAN: stat_clean += bh->length(); break;
     case BufferHead::STATE_DIRTY: 
       stat_dirty += bh->length(); 
-      dirty_tx_by_ino[bh->ob->get_ino()] += bh->length();
+      bh->ob->oset->dirty_tx += bh->length();
       break;
     case BufferHead::STATE_TX: 
       stat_tx += bh->length(); 
-      dirty_tx_by_ino[bh->ob->get_ino()] += bh->length();
+      bh->ob->oset->dirty_tx += bh->length();
       break;
     case BufferHead::STATE_RX: stat_rx += bh->length(); break;
     }
@@ -322,11 +343,11 @@ class ObjectCacher {
     case BufferHead::STATE_CLEAN: stat_clean -= bh->length(); break;
     case BufferHead::STATE_DIRTY: 
       stat_dirty -= bh->length(); 
-      dirty_tx_by_ino[bh->ob->get_ino()] -= bh->length();
+      bh->ob->oset->dirty_tx -= bh->length();
       break;
     case BufferHead::STATE_TX: 
       stat_tx -= bh->length(); 
-      dirty_tx_by_ino[bh->ob->get_ino()] -= bh->length();
+      bh->ob->oset->dirty_tx -= bh->length();
       break;
     case BufferHead::STATE_RX: stat_rx -= bh->length(); break;
     }
@@ -504,12 +525,12 @@ class ObjectCacher {
   class C_RetryRead : public Context {
     ObjectCacher *oc;
     OSDRead *rd;
-    inodeno_t ino;
+    ObjectSet *oset;
     Context *onfinish;
   public:
-    C_RetryRead(ObjectCacher *_oc, OSDRead *r, inodeno_t i, Context *c) : oc(_oc), rd(r), ino(i), onfinish(c) {}
+    C_RetryRead(ObjectCacher *_oc, OSDRead *r, ObjectSet *os, Context *c) : oc(_oc), rd(r), oset(os), onfinish(c) {}
     void finish(int) {
-      int r = oc->readx(rd, ino, onfinish);
+      int r = oc->readx(rd, oset, onfinish);
       if (r > 0 && onfinish) {
         onfinish->finish(r);
         delete onfinish;
@@ -520,87 +541,87 @@ class ObjectCacher {
 
 
   // non-blocking.  async.
-  int readx(OSDRead *rd, inodeno_t ino, Context *onfinish);
-  int writex(OSDWrite *wr, inodeno_t ino);
-  bool is_cached(inodeno_t ino, vector<ObjectExtent>& extents, snapid_t snapid);
+  int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish);
+  int writex(OSDWrite *wr, ObjectSet *oset);
+  bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents, snapid_t snapid);
 
   // write blocking
   bool wait_for_write(__u64 len, Mutex& lock);
   
   // blocking.  atomic+sync.
-  int atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock);
-  int atomic_sync_writex(OSDWrite *wr, inodeno_t ino, Mutex& lock);
+  int atomic_sync_readx(OSDRead *rd, ObjectSet *oset, Mutex& lock);
+  int atomic_sync_writex(OSDWrite *wr, ObjectSet *oset, Mutex& lock);
 
-  bool set_is_cached(inodeno_t ino);
-  bool set_is_dirty_or_committing(inodeno_t ino);
+  bool set_is_cached(ObjectSet *oset);
+  bool set_is_dirty_or_committing(ObjectSet *oset);
 
-  bool flush_set(inodeno_t ino, Context *onfinish=0);
+  bool flush_set(ObjectSet *oset, Context *onfinish=0);
   void flush_all(Context *onfinish=0);
 
-  bool commit_set(inodeno_t ino, Context *oncommit);
+  bool commit_set(ObjectSet *oset, Context *oncommit);
   void commit_all(Context *oncommit=0);
 
-  void purge_set(inodeno_t ino);
+  void purge_set(ObjectSet *oset);
 
-  loff_t release_set(inodeno_t ino);  // returns # of bytes not released (ie non-clean)
+  loff_t release_set(ObjectSet *oset);  // returns # of bytes not released (ie non-clean)
   __u64 release_all();
 
-  void truncate_set(inodeno_t ino, vector<ObjectExtent>& ex);
+  void truncate_set(ObjectSet *oset, vector<ObjectExtent>& ex);
 
-  void kick_sync_writers(inodeno_t ino);
-  void kick_sync_readers(inodeno_t ino);
+  void kick_sync_writers(ObjectSet *oset);
+  void kick_sync_readers(ObjectSet *oset);
 
 
   // file functions
 
   /*** async+caching (non-blocking) file interface ***/
-  int file_is_cached(inodeno_t ino, ceph_file_layout *layout, snapid_t snapid,
+  int file_is_cached(ObjectSet *oset, ceph_file_layout *layout, snapid_t snapid,
                     loff_t offset, __u64 len) {
     vector<ObjectExtent> extents;
-    filer.file_to_extents(ino, layout, offset, len, extents);
-    return is_cached(ino, extents, snapid);
+    filer.file_to_extents(oset->ino, layout, offset, len, extents);
+    return is_cached(oset, extents, snapid);
   }
 
-  int file_read(inodeno_t ino, ceph_file_layout *layout, snapid_t snapid,
+  int file_read(ObjectSet *oset, ceph_file_layout *layout, snapid_t snapid,
                 loff_t offset, __u64 len, 
                 bufferlist *bl,
                int flags,
                 Context *onfinish) {
     OSDRead *rd = prepare_read(snapid, bl, flags);
-    filer.file_to_extents(ino, layout, offset, len, rd->extents);
-    return readx(rd, ino, onfinish);
+    filer.file_to_extents(oset->ino, layout, offset, len, rd->extents);
+    return readx(rd, oset, onfinish);
   }
 
-  int file_write(inodeno_t ino, ceph_file_layout *layout, const SnapContext& snapc,
+  int file_write(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc,
                  loff_t offset, __u64 len, 
                  bufferlist& bl, utime_t mtime, int flags) {
     OSDWrite *wr = prepare_write(snapc, bl, mtime, flags);
-    filer.file_to_extents(ino, layout, offset, len, wr->extents);
-    return writex(wr, ino);
+    filer.file_to_extents(oset->ino, layout, offset, len, wr->extents);
+    return writex(wr, oset);
   }
 
 
 
   /*** sync+blocking file interface ***/
 
-  int file_atomic_sync_read(inodeno_t ino, ceph_file_layout *layout, 
+  int file_atomic_sync_read(ObjectSet *oset, ceph_file_layout *layout, 
                            snapid_t snapid,
                             loff_t offset, __u64 len, 
                             bufferlist *bl, int flags,
                             Mutex &lock) {
     OSDRead *rd = prepare_read(snapid, bl, flags);
-    filer.file_to_extents(ino, layout, offset, len, rd->extents);
-    return atomic_sync_readx(rd, ino, lock);
+    filer.file_to_extents(oset->ino, layout, offset, len, rd->extents);
+    return atomic_sync_readx(rd, oset, lock);
   }
 
-  int file_atomic_sync_write(inodeno_t ino, ceph_file_layout *layout, 
+  int file_atomic_sync_write(ObjectSet *oset, ceph_file_layout *layout, 
                             const SnapContext& snapc,
                              loff_t offset, __u64 len, 
                              bufferlist& bl, utime_t mtime, int flags,
                              Mutex &lock) {
     OSDWrite *wr = prepare_write(snapc, bl, mtime, flags);
-    filer.file_to_extents(ino, layout, offset, len, wr->extents);
-    return atomic_sync_writex(wr, ino, lock);
+    filer.file_to_extents(oset->ino, layout, offset, len, wr->extents);
+    return atomic_sync_writex(wr, oset, lock);
   }
 
 };
@@ -625,7 +646,7 @@ inline ostream& operator<<(ostream& out, ObjectCacher::BufferHead &bh)
 inline ostream& operator<<(ostream& out, ObjectCacher::Object &ob)
 {
   out << "object["
-      << ob.get_soid() << " ino " << hex << ob.get_ino() << dec
+      << ob.get_soid() << " oset " << ob.oset << dec
       << " wr " << ob.last_write_tid << "/" << ob.last_ack_tid << "/" << ob.last_commit_tid;
 
   switch (ob.lock_state) {
index 8042a39eeaace20e6aaec805328cb1e7c12e70f9..789bc13fbda80c5c3ee201d8ec7d563778b57a35 100644 (file)
@@ -329,7 +329,7 @@ public:
 
   struct StatfsOp {
     tid_t tid;
-    ceph_statfs *stats;
+    struct ceph_statfs *stats;
     Context *onfinish;
 
     utime_t last_submit;
@@ -653,7 +653,7 @@ private:
   void fs_stats_submit(StatfsOp *op);
 public:
   void handle_fs_stats_reply(MStatfsReply *m);
-  void get_fs_stats(ceph_statfs& result, Context *onfinish);
+  void get_fs_stats(struct ceph_statfs& result, Context *onfinish);
 
   // ---------------------------
   // some scatter/gather hackery