]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
memory leaks, threadpool tweaks
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sun, 26 Jun 2005 17:54:59 +0000 (17:54 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Sun, 26 Jun 2005 17:54:59 +0000 (17:54 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@336 29311d96-e01e-0410-9327-a35deaab8ce9

19 files changed:
ceph/client/Client.cc
ceph/client/Client.h
ceph/client/SyntheticClient.cc
ceph/client/SyntheticClient.h
ceph/common/ThreadPool.h
ceph/config.cc
ceph/config.h
ceph/fakesyn.cc
ceph/include/bufferlist.h
ceph/include/types.h
ceph/mds/MDS.cc
ceph/msg/FakeMessenger.cc
ceph/msg/TCPMessenger.cc
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/OSDMap.cc
ceph/osd/OSDMap.h
ceph/osdc/Filer.cc
ceph/script/find_bufferleaks.pl [new file with mode: 0755]

index 6d9f076d1e81489db781af6773d49a9219831a63..375755dc32af5bbe7b5454d3cff50e19692f05ff 100644 (file)
@@ -65,6 +65,7 @@ void Client::tear_down_cache()
           it != fh_map.end();
           it++) {
        Fh *fh = it->second;
+       dout(3) << "forcing close of fh " << it->first << " ino " << fh->inode->inode.ino << endl;
        put_inode(fh->inode);
        delete fh;
   }
@@ -72,7 +73,13 @@ void Client::tear_down_cache()
 
   // empty lru
   lru.lru_set_max(0);
-  trim_cache();
+  int last = 0;
+  while (lru.lru_get_size() != last) {
+       last = lru.lru_get_size();
+       dout(10) << "trim pass, size is " << last << endl;
+       //dump_cache();
+       trim_cache();
+  }
 
   // close root ino
   assert(inode_map.size() <= 1);
@@ -86,6 +93,44 @@ void Client::tear_down_cache()
 }
 
 
+
+// debug crapola
+
+void Client::dump_inode(Inode *in, set<Inode*>& did)
+{
+  dout(1) << "inode " << in << " ref " << in->ref << " dir " << in->dir << endl;
+
+  if (in->dir) {
+       dout(1) << "  dir size " << in->dir->dentries.size() << endl;
+       for (hash_map<string, Dentry*>::iterator it = in->dir->dentries.begin();
+                it != in->dir->dentries.end();
+                it++) {
+         dout(1) << "    dn " << it->first << " ref " << it->second->ref << endl;
+         dump_inode(it->second->inode, did);
+       }
+  }
+}
+
+void Client::dump_cache()
+{
+  set<Inode*> did;
+
+  if (root) dump_inode(root, did);
+
+  for (map<inodeno_t, Inode*>::iterator it = inode_map.begin();
+          it != inode_map.end();
+          it++) {
+       if (did.count(it->second)) continue;
+       
+       dout(1) << "inode " << it->first << " ref " << it->second->ref << " dir " << it->second->dir << endl;
+       if (it->second->dir) {
+         dout(1) << "  dir size " << it->second->dir->dentries.size() << endl;
+       }
+  }
+}
+
+
 void Client::init() {
   
 }
@@ -680,6 +725,7 @@ int Client::lstat(const char *path, struct stat *stbuf)
   if (dn && ((now - dn->inode->last_updated) <= g_conf.client_cache_stat_ttl)) {
        inode = dn->inode->inode;
        dout(10) << "lstat cache hit, age is " << (now - dn->inode->last_updated) << endl;
+       delete req;  // don't need this
   } else {  
        // FIXME where does FUSE maintain user information
        req->set_caller_uid(getuid());
@@ -695,8 +741,9 @@ int Client::lstat(const char *path, struct stat *stbuf)
          
          //Update metadata cache
          this->insert_trace(trace);
-         delete reply;
        }
+
+       delete reply;
   }
      
   if (res == 0) {
@@ -871,18 +918,21 @@ int Client::getdir(const char *path, map<string,inode_t*>& contents)
        assert(diri);
        assert(diri->inode.mode & INODE_MODE_DIR);
 
-       Dir *dir = diri->open_dir();
-       assert(dir);
-       time_t now = time(NULL);
-       for (vector<c_inode_info*>::iterator it = reply->get_dir_contents().begin(); 
-                it != reply->get_dir_contents().end(); 
-                it++) {
-         // put in cache
-         Inode *in = this->insert_inode_info(dir, *it);
-         in->last_updated = now;
-         
-         // contents to caller too!
-         contents[(*it)->ref_dn] = &in->inode;
+       if (reply->get_dir_contents().size()) {
+         // only open dir if we're actually adding stuff to it!
+         Dir *dir = diri->open_dir();
+         assert(dir);
+         time_t now = time(NULL);
+         for (vector<c_inode_info*>::iterator it = reply->get_dir_contents().begin(); 
+                  it != reply->get_dir_contents().end(); 
+                  it++) {
+               // put in cache
+               Inode *in = this->insert_inode_info(dir, *it);
+               in->last_updated = now;
+               
+               // contents to caller too!
+               contents[(*it)->ref_dn] = &in->inode;
+         }
        }
 
        // FIXME: remove items in cache that weren't in my readdir
index a609fc12c3e7696c9b6548cccc8c6505ea0524b1..47f0e970c815e5e412dda40476b68b2921ed5010 100644 (file)
@@ -228,6 +228,8 @@ class Client : public Dispatcher {
 
   // trim cache.
   void trim_cache();
+  void dump_inode(Inode *in, set<Inode*>& did);
+  void dump_cache();  // debug
   
   // find dentry based on filepath
   Dentry *lookup(filepath& path);
index d514f0248460ebb145c774345f16aaa8a1ed0e5d..300d55bb117b75fb2c3ed1f4f560d4a3a728293c 100644 (file)
@@ -276,6 +276,7 @@ int SyntheticClient::random_walk(int num_req)
         
        if (op == MDS_OP_UTIME) {
          struct utimbuf b;
+         memset(&b, 1, sizeof(b));
          if (contents.empty()) 
                r = client->utime( cwd.c_str(), &b );
          else
index 78b925e3272d429b54ff54c4dcfa09d6fd846ebc..4d843c78220cc18b06e5284c6dad65f707305b09 100644 (file)
@@ -73,6 +73,8 @@ class SyntheticClient {
   SyntheticClient(Client *client) {
        this->client = client;
        thread_id = 0;
+
+       did_readdir = false;
   }
 
   int start_thread();
index f1c4820da7dbf2787eb16727bbc2c21d37a9a5ee..814f7c1569de34eb0f79bae32349582cab3c8b45 100644 (file)
@@ -33,15 +33,16 @@ class ThreadPool {
   void * do_ops(void *nothing)
   {
     T* op;
-
+       
     cout << "Thread "<< pthread_self() << " ready for action\n";
     while(1) {
       q_sem.Get();
       op = get_op();
-
+         
       if(op == NULL) {
-       cout << "Thread exiting\n";
-       pthread_exit(0);
+               cout << "Thread exiting\n";
+               //pthread_exit(0);
+               return 0;   // like this, i think!
       }
       cout << "Thread "<< pthread_self() << " calling the function\n";
       func(u, op);
@@ -81,11 +82,17 @@ class ThreadPool {
 
   ~ThreadPool()
   {
+       // put null ops to make threads exit cleanly
+    for(int i = 0; i < num_threads; i++) 
+         put_op(0);
+
+       // wait for them to die
     for(int i = 0; i < num_threads; i++) {
-      cout << "Killing thread " << i << "\n";
-      pthread_cancel(thread[i]);
+      cout << "Joining thread " << i << "\n";
+         void *rval = 0;  // we don't actually care
+      pthread_join(thread[i], &rval);
     }
-    delete thread;
+    delete[] thread;
   }
 
   void put_op(T* op)
index 6afb54e9a0370fdc287ba38f2d9c708bc2aabf71..1ffb00fb60fd25b6e15c47b16b885ecbcfc8e7f3 100644 (file)
@@ -59,7 +59,7 @@ md_config_t g_conf = {
 
   // --- osd ---
   osd_fsync: true,
-
+  osd_maxthreads: 10,
 
 
   // --- fakeclient (mds regression testing) ---
@@ -151,6 +151,8 @@ void parse_config_options(int argc, char **argv,
 
        else if (strcmp(argv[i], "--osd_fsync") == 0) 
          g_conf.osd_fsync = atoi(argv[++i]);
+       else if (strcmp(argv[i], "--osd_maxthreads") == 0) 
+         g_conf.osd_maxthreads = atoi(argv[++i]);
 
        else {
          //cout << "passing arg " << argv[i] << endl;
index 3d2cc10b23f3df6130c27d3c25ae522835bcd2f4..d5c3d06cb41d13747b20ea403625df73bebff0b3 100644 (file)
@@ -48,6 +48,7 @@ struct md_config_t {
 
   // osd
   bool  osd_fsync;
+  int   osd_maxthreads;
 
   // fake client
   int      num_fakeclient;
index 60afa92c0a28ff8be2bf8624d92dc1e653871f2b..8e834e2d7d36e6611f20d9a0502de3d382e8d1cf 100644 (file)
@@ -30,9 +30,9 @@ public:
 };
 
 
-int main(int oargc, char **oargv) {
-
-  //cerr << "mpisyn starting " << myrank << "/" << world << endl;
+int main(int oargc, char **oargv) 
+{
+  cerr << "fakesyn starting" << endl;
   int argc;
   char **argv;
   parse_config_options(oargc, oargv,
index 1af1eebf2f3432785bcd0f92cdeecfbc6e6c6b61..e3cb5678d2e48e4bf4d66d126a31d2f17107f0d4 100644 (file)
@@ -5,6 +5,7 @@
 
 #include <list>
 #include <set>
+#include <vector>
 using namespace std;
 
 #include <ext/rope>
@@ -336,6 +337,36 @@ inline void _decode(set<int>& s, bufferlist& bl, int& off)
   assert(s.size() == n);
 }
 
+// vector<int>
+inline void _encode(vector<int>& s, bufferlist& bl)
+{
+  int n = s.size();
+  bl.append((char*)&n, sizeof(n));
+  for (vector<int>::iterator it = s.begin();
+          it != s.end();
+          it++) {
+       int v = *it;
+       bl.append((char*)&v, sizeof(v));
+       n--;
+  }
+  assert(n==0);
+}
+inline void _decode(vector<int>& s, bufferlist& bl, int& off) 
+{
+  s.clear();
+  int n;
+  bl.copy(off, sizeof(n), (char*)&n);
+  off += sizeof(n);
+  s = vector<int>(n);
+  for (int i=0; i<n; i++) {
+       int v;
+       bl.copy(off, sizeof(v), (char*)&v);
+       off += sizeof(v);
+       s[i] = v;
+  }
+  assert(s.size() == n);
+}
+
 
 
 
index 67da3e93882504d02ea9818e566594a6b6c44ffc..ea1e70d7d3e042c119e83fa146381fc4b5b17eb8 100644 (file)
@@ -6,6 +6,7 @@
 
 #include <string>
 #include <set>
+#include <vector>
 #include <iostream>
 using namespace std;
 
index 07d6bbd5f0a5c39b8d64ab933e510172f390936e..e39b5666d0936d01be8cd960f08ff1ecce94812f 100644 (file)
@@ -100,6 +100,7 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) {
   osdg.num_osds = g_conf.num_osd;
   for (int i=0; i<osdg.num_osds; i++) osdg.osds.push_back(i);
   osdg.weight = 100;
+  osdg.osd_size = 100;  // not used yet?
   osdcluster->add_group(osdg);
   // </HACK>
 
index 5f0ff0262ddbf29f7a82fbb4fcd7c66a617a1e16..d9f5a93a25b45e2d33198746287179278f2beb1b 100644 (file)
@@ -44,9 +44,20 @@ bool      awake = false;
 bool      shutdown = false;
 pthread_t thread_id;
 
+
+class C_FakeKicker : public Context {
+  void finish(int r) {
+       dout(18) << "timer kick" << endl;
+       pending_timer = true;
+       cond.Signal();  // why not
+  }
+};
+
+
 void *fakemessenger_thread(void *ptr) 
 {
-  dout(1) << "thread start" << endl;
+  dout(1) << "thread start, setting timer kicker" << endl;
+  g_timer.set_messenger_kicker(new C_FakeKicker());
 
   lock.Lock();
   while (1) {
@@ -62,6 +73,9 @@ void *fakemessenger_thread(void *ptr)
   }
   lock.Unlock();
 
+  cout << "unsetting messenger kicker" << endl;
+  g_timer.unset_messenger_kicker();
+
   dout(1) << "thread finish (i woke up but no messages, bye)" << endl;
 }
 
@@ -86,10 +100,6 @@ void fakemessenger_wait()
   void *ptr;
   pthread_join(thread_id, &ptr);
 
-
-  g_timer.unset_messenger_kicker();
-
-
 }
 
 
@@ -170,22 +180,11 @@ int fakemessenger_do_loop_2()
 }
 
 
-// class
-
-class C_FakeKicker : public Context {
-  void finish(int r) {
-       dout(18) << "timer kick" << endl;
-       pending_timer = true;
-       cond.Signal();  // why not
-  }
-};
-
 FakeMessenger::FakeMessenger(long me)  : Messenger(me)
 {
   whoami = me;
   directory[ whoami ] = this;
 
-  g_timer.set_messenger_kicker(new C_FakeKicker());
 
   cout << "fakemessenger " << whoami << " messenger is " << this << endl;
 
@@ -218,8 +217,10 @@ int FakeMessenger::shutdown()
 {
   //cout << "shutdown on messenger " << this << " has " << num_incoming() << " queued" << endl;
   directory.erase(whoami);
-  if (directory.empty()) 
+  if (directory.empty()) {
        ::shutdown = true;
+       cond.Signal();  // why not
+  }
 }
 
 /*
index aac912cad2c34975d92719d5aca081faf788c895..2ad415d9161f484b3dbc999d1e4f39157bb3a4db 100644 (file)
@@ -178,9 +178,9 @@ int tcpmessenger_shutdown()
   // bleh
 
 
-  delete remote_addr;
-  delete in_sd;
-  delete out_sd;
+  delete[] remote_addr;
+  delete[] in_sd;
+  delete[] out_sd;
 }
 
 int tcpmessenger_world()
index 0f67bd31717143088b6ca4ad5b415db006bc348b..8b9d8af4c48582ffd33401cc10296b1119d48ddc 100644 (file)
@@ -79,7 +79,7 @@ OSD::OSD(int id, Messenger *m)
   logger = new Logger(name, (LogType*)&osd_logtype);
 
   // Thread pool
-  threadpool = new ThreadPool<OSD, MOSDOp>(10, (void (*)(OSD*, MOSDOp*))doop, this);
+  threadpool = new ThreadPool<OSD, MOSDOp>(g_conf.osd_maxthreads, (void (*)(OSD*, MOSDOp*))doop, this);
 }
 
 OSD::~OSD()
@@ -89,6 +89,7 @@ OSD::~OSD()
   if (messenger) { delete messenger; messenger = 0; }
   if (logger) { delete logger; logger = 0; }
   if (store) { delete store; store = 0; }
+  if (threadpool) { delete threadpool; threadpool = 0; }
 }
 
 int OSD::init()
index 53fc1783565354a07c117da0a56e5a0782294531..6633c611b4a5cdc75ed84e0885d1143d6196abbc 100644 (file)
@@ -15,6 +15,7 @@ class Messenger;
 class Message;
 
 
+
 // ways to be dirty
 #define RG_DIRTY_LOCAL_LOG     1
 #define RG_DIRTY_LOCAL_SYNC    2
index 52664ba85252961928a53605f697334a6d276164..2aef9b3fee739a81fc57d7633313cabac96b2258 100644 (file)
@@ -13,7 +13,7 @@ void OSDCluster::encode(bufferlist& blist)
   int ngroups = osd_groups.size();
   blist.append((char*)&ngroups, sizeof(ngroups));
   for (int i=0; i<ngroups; i++) {
-       blist.append((char*)&osd_groups[i], sizeof(OSDGroup));
+       osd_groups[i]._encode(blist);
   }
 
   _encode(down_osds, blist);
@@ -32,8 +32,7 @@ void OSDCluster::decode(bufferlist& blist)
 
   osd_groups = vector<OSDGroup>(ngroups);
   for (int i=0; i<ngroups; i++) {
-       blist.copy(off, sizeof(OSDGroup), (char*)&osd_groups[i]);
-       off += sizeof(OSDGroup);
+       osd_groups[i]._decode(blist, off);
   }
 
   _decode(down_osds, blist, off);
index b6042d8f7b71d47b4076c1eb3c054a3512c25ed0..e665e7edc0def78bbde1aced0847bc3ecb31cb1a 100644 (file)
@@ -40,11 +40,28 @@ using namespace __gnu_cxx;
 /** OSDGroup
  * a group of identical disks added to the OSD cluster
  */
-struct OSDGroup {
+class OSDGroup {
+ public:
   int         num_osds; // num disks in this group           (aka num_disks_in_cluster[])
   float       weight;   // weight (for data migration etc.)  (aka weight_cluster[])
   size_t      osd_size; // osd size (in MB)
   vector<int> osds;     // the list of actual osd's
+
+  void _encode(bufferlist& bl) {
+       bl.append((char*)&num_osds, sizeof(num_osds));
+       bl.append((char*)&weight, sizeof(weight));
+       bl.append((char*)&osd_size, sizeof(osd_size));
+       ::_encode(osds, bl);
+  }
+  void _decode(bufferlist& bl, int& off) {
+       bl.copy(off, sizeof(num_osds), (char*)&num_osds);
+       off += sizeof(num_osds);
+       bl.copy(off, sizeof(weight), (char*)&weight);
+       off += sizeof(weight);
+       bl.copy(off, sizeof(osd_size), (char*)&osd_size);
+       off += sizeof(osd_size);
+       ::_decode(osds, bl, off);
+  }
 };
 
 
@@ -52,10 +69,10 @@ struct OSDGroup {
  * for mapping (ino, offset, len) to a (list of) byte extents in objects on osds
  */
 struct OSDExtent {
-  int         osd;
-  object_t    oid;
-  repgroup_t  rg;
-  size_t      offset, len;
+  int         osd;       // (acting) primary osd
+  object_t    oid;       // object id
+  repgroup_t  rg;        // replica group
+  size_t      offset, len;   // extent within the object
 };
 
 
index 2817327d6f9b689b3b9f65fc55215eb2484518e2..475897b279c4f5ffc9c2223b0a8ec2e62e467772 100644 (file)
@@ -354,6 +354,7 @@ int Filer::remove(inodeno_t ino, size_t size, Context *onfinish)
 
   size_t off = 0;  // ptr into buffer
 
+  int n = 0;
   for (list<OSDExtent>::iterator it = extents.begin();
           it != extents.end();
           it++) {
@@ -369,8 +370,16 @@ int Filer::remove(inodeno_t ino, size_t size, Context *onfinish)
        // add to gather set
        p->outstanding_ops.insert(last_tid);
        op_removes[last_tid] = p;
+       n++;
   }
 
+  if (n == 0) {
+       delete p;
+       if (onfinish) {
+         onfinish->finish(0);
+         delete onfinish;
+       }
+  }
 }
 
 
diff --git a/ceph/script/find_bufferleaks.pl b/ceph/script/find_bufferleaks.pl
new file mode 100755 (executable)
index 0000000..8c5fd2a
--- /dev/null
@@ -0,0 +1,53 @@
+#!/usr/bin/perl
+
+use strict;
+my %buffers;
+my %ref;
+my %mal;
+my $l = 1;
+while (<>) {
+       #print "$l: $_";
+
+       # cinode:auth_pin on inode [1000000002625 /gnu/blah_client_created. 0x89b7700] count now 1 + 0
+
+       if (/^buffer\.cons /) {
+               my ($x) = /(0x\S+)/;
+               $buffers{$x} = 1;
+       }
+       if (/^buffer\.des /) {
+               my ($x) = /(0x\S+)/;
+               die "des without cons at $l: $_" unless $buffers{$x};
+               delete $buffers{$x};
+               die "des with ref>0 at $l: $_" unless $ref{$x} == 0;
+               delete $ref{$x};
+       }
+
+       if (/^buffer\.malloc /) {
+               my ($x) = /(0x\S+)/;
+               $mal{$x} = 1;
+       }
+       if (/^buffer\.free /) {
+               my ($x) = /(0x\S+)/;
+               die "free with malloc at $l: $_" unless $mal{$x};
+               delete $mal{$x};
+       }
+
+       if (/^buffer\.get /) {
+               my ($x) = /(0x\S+)/;
+               $ref{$x}++;
+       }
+       if (/^buffer\.get /) {
+               my ($x) = /(0x\S+)/;
+               $ref{$x}--;
+       }
+
+$l++;
+}
+
+for my $x (keys %buffers) {
+       print "leaked buffer $x ref $ref{$x}\n";
+}
+
+for my $x (keys %mal) {
+       print "leaked buffer dataptr $x ref $ref{$x}\n";
+}