]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
fixed lots of memory leaks! cleaned up C_Gather. Thread has stricter interface...
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 12 Mar 2007 02:52:30 +0000 (02:52 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Mon, 12 Mar 2007 02:52:30 +0000 (02:52 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1207 29311d96-e01e-0410-9327-a35deaab8ce9

19 files changed:
trunk/ceph/client/Client.cc
trunk/ceph/client/FileCache.cc
trunk/ceph/client/FileCache.h
trunk/ceph/client/fuse.cc
trunk/ceph/common/Logger.cc
trunk/ceph/common/Thread.h
trunk/ceph/common/Timer.cc
trunk/ceph/ebofs/BlockDevice.h
trunk/ceph/ebofs/Table.h
trunk/ceph/fakesyn.cc
trunk/ceph/include/Context.h
trunk/ceph/mds/MDS.cc
trunk/ceph/mon/MonitorStore.cc
trunk/ceph/mon/OSDMonitor.cc
trunk/ceph/msg/FakeMessenger.cc
trunk/ceph/osd/OSD.cc
trunk/ceph/osdc/ObjectCacher.cc
trunk/ceph/osdc/ObjectCacher.h
trunk/ceph/osdc/Objecter.cc

index 72de4379443e1f9128149ec20a9553e6109e1e8f..b01b13b7cf2918afca87e4a2f0b5448124309908 100644 (file)
@@ -127,13 +127,19 @@ Client::Client(Messenger *m, MonMap *mm)
 
 Client::~Client() 
 {
-  if (messenger) { delete messenger; messenger = 0; }
+  tear_down_cache();
+
+  if (objectcacher) { 
+    delete objectcacher; 
+    objectcacher = 0; 
+  }
+
   if (filer) { delete filer; filer = 0; }
-  if (objectcacher) { delete objectcacher; objectcacher = 0; }
   if (objecter) { delete objecter; objecter = 0; }
   if (osdmap) { delete osdmap; osdmap = 0; }
+  if (mdsmap) { delete mdsmap; mdsmap = 0; }
 
-  tear_down_cache();
+  if (messenger) { delete messenger; messenger = 0; }
 }
 
 
@@ -792,6 +798,7 @@ void Client::handle_file_caps(MClientFileCaps *m)
       if (cap_reap_queue[in->ino()].empty())
         cap_reap_queue.erase(in->ino());
     }
+    delete m;
     return;
   }
 
@@ -819,7 +826,7 @@ void Client::handle_file_caps(MClientFileCaps *m)
     } else {
       //dout(0) << "didn't put_inode" << endl;
     }
-    
+    delete m;
     return;
   }
 
@@ -879,7 +886,6 @@ void Client::handle_file_caps(MClientFileCaps *m)
       }
     }
     in->fc.set_caps(new_caps, onimplement);
-
   } else {
     // caching off.
 
index 5d572ab7b670c84958f7522d4a6219e269e4643e..2a1dd1576ae5917994471ffa9756740154956977 100644 (file)
@@ -50,6 +50,15 @@ void FileCache::empty(Context *onempty)
 }
 
 
+void FileCache::tear_down()
+{
+  off_t unclean = release_clean();
+  if (unclean) {
+       dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl;
+       oc->purge_set(inode.ino);
+  }
+}
+
 // caps
 
 void FileCache::set_caps(int caps, Context *onimplement) 
index 742ec98733d9b5ae8e15b6861c565d8a8d89e10d..6bef22f4e0c6a4e62848dce62e6b3ac9c67444c1 100644 (file)
@@ -34,6 +34,9 @@ class FileCache {
     latest_caps(0),
     num_reading(0), num_writing(0),// num_unsafe(0),
     waitfor_release(false) {}
+  ~FileCache() {
+       tear_down();
+  }
 
   // waiters/waiting
   bool can_read() { return latest_caps & CAP_FILE_RD; }
@@ -52,6 +55,8 @@ class FileCache {
   bool is_cached();
   bool is_dirty();  
 
+  void tear_down();
+
   int get_caps() { return latest_caps; }
   void set_caps(int caps, Context *onimplement=0);
   void check_caps();
index 86a2ce360231576362d2cda0e7d015b93a070cfe..f4a1c2d3f779741ed511feda860fb88311cdd0df 100644 (file)
@@ -276,8 +276,6 @@ int ceph_fuse_main(Client *c, int argc, char *argv[])
   
   // go fuse go
   cout << "ok, calling fuse_main" << endl;
-  cout << "cwd was " << get_current_dir_name() << endl;
   int r = fuse_main(newargc, newargv, &ceph_oper);
-  cout << "cwd now " << get_current_dir_name() << endl;
   return r;
 }
index 2fb722a5b0f7c6e42f422206805d15a478dcdb60..91164658a80e58b68f52952459fadcdcbcd98f81 100644 (file)
@@ -36,7 +36,9 @@ Logger::Logger(string fn, LogType *type)
   {
     filename = "";
     if (g_conf.use_abspaths) {
-      filename = get_current_dir_name();
+      char *cwd = get_current_dir_name(); 
+      filename = cwd;
+      delete cwd;
       filename += "/";
     }
 
index 43e2942e84c5ff0bb9c174fe971f0e3e195bbabd..8565ce9effd925e8a44d38e7291bf35c0c98b237 100644 (file)
@@ -45,7 +45,9 @@ class Thread {
   }
 
   int join(void **prval = 0) {
-    if (thread_id == 0) return -1;   // never started.
+    assert(thread_id);
+    //if (thread_id == 0) return -1;   // never started.
+
     int status = pthread_join(thread_id, prval);
     if (status == 0) 
       thread_id = 0;
index adacf0c5eb6c64d9babb7790a5e4ec96921eeb7c..522a623d5ebac14c9c07bd2d71c0a7a000360d7d 100644 (file)
@@ -234,6 +234,10 @@ bool Timer::cancel_event(Context *callback)
     scheduled.erase(tp);
   
   lock.Unlock();
+
+  // delete the canceled event.
+  delete callback;
+
   return true;
 }
 
@@ -290,7 +294,7 @@ void SafeTimer::cancel_event(Context *c)
 
   if (g_timer.cancel_event(scheduled[c])) {
     // hosed wrapper.  hose original event too.
-    delete scheduled[c];
+    delete c;
   } else {
     // clean up later.
     canceled[c] = scheduled[c];
index 25adf62606947947580f44b09435b6432e142f55..18f639f7176b6d1c07f41e04a7f453e4030ef9d3 100644 (file)
@@ -143,6 +143,13 @@ class BlockDevice {
     BarrierQueue(BlockDevice *bd, const char *d) : bdev(bd), dev(d) {
       barrier();
     }
+    ~BarrierQueue() {
+      for (list<Queue*>::iterator p = qls.begin();
+          p != qls.end();
+          ++p) 
+       delete *p;
+      qls.clear();
+    }
     int size() {
       // this isn't perfectly accurate.
       if (!qls.empty())
index e6b3fb39660e4575ceaf66e819c1a513445947d0..f16e506a9dd63269b60aaf61f26921fd71a21373 100644 (file)
@@ -666,8 +666,9 @@ class Table {
           assert(cursor.open[cursor.level].size() == 0);
           assert(depth == 1);
           root = -1;
-          depth = 0;
-          pool.release(cursor.open[0].node);
+         depth = 0;
+         if (cursor.open[0].node)
+           pool.release(cursor.open[0].node);
         }
         verify("remove 1");
         return 0;
index dca78939b9282acfc11c1beafbb7fd1a6878dd8b..d4fc63a4cbba82a56f8fe9fad3ffea9dad1a83f9 100644 (file)
@@ -31,9 +31,6 @@ using namespace std;
 
 #include "common/Timer.h"
 
-#define NUMMDS g_conf.num_mds
-#define NUMOSD g_conf.num_osd
-#define NUMCLIENT g_conf.num_client
 
 class C_Test : public Context {
 public:
@@ -97,9 +94,9 @@ int main(int argc, char **argv)
   }
 
   // create mds
-  MDS *mds[NUMMDS];
-  OSD *mdsosd[NUMMDS];
-  for (int i=0; i<NUMMDS; i++) {
+  MDS *mds[g_conf.num_mds];
+  OSD *mdsosd[g_conf.num_mds];
+  for (int i=0; i<g_conf.num_mds; i++) {
     //cerr << "mds" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
     mds[i] = new MDS(-1, new FakeMessenger(MSG_ADDR_MDS_NEW), monmap);
     if (g_conf.mds_local_osd)
@@ -108,17 +105,17 @@ int main(int argc, char **argv)
   }
   
   // create osd
-  OSD *osd[NUMOSD];
-  for (int i=0; i<NUMOSD; i++) {
+  OSD *osd[g_conf.num_osd];
+  for (int i=0; i<g_conf.num_osd; i++) {
     //cerr << "osd" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
     osd[i] = new OSD(i, new FakeMessenger(MSG_ADDR_OSD(i)), monmap);
     start++;
   }
   
   // create client
-  Client *client[NUMCLIENT];
-  SyntheticClient *syn[NUMCLIENT];
-  for (int i=0; i<NUMCLIENT; i++) {
+  Client *client[g_conf.num_client];
+  SyntheticClient *syn[g_conf.num_client];
+  for (int i=0; i<g_conf.num_client; i++) {
     //cerr << "client" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
     client[i] = new Client(new FakeMessenger(MSG_ADDR_CLIENT(i)), monmap);
     start++;
@@ -132,19 +129,19 @@ int main(int argc, char **argv)
   for (int i=0; i<g_conf.num_mon; i++) {
     mon[i]->init();
   }
-  for (int i=0; i<NUMMDS; i++) {
+  for (int i=0; i<g_conf.num_mds; i++) {
     mds[i]->init();
     if (g_conf.mds_local_osd)
       mdsosd[i]->init();
   }
   
-  for (int i=0; i<NUMOSD; i++) {
+  for (int i=0; i<g_conf.num_osd; i++) {
     osd[i]->init();
   }
 
   
   // create client(s)
-  for (int i=0; i<NUMCLIENT; i++) {
+  for (int i=0; i<g_conf.num_client; i++) {
     client[i]->init();
     
     // use my argc, argv (make sure you pass a mount point!)
@@ -158,7 +155,7 @@ int main(int argc, char **argv)
   }
 
 
-  for (int i=0; i<NUMCLIENT; i++) {
+  for (int i=0; i<g_conf.num_client; i++) {
     
     cout << "waiting for synthetic client " << i << " to finish" << endl;
     syn[i]->join_thread();
@@ -174,13 +171,16 @@ int main(int argc, char **argv)
   fakemessenger_wait();
   
   // cleanup
-  for (int i=0; i<NUMMDS; i++) {
+  for (int i=0; i<g_conf.num_mon; i++) {
+    delete mon[i];
+  }
+  for (int i=0; i<g_conf.num_mds; i++) {
     delete mds[i];
   }
-  for (int i=0; i<NUMOSD; i++) {
+  for (int i=0; i<g_conf.num_osd; i++) {
     delete osd[i];
   }
-  for (int i=0; i<NUMCLIENT; i++) {
+  for (int i=0; i<g_conf.num_client; i++) {
     delete client[i];
   }
 
index 78059b8d39d82ecbf2fd63d750f4b4549a4184fe..b7798afbc93d96276c850a8979a6177cfb0f949c 100644 (file)
@@ -82,38 +82,55 @@ public:
  */
 class C_Gather : public Context {
 public:
+  bool sub_finish(int r) {
+    //cout << "C_Gather sub_finish " << this << endl;
+    assert(waitfor.count(r));
+    waitfor.erase(r);
+    if (!waitfor.empty()) 
+      return false;  // more subs left
+
+    // last one
+    onfinish->finish(0);
+    delete onfinish;
+    onfinish = 0;
+    return true;
+  }
+
   class C_GatherSub : public Context {
     C_Gather *gather;
     int num;
   public:
     C_GatherSub(C_Gather *g, int n) : gather(g), num(n) {}
     void finish(int r) {
-      gather->finish(num);
+      if (gather->sub_finish(num))
+       delete gather;   // last one!
     }
   };
 
+  Context *new_sub() {
+    num++;
+    waitfor.insert(num);
+    return new C_GatherSub(this, num);
+  }
+
 private:
   Context *onfinish;
   std::set<int> waitfor;
   int num;
 
 public:
-  C_Gather(Context *f) : onfinish(f), num(0) {}
-
+  C_Gather(Context *f) : onfinish(f), num(0) {
+    //cout << "C_Gather new " << this << endl;
+  }
+  ~C_Gather() {
+    //cout << "C_Gather delete " << this << endl;
+    assert(!onfinish);
+  }
   void finish(int r) {
-    assert(waitfor.count(r));
-    waitfor.erase(r);
-    if (waitfor.empty()) {
-      onfinish->finish(0);
-      delete onfinish;
-    }
+    // nobody should ever call me.
+    assert(0);
   }
 
-  Context *new_sub() {
-    num++;
-    waitfor.insert(num);
-    return new C_GatherSub(this, num);
-  }
 };
 
 #endif
index 298c50bf5ac161340ce5fea661c01f306071f563..2c0cf8973f06d8a43d2d6125a85750c79d8e6adb 100644 (file)
@@ -120,6 +120,10 @@ MDS::~MDS() {
   if (anchormgr) { delete anchormgr; anchormgr = NULL; }
   if (anchorclient) { delete anchorclient; anchorclient = NULL; }
   if (osdmap) { delete osdmap; osdmap = 0; }
+  if (mdsmap) { delete mdsmap; mdsmap = 0; }
+
+  if (server) { delete server; server = 0; }
+  if (locker) { delete locker; locker = 0; }
 
   if (filer) { delete filer; filer = 0; }
   if (objecter) { delete objecter; objecter = 0; }
index 8ac98631a0c54bb445e7e4812225ee6ce0dbca49..55389973c8c6ab61a99ac1fadfce10051dfd0970 100644 (file)
@@ -39,7 +39,9 @@ void MonitorStore::mount()
   if (g_conf.use_abspaths) {
     // combine it with the cwd, in case fuse screws things up (i.e. fakefuse)
     string old = dir;
-    dir = get_current_dir_name();
+    char *cwd = get_current_dir_name();
+    dir = cwd;
+    delete cwd;
     dir += "/";
     dir += old;
   }
index 43ec4eddf2eca8fbde1b6e53bce523415509c757..fe9d54b189de6b9431340bf8ac6b35b1b208f79a 100644 (file)
@@ -399,6 +399,8 @@ void OSDMonitor::handle_osd_boot(MOSDBoot *m)
               << (osdmap.osds.size() - osdmap.osd_inst.size())
               << " osds to boot" << endl;
     }
+
+    delete m;
     return;
   }
   
index d2db8c8f7e11c55c063570169ac767a7b0c7c909..2aa6c6b06b75b4224b8732c3089ae8358c3ce8fc 100644 (file)
@@ -143,7 +143,6 @@ int fakemessenger_do_loop_2()
 
       dout(18) << "messenger " << mgr << " at " << mgr->get_myname() << " has " << mgr->num_incoming() << " queued" << endl;
 
-
       if (!mgr->is_ready()) {
         dout(18) << "messenger " << mgr << " at " << mgr->get_myname() << " has no dispatcher, skipping" << endl;
         it++;
@@ -247,7 +246,11 @@ FakeMessenger::FakeMessenger(entity_name_t me)  : Messenger(me)
 
 FakeMessenger::~FakeMessenger()
 {
-
+  // hose any undelivered messages
+  for (list<Message*>::iterator p = incoming.begin();
+       p != incoming.end();
+       ++p)
+    delete *p;
 }
 
 
@@ -258,15 +261,6 @@ int FakeMessenger::shutdown()
   assert(directory.count(_myinst.addr) == 1);
   shutdown_set.insert(_myinst.addr);
   
-  /*
-  directory.erase(myaddr);
-  if (directory.empty()) {
-    dout(1) << "fakemessenger: last shutdown" << endl;
-    ::fm_shutdown = true;
-    cond.Signal();  // why not
-  } 
-  */
-
   /*
   if (loggers[myaddr]) {
     delete loggers[myaddr];
@@ -302,42 +296,34 @@ int FakeMessenger::send_message(Message *m, entity_inst_t inst, int port, int fr
 
   lock.Lock();
 
-  // deliver
-  try {
 #ifdef LOG_MESSAGES
-    // stats
-    loggers[get_myaddr()]->inc("+send",1);
-    loggers[dest]->inc("-recv",1);
-
-    char s[20];
-    sprintf(s,"+%s", m->get_type_name());
-    loggers[get_myaddr()]->inc(s);
-    sprintf(s,"-%s", m->get_type_name());
-    loggers[dest]->inc(s);
+  // stats
+  loggers[get_myaddr()]->inc("+send",1);
+  loggers[dest]->inc("-recv",1);
+  
+  char s[20];
+  sprintf(s,"+%s", m->get_type_name());
+  loggers[get_myaddr()]->inc(s);
+  sprintf(s,"-%s", m->get_type_name());
+  loggers[dest]->inc(s);
 #endif
 
-    // queue
-    FakeMessenger *dm = directory[inst.addr];
-    if (!dm) {
-      dout(1) << "** destination " << inst << " dne" << endl;
-      for (map<entity_addr_t, FakeMessenger*>::iterator p = directory.begin();
-          p != directory.end();
-          ++p) {
-       dout(1) << "** have " << p->first << " to " << p->second << endl;
-      }
-      //assert(dm);
-    }
-    dm->queue_incoming(m);
-
+  // queue
+  if (directory.count(inst.addr)) {
     dout(1) << "--> " << get_myname() << " -> " << inst.name << " " << *m << endl;
-    
-  }
-  catch (...) {
-    cout << "no destination " << dest << endl;
-    assert(0);
+    directory[inst.addr]->queue_incoming(m);
+  } else {
+    dout(0) << "--> " << get_myname() << " -> " << inst.name << " " << *m
+           << " *** destination DNE ***" << endl;
+    for (map<entity_addr_t, FakeMessenger*>::iterator p = directory.begin();
+        p != directory.end();
+        ++p) {
+      dout(0) << "** have " << p->first << " to " << p->second << endl;
+    }
+    //assert(dm);
+    delete m;
   }
 
-
   // wake up loop?
   if (!awake) {
     dout(10) << "waking up fakemessenger thread" << endl; 
index 0aa0e707fbd0c5b8fca9eadc4a33d4966eaa0f0b..058692fab3fc08cd5cf4de1f34b742e3056fa33f 100644 (file)
@@ -739,8 +739,9 @@ void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst)
   if (dest.is_osd()) {
     // failed osd.  drop message, report to mon.
     int mon = monmap->pick_mon();
-    dout(0) << "ms_handle_failure " << dest << " inst " << inst 
+    dout(0) << "ms_handle_failure " << inst 
             << ", dropping and reporting to mon" << mon 
+           << " " << *m
             << endl;
     messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()),
                             monmap->get_inst(mon));
@@ -748,15 +749,16 @@ void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst)
   } else if (dest.is_mon()) {
     // resend to a different monitor.
     int mon = monmap->pick_mon(true);
-    dout(0) << "ms_handle_failure " << dest << " inst " << inst 
+    dout(0) << "ms_handle_failure " << inst 
             << ", resending to mon" << mon 
+           << " " << *m
             << endl;
     messenger->send_message(m, monmap->get_inst(mon));
   }
   else {
     // client?
-    dout(0) << "ms_handle_failure " << dest << " inst " << inst 
-            << ", dropping" << endl;
+    dout(0) << "ms_handle_failure " << inst 
+            << ", dropping " << *m << endl;
     delete m;
   }
 }
index cb8db645f49e78cd48929927a6b15b02c36e697a..0933675ae2880fab0904c018b4be0009105616fd 100644 (file)
@@ -96,50 +96,7 @@ void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
   dout(10) << "merge_left result " << *left << endl;
 }
 
-/* buggy possibly, but more importnatly, unnecessary.
-void ObjectCacher::Object::merge_right(BufferHead *left, BufferHead *right)
-{
-  assert(left->end() == right->start());
-  assert(left->get_state() == right->get_state());
-
-  dout(10) << "merge_right " << *left << " + " << *right << endl;
-  oc->bh_remove(this, left);
-  oc->bh_stat_sub(right);
-  data.erase(right->start());
-  right->set_start( left->start() );
-  data[right->start()] = right;
-  right->set_length( left->length() + right->length());
-  oc->bh_stat_add(right);
-
-  // data
-  bufferlist nbl;
-  nbl.claim(left->bl);
-  nbl.claim_append(right->bl);
-  right->bl.claim(nbl);
-  
-  // version 
-  // note: this is sorta busted, but should only be used for dirty buffers
-  right->last_write_tid =  MAX( left->last_write_tid, right->last_write_tid );
-
-  // waiters
-  map<off_t,list<Context*> > old;
-  old.swap(right->waitfor_read);
 
-  // take left's waiters
-  right->waitfor_read.swap(left->waitfor_read);
-
-  // shift old waiters
-  for (map<off_t, list<Context*> >::iterator p = old.begin();
-       p != old.end();
-       p++) 
-    right->waitfor_read[p->first + left->length()].swap( p->second );
-  
-  // hose left
-  delete left;
-
-  dout(10) << "merge_right result " << *right << endl;
-}
-*/
 
 /*
  * map a range of bytes into buffer_heads.
@@ -374,8 +331,25 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr
 #define dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_objectcacher) cout << g_clock.now() << " " << objecter->messenger->get_myname() << ".objectcacher "
 
 
+
 /* private */
 
+void ObjectCacher::close_object(Object *ob) 
+{
+  dout(10) << "close_object " << *ob << endl;
+  assert(ob->can_close());
+  
+  // ok!
+  objects.erase(ob->get_oid());
+  objects_by_ino[ob->get_ino()].erase(ob);
+  if (objects_by_ino[ob->get_ino()].empty())
+       objects_by_ino.erase(ob->get_ino());
+  delete ob;
+}
+
+
+
+
 void ObjectCacher::bh_read(BufferHead *bh)
 {
   dout(7) << "bh_read on " << *bh << endl;
@@ -817,6 +791,9 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
   }
   dout(10) << "readx  result is " << rd->bl->length() << endl;
 
+  // done with read.
+  delete rd;
+
   trim();
   
   return pos;
@@ -1258,6 +1235,26 @@ bool ObjectCacher::set_is_dirty_or_committing(inodeno_t ino)
 }
 
 
+// purge.  non-blocking.  violently removes dirty buffers from cache.
+void ObjectCacher::purge(Object *ob)
+{
+  dout(10) << "purge " << *ob << endl;
+
+  for (map<off_t,BufferHead*>::iterator p = ob->data.begin();
+       p != ob->data.end();
+       p++) {
+    BufferHead *bh = p->second;
+       dout(0) << "purge forcibly removing " << *bh << endl;
+       bh_remove(ob, bh);
+       delete bh;
+  }
+
+  if (ob->can_close()) {
+       dout(10) << "trim trimming " << *ob << endl;
+       close_object(ob);
+  }
+}
+
 // flush.  non-blocking.  no callback.
 // true if clean, already flushed.  
 // false if we wrote something.
@@ -1366,6 +1363,24 @@ bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish)
   return false;
 }
 
+void ObjectCacher::purge_set(inodeno_t ino)
+{
+  if (objects_by_ino.count(ino) == 0) {
+    dout(10) << "purge_set on " << ino << " dne" << endl;
+    return;
+  }
+
+  dout(10) << "purge_set " << ino << endl;
+
+  set<Object*>& s = objects_by_ino[ino];
+  for (set<Object*>::iterator i = s.begin();
+       i != s.end();
+       i++) {
+    Object *ob = *i;
+       purge(ob);
+  }
+}
+
 
 off_t ObjectCacher::release(Object *ob)
 {
@@ -1384,8 +1399,15 @@ off_t ObjectCacher::release(Object *ob)
 
   for (list<BufferHead*>::iterator p = clean.begin();
           p != clean.end();
-          p++)
+          p++) {
        bh_remove(ob, *p);
+       delete *p;
+  }
+
+  if (ob->can_close()) {
+       dout(10) << "trim trimming " << *ob << endl;
+       close_object(ob);
+  }
 
   return o_unclean;
 }
@@ -1402,7 +1424,7 @@ off_t ObjectCacher::release_set(inodeno_t ino)
 
   dout(10) << "release_set " << ino << endl;
 
-  set<Object*>& s = objects_by_ino[ino];
+  set<Object*> s = objects_by_ino[ino];
   for (set<Object*>::iterator i = s.begin();
        i != s.end();
        i++) {
index 27b154023209d62cd6d14cb23fd1e298a81d6de8..e9a4041008666975b7aa1ab3da0cc881e32c47d2 100644 (file)
@@ -133,6 +133,9 @@ class ObjectCacher {
       last_write_tid(0), last_ack_tid(0), last_commit_tid(0),
       lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0)
       {}
+       ~Object() {
+         assert(data.empty());
+       }
 
     object_t get_oid() { return oid; }
     inodeno_t get_ino() { return ino; }
@@ -227,16 +230,7 @@ class ObjectCacher {
     objects_by_ino[ino].insert(o);
     return o;
   }
-  void close_object(Object *ob) {
-    assert(ob->can_close());
-
-    // ok!
-    objects.erase(ob->get_oid());
-    objects_by_ino[ob->get_ino()].erase(ob);
-    if (objects_by_ino[ob->get_ino()].empty())
-      objects_by_ino.erase(ob->get_ino());
-    delete ob;
-  }
+  void close_object(Object *ob);
 
   // bh stats
   Cond  stat_cond;
@@ -315,18 +309,22 @@ class ObjectCacher {
 
   void bh_add(Object *ob, BufferHead *bh) {
     ob->add_bh(bh);
-    if (bh->is_dirty())
+    if (bh->is_dirty()) {
       lru_dirty.lru_insert_top(bh);
-    else
+         dirty_bh.insert(bh);
+       } else {
       lru_rest.lru_insert_top(bh);
+       }
     bh_stat_add(bh);
   }
   void bh_remove(Object *ob, BufferHead *bh) {
     ob->remove_bh(bh);
-    if (bh->is_dirty())
+    if (bh->is_dirty()) {
       lru_dirty.lru_remove(bh);
-    else
+         dirty_bh.erase(bh);
+       } else {
       lru_rest.lru_remove(bh);
+       }
     bh_stat_sub(bh);
   }
 
@@ -339,6 +337,7 @@ class ObjectCacher {
 
   bool flush(Object *o);
   off_t release(Object *o);
+  void purge(Object *o);
 
   void rdlock(Object *o);
   void rdunlock(Object *o);
@@ -413,10 +412,17 @@ class ObjectCacher {
     flusher_thread.create();
   }
   ~ObjectCacher() {
-    //lock.Lock();  // hmm.. watch out for deadlock!
+       // we should be empty.
+       assert(objects.empty());
+       assert(lru_rest.lru_get_size() == 0);
+       assert(lru_dirty.lru_get_size() == 0);
+       assert(dirty_bh.empty());
+
+       assert(flusher_thread.is_started());
+    lock.Lock();  // hmm.. watch out for deadlock!
     flusher_stop = true;
     flusher_cond.Signal();
-    //lock.Unlock();
+    lock.Unlock();
     flusher_thread.join();
   }
 
@@ -457,6 +463,8 @@ class ObjectCacher {
   bool commit_set(inodeno_t ino, Context *oncommit);
   void commit_all(Context *oncommit=0);
 
+  void purge_set(inodeno_t ino);
+
   off_t release_set(inodeno_t ino);  // returns # of bytes not released (ie non-clean)
 
   void kick_sync_writers(inodeno_t ino);
index ddd22325747dbc7acc5f7484ffb28da55eb37886..9e49a43ace89baf1c243399030154f0fdf0eac76 100644 (file)
@@ -269,27 +269,31 @@ tid_t Objecter::stat_submit(OSDStat *st)
   ObjectExtent &ex = st->extents.front();
   PG &pg = get_pg( ex.pgid );
 
-  // send
+  // pick tid
   last_tid++;
   assert(client_inc >= 0);
-  MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
-                         ex.oid, ex.pgid, osdmap->get_epoch(), 
-                         OSD_OP_STAT);
+
+  // add to gather set
+  st->tid = last_tid;
+  op_stat[last_tid] = st;    
+
+  pg.active_tids.insert(last_tid);
+
+  // send?
   dout(10) << "stat_submit " << st << " tid " << last_tid
            << " oid " << ex.oid
            << " pg " << ex.pgid
            << " osd" << pg.acker() 
            << endl;
 
-  if (pg.acker() >= 0) 
+  if (pg.acker() >= 0) {
+       MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
+                                                  ex.oid, ex.pgid, osdmap->get_epoch(), 
+                                                  OSD_OP_STAT);
+
     messenger->send_message(m, osdmap->get_inst(pg.acker()));
+  }
   
-  // add to gather set
-  st->tid = last_tid;
-  op_stat[last_tid] = st;    
-
-  pg.active_tids.insert(last_tid);
-
   return last_tid;
 }
 
@@ -381,14 +385,17 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex)
   // find OSD
   PG &pg = get_pg( ex.pgid );
 
-  // send
+  // pick tid
   last_tid++;
   assert(client_inc >= 0);
-  MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
-                         ex.oid, ex.pgid, osdmap->get_epoch(), 
-                         OSD_OP_READ);
-  m->set_length(ex.length);
-  m->set_offset(ex.start);
+
+  // add to gather set
+  rd->ops[last_tid] = ex;
+  op_read[last_tid] = rd;    
+
+  pg.active_tids.insert(last_tid);
+
+  // send?
   dout(10) << "readx_submit " << rd << " tid " << last_tid
            << " oid " << ex.oid << " " << ex.start << "~" << ex.length
            << " (" << ex.buffer_extents.size() << " buffer fragments)" 
@@ -396,15 +403,16 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex)
            << " osd" << pg.acker() 
            << endl;
 
-  if (pg.acker() >= 0) 
+  if (pg.acker() >= 0) {
+       MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
+                                                  ex.oid, ex.pgid, osdmap->get_epoch(), 
+                                                  OSD_OP_READ);
+       m->set_length(ex.length);
+       m->set_offset(ex.start);
+       
     messenger->send_message(m, osdmap->get_inst(pg.acker()));
+  }
     
-  // add to gather set
-  rd->ops[last_tid] = ex;
-  op_read[last_tid] = rd;    
-
-  pg.active_tids.insert(last_tid);
-
   return last_tid;
 }
 
@@ -641,43 +649,13 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
   // find
   PG &pg = get_pg( ex.pgid );
     
-  // send
+  // pick tid
   tid_t tid;
   if (usetid > 0) 
     tid = usetid;
   else
     tid = ++last_tid;
 
-  MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
-                         ex.oid, ex.pgid, osdmap->get_epoch(),
-                         wr->op);
-  m->set_length(ex.length);
-  m->set_offset(ex.start);
-  m->set_rev(ex.rev);
-
-  if (wr->tid_version.count(tid)) 
-    m->set_version(wr->tid_version[tid]);  // we're replaying this op!
-    
-  // what type of op?
-  switch (wr->op) {
-  case OSD_OP_WRITE:
-    {
-      // map buffer segments into this extent
-      // (may be fragmented bc of striping)
-      bufferlist cur;
-      for (map<size_t,size_t>::iterator bit = ex.buffer_extents.begin();
-           bit != ex.buffer_extents.end();
-           bit++) {
-        bufferlist thisbit;
-        thisbit.substr_of(((OSDWrite*)wr)->bl, bit->first, bit->second);
-        cur.claim_append(thisbit);
-      }
-      assert(cur.length() == ex.length);
-      m->set_data(cur);//.claim(cur);
-    }
-    break;
-  }
-
   // add to gather set
   wr->waitfor_ack[tid] = ex;
   wr->waitfor_commit[tid] = ex;
@@ -687,15 +665,46 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
   ++num_unacked;
   ++num_uncommitted;
 
-  // send
+  // send?
   dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid
            << "  oid " << ex.oid
            << " " << ex.start << "~" << ex.length 
            << " pg " << ex.pgid 
            << " osd" << pg.primary()
            << endl;
-  if (pg.primary() >= 0)
+  if (pg.primary() >= 0) {
+       MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
+                                                  ex.oid, ex.pgid, osdmap->get_epoch(),
+                                                  wr->op);
+       m->set_length(ex.length);
+       m->set_offset(ex.start);
+       m->set_rev(ex.rev);
+       
+       if (wr->tid_version.count(tid)) 
+         m->set_version(wr->tid_version[tid]);  // we're replaying this op!
+    
+       // what type of op?
+       switch (wr->op) {
+       case OSD_OP_WRITE:
+         {
+               // map buffer segments into this extent
+               // (may be fragmented bc of striping)
+               bufferlist cur;
+               for (map<size_t,size_t>::iterator bit = ex.buffer_extents.begin();
+                        bit != ex.buffer_extents.end();
+                        bit++) {
+                 bufferlist thisbit;
+                 thisbit.substr_of(((OSDWrite*)wr)->bl, bit->first, bit->second);
+                 cur.claim_append(thisbit);
+      }
+               assert(cur.length() == ex.length);
+               m->set_data(cur);//.claim(cur);
+         }
+         break;
+       }
+       
     messenger->send_message(m, osdmap->get_inst(pg.primary()));
+  }
   
   dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << endl;