From: sage Date: Sun, 26 Jun 2005 17:54:59 +0000 (+0000) Subject: memory leaks, threadpool tweaks X-Git-Tag: v0.1~2045 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c4b08bcf6a50a26a44ef998918e827c721608242;p=ceph.git memory leaks, threadpool tweaks git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@336 29311d96-e01e-0410-9327-a35deaab8ce9 --- diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index 6d9f076d1e8..375755dc32a 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -65,6 +65,7 @@ void Client::tear_down_cache() it != fh_map.end(); it++) { Fh *fh = it->second; + dout(3) << "forcing close of fh " << it->first << " ino " << fh->inode->inode.ino << endl; put_inode(fh->inode); delete fh; } @@ -72,7 +73,13 @@ void Client::tear_down_cache() // empty lru lru.lru_set_max(0); - trim_cache(); + int last = 0; + while (lru.lru_get_size() != last) { + last = lru.lru_get_size(); + dout(10) << "trim pass, size is " << last << endl; + //dump_cache(); + trim_cache(); + } // close root ino assert(inode_map.size() <= 1); @@ -86,6 +93,44 @@ void Client::tear_down_cache() } + +// debug crapola + +void Client::dump_inode(Inode *in, set& did) +{ + dout(1) << "inode " << in << " ref " << in->ref << " dir " << in->dir << endl; + + if (in->dir) { + dout(1) << " dir size " << in->dir->dentries.size() << endl; + for (hash_map::iterator it = in->dir->dentries.begin(); + it != in->dir->dentries.end(); + it++) { + dout(1) << " dn " << it->first << " ref " << it->second->ref << endl; + dump_inode(it->second->inode, did); + } + } +} + +void Client::dump_cache() +{ + set did; + + if (root) dump_inode(root, did); + + for (map::iterator it = inode_map.begin(); + it != inode_map.end(); + it++) { + if (did.count(it->second)) continue; + + dout(1) << "inode " << it->first << " ref " << it->second->ref << " dir " << it->second->dir << endl; + if (it->second->dir) { + dout(1) << " dir size " << it->second->dir->dentries.size() << endl; + } + } + +} + + void Client::init() { } @@ -680,6 +725,7 @@ int Client::lstat(const char *path, struct stat *stbuf) if (dn && ((now - dn->inode->last_updated) <= g_conf.client_cache_stat_ttl)) { inode = dn->inode->inode; dout(10) << "lstat cache hit, age is " << (now - dn->inode->last_updated) << endl; + delete req; // don't need this } else { // FIXME where does FUSE maintain user information req->set_caller_uid(getuid()); @@ -695,8 +741,9 @@ int Client::lstat(const char *path, struct stat *stbuf) //Update metadata cache this->insert_trace(trace); - delete reply; } + + delete reply; } if (res == 0) { @@ -871,18 +918,21 @@ int Client::getdir(const char *path, map& contents) assert(diri); assert(diri->inode.mode & INODE_MODE_DIR); - Dir *dir = diri->open_dir(); - assert(dir); - time_t now = time(NULL); - for (vector::iterator it = reply->get_dir_contents().begin(); - it != reply->get_dir_contents().end(); - it++) { - // put in cache - Inode *in = this->insert_inode_info(dir, *it); - in->last_updated = now; - - // contents to caller too! - contents[(*it)->ref_dn] = &in->inode; + if (reply->get_dir_contents().size()) { + // only open dir if we're actually adding stuff to it! + Dir *dir = diri->open_dir(); + assert(dir); + time_t now = time(NULL); + for (vector::iterator it = reply->get_dir_contents().begin(); + it != reply->get_dir_contents().end(); + it++) { + // put in cache + Inode *in = this->insert_inode_info(dir, *it); + in->last_updated = now; + + // contents to caller too! + contents[(*it)->ref_dn] = &in->inode; + } } // FIXME: remove items in cache that weren't in my readdir diff --git a/ceph/client/Client.h b/ceph/client/Client.h index a609fc12c3e..47f0e970c81 100644 --- a/ceph/client/Client.h +++ b/ceph/client/Client.h @@ -228,6 +228,8 @@ class Client : public Dispatcher { // trim cache. void trim_cache(); + void dump_inode(Inode *in, set& did); + void dump_cache(); // debug // find dentry based on filepath Dentry *lookup(filepath& path); diff --git a/ceph/client/SyntheticClient.cc b/ceph/client/SyntheticClient.cc index d514f024846..300d55bb117 100644 --- a/ceph/client/SyntheticClient.cc +++ b/ceph/client/SyntheticClient.cc @@ -276,6 +276,7 @@ int SyntheticClient::random_walk(int num_req) if (op == MDS_OP_UTIME) { struct utimbuf b; + memset(&b, 1, sizeof(b)); if (contents.empty()) r = client->utime( cwd.c_str(), &b ); else diff --git a/ceph/client/SyntheticClient.h b/ceph/client/SyntheticClient.h index 78b925e3272..4d843c78220 100644 --- a/ceph/client/SyntheticClient.h +++ b/ceph/client/SyntheticClient.h @@ -73,6 +73,8 @@ class SyntheticClient { SyntheticClient(Client *client) { this->client = client; thread_id = 0; + + did_readdir = false; } int start_thread(); diff --git a/ceph/common/ThreadPool.h b/ceph/common/ThreadPool.h index f1c4820da7d..814f7c1569d 100644 --- a/ceph/common/ThreadPool.h +++ b/ceph/common/ThreadPool.h @@ -33,15 +33,16 @@ class ThreadPool { void * do_ops(void *nothing) { T* op; - + cout << "Thread "<< pthread_self() << " ready for action\n"; while(1) { q_sem.Get(); op = get_op(); - + if(op == NULL) { - cout << "Thread exiting\n"; - pthread_exit(0); + cout << "Thread exiting\n"; + //pthread_exit(0); + return 0; // like this, i think! } cout << "Thread "<< pthread_self() << " calling the function\n"; func(u, op); @@ -81,11 +82,17 @@ class ThreadPool { ~ThreadPool() { + // put null ops to make threads exit cleanly + for(int i = 0; i < num_threads; i++) + put_op(0); + + // wait for them to die for(int i = 0; i < num_threads; i++) { - cout << "Killing thread " << i << "\n"; - pthread_cancel(thread[i]); + cout << "Joining thread " << i << "\n"; + void *rval = 0; // we don't actually care + pthread_join(thread[i], &rval); } - delete thread; + delete[] thread; } void put_op(T* op) diff --git a/ceph/config.cc b/ceph/config.cc index 6afb54e9a03..1ffb00fb60f 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -59,7 +59,7 @@ md_config_t g_conf = { // --- osd --- osd_fsync: true, - + osd_maxthreads: 10, // --- fakeclient (mds regression testing) --- @@ -151,6 +151,8 @@ void parse_config_options(int argc, char **argv, else if (strcmp(argv[i], "--osd_fsync") == 0) g_conf.osd_fsync = atoi(argv[++i]); + else if (strcmp(argv[i], "--osd_maxthreads") == 0) + g_conf.osd_maxthreads = atoi(argv[++i]); else { //cout << "passing arg " << argv[i] << endl; diff --git a/ceph/config.h b/ceph/config.h index 3d2cc10b23f..d5c3d06cb41 100644 --- a/ceph/config.h +++ b/ceph/config.h @@ -48,6 +48,7 @@ struct md_config_t { // osd bool osd_fsync; + int osd_maxthreads; // fake client int num_fakeclient; diff --git a/ceph/fakesyn.cc b/ceph/fakesyn.cc index 60afa92c0a2..8e834e2d7d3 100644 --- a/ceph/fakesyn.cc +++ b/ceph/fakesyn.cc @@ -30,9 +30,9 @@ public: }; -int main(int oargc, char **oargv) { - - //cerr << "mpisyn starting " << myrank << "/" << world << endl; +int main(int oargc, char **oargv) +{ + cerr << "fakesyn starting" << endl; int argc; char **argv; parse_config_options(oargc, oargv, diff --git a/ceph/include/bufferlist.h b/ceph/include/bufferlist.h index 1af1eebf2f3..e3cb5678d2e 100644 --- a/ceph/include/bufferlist.h +++ b/ceph/include/bufferlist.h @@ -5,6 +5,7 @@ #include #include +#include using namespace std; #include @@ -336,6 +337,36 @@ inline void _decode(set& s, bufferlist& bl, int& off) assert(s.size() == n); } +// vector +inline void _encode(vector& s, bufferlist& bl) +{ + int n = s.size(); + bl.append((char*)&n, sizeof(n)); + for (vector::iterator it = s.begin(); + it != s.end(); + it++) { + int v = *it; + bl.append((char*)&v, sizeof(v)); + n--; + } + assert(n==0); +} +inline void _decode(vector& s, bufferlist& bl, int& off) +{ + s.clear(); + int n; + bl.copy(off, sizeof(n), (char*)&n); + off += sizeof(n); + s = vector(n); + for (int i=0; i #include +#include #include using namespace std; diff --git a/ceph/mds/MDS.cc b/ceph/mds/MDS.cc index 07d6bbd5f0a..e39b5666d09 100644 --- a/ceph/mds/MDS.cc +++ b/ceph/mds/MDS.cc @@ -100,6 +100,7 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) { osdg.num_osds = g_conf.num_osd; for (int i=0; iadd_group(osdg); // diff --git a/ceph/msg/FakeMessenger.cc b/ceph/msg/FakeMessenger.cc index 5f0ff0262dd..d9f5a93a25b 100644 --- a/ceph/msg/FakeMessenger.cc +++ b/ceph/msg/FakeMessenger.cc @@ -44,9 +44,20 @@ bool awake = false; bool shutdown = false; pthread_t thread_id; + +class C_FakeKicker : public Context { + void finish(int r) { + dout(18) << "timer kick" << endl; + pending_timer = true; + cond.Signal(); // why not + } +}; + + void *fakemessenger_thread(void *ptr) { - dout(1) << "thread start" << endl; + dout(1) << "thread start, setting timer kicker" << endl; + g_timer.set_messenger_kicker(new C_FakeKicker()); lock.Lock(); while (1) { @@ -62,6 +73,9 @@ void *fakemessenger_thread(void *ptr) } lock.Unlock(); + cout << "unsetting messenger kicker" << endl; + g_timer.unset_messenger_kicker(); + dout(1) << "thread finish (i woke up but no messages, bye)" << endl; } @@ -86,10 +100,6 @@ void fakemessenger_wait() void *ptr; pthread_join(thread_id, &ptr); - - g_timer.unset_messenger_kicker(); - - } @@ -170,22 +180,11 @@ int fakemessenger_do_loop_2() } -// class - -class C_FakeKicker : public Context { - void finish(int r) { - dout(18) << "timer kick" << endl; - pending_timer = true; - cond.Signal(); // why not - } -}; - FakeMessenger::FakeMessenger(long me) : Messenger(me) { whoami = me; directory[ whoami ] = this; - g_timer.set_messenger_kicker(new C_FakeKicker()); cout << "fakemessenger " << whoami << " messenger is " << this << endl; @@ -218,8 +217,10 @@ int FakeMessenger::shutdown() { //cout << "shutdown on messenger " << this << " has " << num_incoming() << " queued" << endl; directory.erase(whoami); - if (directory.empty()) + if (directory.empty()) { ::shutdown = true; + cond.Signal(); // why not + } } /* diff --git a/ceph/msg/TCPMessenger.cc b/ceph/msg/TCPMessenger.cc index aac912cad2c..2ad415d9161 100644 --- a/ceph/msg/TCPMessenger.cc +++ b/ceph/msg/TCPMessenger.cc @@ -178,9 +178,9 @@ int tcpmessenger_shutdown() // bleh - delete remote_addr; - delete in_sd; - delete out_sd; + delete[] remote_addr; + delete[] in_sd; + delete[] out_sd; } int tcpmessenger_world() diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 0f67bd31717..8b9d8af4c48 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -79,7 +79,7 @@ OSD::OSD(int id, Messenger *m) logger = new Logger(name, (LogType*)&osd_logtype); // Thread pool - threadpool = new ThreadPool(10, (void (*)(OSD*, MOSDOp*))doop, this); + threadpool = new ThreadPool(g_conf.osd_maxthreads, (void (*)(OSD*, MOSDOp*))doop, this); } OSD::~OSD() @@ -89,6 +89,7 @@ OSD::~OSD() if (messenger) { delete messenger; messenger = 0; } if (logger) { delete logger; logger = 0; } if (store) { delete store; store = 0; } + if (threadpool) { delete threadpool; threadpool = 0; } } int OSD::init() diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index 53fc1783565..6633c611b4a 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -15,6 +15,7 @@ class Messenger; class Message; + // ways to be dirty #define RG_DIRTY_LOCAL_LOG 1 #define RG_DIRTY_LOCAL_SYNC 2 diff --git a/ceph/osd/OSDMap.cc b/ceph/osd/OSDMap.cc index 52664ba8525..2aef9b3fee7 100644 --- a/ceph/osd/OSDMap.cc +++ b/ceph/osd/OSDMap.cc @@ -13,7 +13,7 @@ void OSDCluster::encode(bufferlist& blist) int ngroups = osd_groups.size(); blist.append((char*)&ngroups, sizeof(ngroups)); for (int i=0; i(ngroups); for (int i=0; i osds; // the list of actual osd's + + void _encode(bufferlist& bl) { + bl.append((char*)&num_osds, sizeof(num_osds)); + bl.append((char*)&weight, sizeof(weight)); + bl.append((char*)&osd_size, sizeof(osd_size)); + ::_encode(osds, bl); + } + void _decode(bufferlist& bl, int& off) { + bl.copy(off, sizeof(num_osds), (char*)&num_osds); + off += sizeof(num_osds); + bl.copy(off, sizeof(weight), (char*)&weight); + off += sizeof(weight); + bl.copy(off, sizeof(osd_size), (char*)&osd_size); + off += sizeof(osd_size); + ::_decode(osds, bl, off); + } }; @@ -52,10 +69,10 @@ struct OSDGroup { * for mapping (ino, offset, len) to a (list of) byte extents in objects on osds */ struct OSDExtent { - int osd; - object_t oid; - repgroup_t rg; - size_t offset, len; + int osd; // (acting) primary osd + object_t oid; // object id + repgroup_t rg; // replica group + size_t offset, len; // extent within the object }; diff --git a/ceph/osdc/Filer.cc b/ceph/osdc/Filer.cc index 2817327d6f9..475897b279c 100644 --- a/ceph/osdc/Filer.cc +++ b/ceph/osdc/Filer.cc @@ -354,6 +354,7 @@ int Filer::remove(inodeno_t ino, size_t size, Context *onfinish) size_t off = 0; // ptr into buffer + int n = 0; for (list::iterator it = extents.begin(); it != extents.end(); it++) { @@ -369,8 +370,16 @@ int Filer::remove(inodeno_t ino, size_t size, Context *onfinish) // add to gather set p->outstanding_ops.insert(last_tid); op_removes[last_tid] = p; + n++; } + if (n == 0) { + delete p; + if (onfinish) { + onfinish->finish(0); + delete onfinish; + } + } } diff --git a/ceph/script/find_bufferleaks.pl b/ceph/script/find_bufferleaks.pl new file mode 100755 index 00000000000..8c5fd2a989a --- /dev/null +++ b/ceph/script/find_bufferleaks.pl @@ -0,0 +1,53 @@ +#!/usr/bin/perl + +use strict; +my %buffers; +my %ref; +my %mal; +my $l = 1; +while (<>) { + #print "$l: $_"; + + # cinode:auth_pin on inode [1000000002625 /gnu/blah_client_created. 0x89b7700] count now 1 + 0 + + if (/^buffer\.cons /) { + my ($x) = /(0x\S+)/; + $buffers{$x} = 1; + } + if (/^buffer\.des /) { + my ($x) = /(0x\S+)/; + die "des without cons at $l: $_" unless $buffers{$x}; + delete $buffers{$x}; + die "des with ref>0 at $l: $_" unless $ref{$x} == 0; + delete $ref{$x}; + } + + if (/^buffer\.malloc /) { + my ($x) = /(0x\S+)/; + $mal{$x} = 1; + } + if (/^buffer\.free /) { + my ($x) = /(0x\S+)/; + die "free with malloc at $l: $_" unless $mal{$x}; + delete $mal{$x}; + } + + if (/^buffer\.get /) { + my ($x) = /(0x\S+)/; + $ref{$x}++; + } + if (/^buffer\.get /) { + my ($x) = /(0x\S+)/; + $ref{$x}--; + } + +$l++; +} + +for my $x (keys %buffers) { + print "leaked buffer $x ref $ref{$x}\n"; +} + +for my $x (keys %mal) { + print "leaked buffer dataptr $x ref $ref{$x}\n"; +}