it != fh_map.end();
it++) {
Fh *fh = it->second;
+ dout(3) << "forcing close of fh " << it->first << " ino " << fh->inode->inode.ino << endl;
put_inode(fh->inode);
delete fh;
}
// empty lru
lru.lru_set_max(0);
- trim_cache();
+ int last = 0;
+ while (lru.lru_get_size() != last) {
+ last = lru.lru_get_size();
+ dout(10) << "trim pass, size is " << last << endl;
+ //dump_cache();
+ trim_cache();
+ }
// close root ino
assert(inode_map.size() <= 1);
}
+
+// debug crapola
+
+void Client::dump_inode(Inode *in, set<Inode*>& did)
+{
+ dout(1) << "inode " << in << " ref " << in->ref << " dir " << in->dir << endl;
+
+ if (in->dir) {
+ dout(1) << " dir size " << in->dir->dentries.size() << endl;
+ for (hash_map<string, Dentry*>::iterator it = in->dir->dentries.begin();
+ it != in->dir->dentries.end();
+ it++) {
+ dout(1) << " dn " << it->first << " ref " << it->second->ref << endl;
+ dump_inode(it->second->inode, did);
+ }
+ }
+}
+
+void Client::dump_cache()
+{
+ set<Inode*> did;
+
+ if (root) dump_inode(root, did);
+
+ for (map<inodeno_t, Inode*>::iterator it = inode_map.begin();
+ it != inode_map.end();
+ it++) {
+ if (did.count(it->second)) continue;
+
+ dout(1) << "inode " << it->first << " ref " << it->second->ref << " dir " << it->second->dir << endl;
+ if (it->second->dir) {
+ dout(1) << " dir size " << it->second->dir->dentries.size() << endl;
+ }
+ }
+
+}
+
+
void Client::init() {
}
if (dn && ((now - dn->inode->last_updated) <= g_conf.client_cache_stat_ttl)) {
inode = dn->inode->inode;
dout(10) << "lstat cache hit, age is " << (now - dn->inode->last_updated) << endl;
+ delete req; // don't need this
} else {
// FIXME where does FUSE maintain user information
req->set_caller_uid(getuid());
//Update metadata cache
this->insert_trace(trace);
- delete reply;
}
+
+ delete reply;
}
if (res == 0) {
assert(diri);
assert(diri->inode.mode & INODE_MODE_DIR);
- Dir *dir = diri->open_dir();
- assert(dir);
- time_t now = time(NULL);
- for (vector<c_inode_info*>::iterator it = reply->get_dir_contents().begin();
- it != reply->get_dir_contents().end();
- it++) {
- // put in cache
- Inode *in = this->insert_inode_info(dir, *it);
- in->last_updated = now;
-
- // contents to caller too!
- contents[(*it)->ref_dn] = &in->inode;
+ if (reply->get_dir_contents().size()) {
+ // only open dir if we're actually adding stuff to it!
+ Dir *dir = diri->open_dir();
+ assert(dir);
+ time_t now = time(NULL);
+ for (vector<c_inode_info*>::iterator it = reply->get_dir_contents().begin();
+ it != reply->get_dir_contents().end();
+ it++) {
+ // put in cache
+ Inode *in = this->insert_inode_info(dir, *it);
+ in->last_updated = now;
+
+ // contents to caller too!
+ contents[(*it)->ref_dn] = &in->inode;
+ }
}
// FIXME: remove items in cache that weren't in my readdir
// trim cache.
void trim_cache();
+ void dump_inode(Inode *in, set<Inode*>& did);
+ void dump_cache(); // debug
// find dentry based on filepath
Dentry *lookup(filepath& path);
if (op == MDS_OP_UTIME) {
struct utimbuf b;
+ memset(&b, 1, sizeof(b));
if (contents.empty())
r = client->utime( cwd.c_str(), &b );
else
SyntheticClient(Client *client) {
this->client = client;
thread_id = 0;
+
+ did_readdir = false;
}
int start_thread();
void * do_ops(void *nothing)
{
T* op;
-
+
cout << "Thread "<< pthread_self() << " ready for action\n";
while(1) {
q_sem.Get();
op = get_op();
-
+
if(op == NULL) {
- cout << "Thread exiting\n";
- pthread_exit(0);
+ cout << "Thread exiting\n";
+ //pthread_exit(0);
+ return 0; // like this, i think!
}
cout << "Thread "<< pthread_self() << " calling the function\n";
func(u, op);
~ThreadPool()
{
+ // put null ops to make threads exit cleanly
+ for(int i = 0; i < num_threads; i++)
+ put_op(0);
+
+ // wait for them to die
for(int i = 0; i < num_threads; i++) {
- cout << "Killing thread " << i << "\n";
- pthread_cancel(thread[i]);
+ cout << "Joining thread " << i << "\n";
+ void *rval = 0; // we don't actually care
+ pthread_join(thread[i], &rval);
}
- delete thread;
+ delete[] thread;
}
void put_op(T* op)
// --- osd ---
osd_fsync: true,
-
+ osd_maxthreads: 10,
// --- fakeclient (mds regression testing) ---
else if (strcmp(argv[i], "--osd_fsync") == 0)
g_conf.osd_fsync = atoi(argv[++i]);
+ else if (strcmp(argv[i], "--osd_maxthreads") == 0)
+ g_conf.osd_maxthreads = atoi(argv[++i]);
else {
//cout << "passing arg " << argv[i] << endl;
// osd
bool osd_fsync;
+ int osd_maxthreads;
// fake client
int num_fakeclient;
};
-int main(int oargc, char **oargv) {
-
- //cerr << "mpisyn starting " << myrank << "/" << world << endl;
+int main(int oargc, char **oargv)
+{
+ cerr << "fakesyn starting" << endl;
int argc;
char **argv;
parse_config_options(oargc, oargv,
#include <list>
#include <set>
+#include <vector>
using namespace std;
#include <ext/rope>
assert(s.size() == n);
}
+// vector<int>
+inline void _encode(vector<int>& s, bufferlist& bl)
+{
+ int n = s.size();
+ bl.append((char*)&n, sizeof(n));
+ for (vector<int>::iterator it = s.begin();
+ it != s.end();
+ it++) {
+ int v = *it;
+ bl.append((char*)&v, sizeof(v));
+ n--;
+ }
+ assert(n==0);
+}
+inline void _decode(vector<int>& s, bufferlist& bl, int& off)
+{
+ s.clear();
+ int n;
+ bl.copy(off, sizeof(n), (char*)&n);
+ off += sizeof(n);
+ s = vector<int>(n);
+ for (int i=0; i<n; i++) {
+ int v;
+ bl.copy(off, sizeof(v), (char*)&v);
+ off += sizeof(v);
+ s[i] = v;
+ }
+ assert(s.size() == n);
+}
+
#include <string>
#include <set>
+#include <vector>
#include <iostream>
using namespace std;
osdg.num_osds = g_conf.num_osd;
for (int i=0; i<osdg.num_osds; i++) osdg.osds.push_back(i);
osdg.weight = 100;
+ osdg.osd_size = 100; // not used yet?
osdcluster->add_group(osdg);
// </HACK>
bool shutdown = false;
pthread_t thread_id;
+
+class C_FakeKicker : public Context {
+ void finish(int r) {
+ dout(18) << "timer kick" << endl;
+ pending_timer = true;
+ cond.Signal(); // why not
+ }
+};
+
+
void *fakemessenger_thread(void *ptr)
{
- dout(1) << "thread start" << endl;
+ dout(1) << "thread start, setting timer kicker" << endl;
+ g_timer.set_messenger_kicker(new C_FakeKicker());
lock.Lock();
while (1) {
}
lock.Unlock();
+ cout << "unsetting messenger kicker" << endl;
+ g_timer.unset_messenger_kicker();
+
dout(1) << "thread finish (i woke up but no messages, bye)" << endl;
}
void *ptr;
pthread_join(thread_id, &ptr);
-
- g_timer.unset_messenger_kicker();
-
-
}
}
-// class
-
-class C_FakeKicker : public Context {
- void finish(int r) {
- dout(18) << "timer kick" << endl;
- pending_timer = true;
- cond.Signal(); // why not
- }
-};
-
FakeMessenger::FakeMessenger(long me) : Messenger(me)
{
whoami = me;
directory[ whoami ] = this;
- g_timer.set_messenger_kicker(new C_FakeKicker());
cout << "fakemessenger " << whoami << " messenger is " << this << endl;
{
//cout << "shutdown on messenger " << this << " has " << num_incoming() << " queued" << endl;
directory.erase(whoami);
- if (directory.empty())
+ if (directory.empty()) {
::shutdown = true;
+ cond.Signal(); // why not
+ }
}
/*
// bleh
- delete remote_addr;
- delete in_sd;
- delete out_sd;
+ delete[] remote_addr;
+ delete[] in_sd;
+ delete[] out_sd;
}
int tcpmessenger_world()
logger = new Logger(name, (LogType*)&osd_logtype);
// Thread pool
- threadpool = new ThreadPool<OSD, MOSDOp>(10, (void (*)(OSD*, MOSDOp*))doop, this);
+ threadpool = new ThreadPool<OSD, MOSDOp>(g_conf.osd_maxthreads, (void (*)(OSD*, MOSDOp*))doop, this);
}
OSD::~OSD()
if (messenger) { delete messenger; messenger = 0; }
if (logger) { delete logger; logger = 0; }
if (store) { delete store; store = 0; }
+ if (threadpool) { delete threadpool; threadpool = 0; }
}
int OSD::init()
class Message;
+
// ways to be dirty
#define RG_DIRTY_LOCAL_LOG 1
#define RG_DIRTY_LOCAL_SYNC 2
int ngroups = osd_groups.size();
blist.append((char*)&ngroups, sizeof(ngroups));
for (int i=0; i<ngroups; i++) {
- blist.append((char*)&osd_groups[i], sizeof(OSDGroup));
+ osd_groups[i]._encode(blist);
}
_encode(down_osds, blist);
osd_groups = vector<OSDGroup>(ngroups);
for (int i=0; i<ngroups; i++) {
- blist.copy(off, sizeof(OSDGroup), (char*)&osd_groups[i]);
- off += sizeof(OSDGroup);
+ osd_groups[i]._decode(blist, off);
}
_decode(down_osds, blist, off);
/** OSDGroup
* a group of identical disks added to the OSD cluster
*/
-struct OSDGroup {
+class OSDGroup {
+ public:
int num_osds; // num disks in this group (aka num_disks_in_cluster[])
float weight; // weight (for data migration etc.) (aka weight_cluster[])
size_t osd_size; // osd size (in MB)
vector<int> osds; // the list of actual osd's
+
+ void _encode(bufferlist& bl) {
+ bl.append((char*)&num_osds, sizeof(num_osds));
+ bl.append((char*)&weight, sizeof(weight));
+ bl.append((char*)&osd_size, sizeof(osd_size));
+ ::_encode(osds, bl);
+ }
+ void _decode(bufferlist& bl, int& off) {
+ bl.copy(off, sizeof(num_osds), (char*)&num_osds);
+ off += sizeof(num_osds);
+ bl.copy(off, sizeof(weight), (char*)&weight);
+ off += sizeof(weight);
+ bl.copy(off, sizeof(osd_size), (char*)&osd_size);
+ off += sizeof(osd_size);
+ ::_decode(osds, bl, off);
+ }
};
* for mapping (ino, offset, len) to a (list of) byte extents in objects on osds
*/
struct OSDExtent {
- int osd;
- object_t oid;
- repgroup_t rg;
- size_t offset, len;
+ int osd; // (acting) primary osd
+ object_t oid; // object id
+ repgroup_t rg; // replica group
+ size_t offset, len; // extent within the object
};
size_t off = 0; // ptr into buffer
+ int n = 0;
for (list<OSDExtent>::iterator it = extents.begin();
it != extents.end();
it++) {
// add to gather set
p->outstanding_ops.insert(last_tid);
op_removes[last_tid] = p;
+ n++;
}
+ if (n == 0) {
+ delete p;
+ if (onfinish) {
+ onfinish->finish(0);
+ delete onfinish;
+ }
+ }
}
--- /dev/null
+#!/usr/bin/perl
+
+use strict;
+my %buffers;
+my %ref;
+my %mal;
+my $l = 1;
+while (<>) {
+ #print "$l: $_";
+
+ # cinode:auth_pin on inode [1000000002625 /gnu/blah_client_created. 0x89b7700] count now 1 + 0
+
+ if (/^buffer\.cons /) {
+ my ($x) = /(0x\S+)/;
+ $buffers{$x} = 1;
+ }
+ if (/^buffer\.des /) {
+ my ($x) = /(0x\S+)/;
+ die "des without cons at $l: $_" unless $buffers{$x};
+ delete $buffers{$x};
+ die "des with ref>0 at $l: $_" unless $ref{$x} == 0;
+ delete $ref{$x};
+ }
+
+ if (/^buffer\.malloc /) {
+ my ($x) = /(0x\S+)/;
+ $mal{$x} = 1;
+ }
+ if (/^buffer\.free /) {
+ my ($x) = /(0x\S+)/;
+ die "free with malloc at $l: $_" unless $mal{$x};
+ delete $mal{$x};
+ }
+
+ if (/^buffer\.get /) {
+ my ($x) = /(0x\S+)/;
+ $ref{$x}++;
+ }
+ if (/^buffer\.get /) {
+ my ($x) = /(0x\S+)/;
+ $ref{$x}--;
+ }
+
+$l++;
+}
+
+for my $x (keys %buffers) {
+ print "leaked buffer $x ref $ref{$x}\n";
+}
+
+for my $x (keys %mal) {
+ print "leaked buffer dataptr $x ref $ref{$x}\n";
+}