if (messenger) { delete messenger; messenger = 0; }
if (filer) { delete filer; filer = 0; }
if (osdcluster) { delete osdcluster; osdcluster = 0; }
+
+ tear_down_cache();
+}
+
+void Client::tear_down_cache()
+{
+ // fh's
+ for (map<fileh_t, Fh*>::iterator it = fh_map.begin();
+ it != fh_map.end();
+ it++) {
+ Fh *fh = it->second;
+ put_inode(fh->inode);
+ delete fh;
+ }
+ fh_map.clear();
+
+ // empty lru
+ lru.lru_set_max(0);
+ trim_cache();
+
+ // close root ino
+ assert(inode_map.size() <= 1);
+ if (root && inode_map.size() == 1) {
+ delete root;
+ root = 0;
+ inode_map.clear();
+ }
+
+ assert(inode_map.empty());
}
// ===================
// metadata cache stuff
+void Client::trim_cache()
+{
+ while (lru.lru_get_size() > lru.lru_get_max()) {
+ Dentry *dn = (Dentry*)lru.lru_expire();
+ if (!dn) break; // done
+
+ //dout(10) << "unlinking dn " << dn->name << " in dir " << dn->dir->inode->inode.ino << endl;
+ unlink(dn);
+ }
+}
// insert inode info into metadata cache
// success?
if (result > 0) {
// yay
+ fileh_t fh = reply->get_result(); // FIXME?
Fh *f = new Fh;
memset(f, 0, sizeof(*f));
f->mds = reply->get_source();
f->inode = inode_map[trace[trace.size()-1]->inode.ino];
f->caps = reply->get_file_caps();
- fh_map[reply->get_result()] = f;
+ assert(fh_map.count(fh) == 0);
+ fh_map[fh] = f;
- dout(3) << "open success, fh is " << reply->get_result() << " caps " << f->caps << " fh size " << f->size << endl;
+ dout(3) << "open success, fh is " << fh << " caps " << f->caps << " fh size " << f->size << endl;
}
delete reply;
/* mds may ack our close() after reissuing same fh to another open; remove from
fh_map _before_ sending request. */
fh_map.erase(fh);
+ delete f;
release_inode_buffers(in);
// unlink from inode
dn->inode = 0;
in->dn = 0;
- in->put();
+ put_inode(in);
// unlink from dir
dn->dir->dentries.erase(dn->name);
void touch_dn(Dentry *dn) { lru.lru_touch(dn); }
// trim cache.
- void trim_cache() {
- while (lru.lru_get_size() > lru.lru_get_max()) {
- Dentry *dn = (Dentry*)lru.lru_expire();
- if (!dn) break; // done
-
- unlink(dn);
- }
- }
+ void trim_cache();
// find dentry based on filepath
Dentry *lookup(filepath& path);
public:
Client(MDCluster *mdc, int id, Messenger *m);
~Client();
+ void tear_down_cache();
int get_nodeid() { return whoami; }
-int SyntheticClient::write_file(string& fn, int size, int wrsize) // size is in MB
+int SyntheticClient::write_file(string& fn, int size, int wrsize) // size is in MB, wrsize in bytes
{
//__uint64_t wrsize = 1024*256;
char *buf = new char[wrsize]; // 1 MB
+ memset(buf, 1, wrsize);
__uint64_t chunks = (__uint64_t)size * (__uint64_t)(1024*1024) / (__uint64_t)wrsize;
int fd = client->open(fn.c_str(), O_WRONLY|O_CREAT);
if (!event_times.count(callback)) {
dout(DBL) << "cancel_event " << callback << " wasn't scheduled?" << endl;
lock.Unlock();
+ assert(0);
return false; // wasn't scheduled.
}
thread_stop = false;
}
~Timer() {
- // cancel any wakeup/thread crap
+ // scheduled
+ for (map< timepair_t, set<Context*> >::iterator it = scheduled.begin();
+ it != scheduled.end();
+ it++) {
+ for (set<Context*>::iterator sit = it->second.begin();
+ sit != it->second.end();
+ sit++)
+ delete *sit;
+ }
+ scheduled.clear();
+
+ // pending
+ for (map< timepair_t, set<Context*> >::iterator it = pending.begin();
+ it != pending.end();
+ it++) {
+ for (set<Context*>::iterator sit = it->second.begin();
+ sit != it->second.end();
+ sit++)
+ delete *sit;
+ }
+ pending.clear();
+ }
+
+ void init() {
+ register_timer();
+ }
+ void shutdown() {
cancel_timer();
-
- // clean up pending events
- // ** FIXME **
}
void set_messenger(Messenger *m);
delete client[i];
}
delete mdc;
-
+
+ free(argv);
+ delete[] nargv;
return 0;
}
int MDCache::issue_file_caps(CInode *in,
int mode,
- Context *onwait)
+ MClientRequest *req)
{
dout(7) << "issue_file_caps for mode " << mode << " on " << *in << endl;
}
}
- in->add_waiter(CINODE_WAIT_CAPS, onwait);
+ in->add_waiter(CINODE_WAIT_CAPS, new C_MDS_RetryRequest(mds, req, in));
return 0;
}
// we're okay!
int caps = my_want & allowed;
dout(7) << " issuing caps " << caps << " (i want " << my_want << ", allowed " << allowed << ")" << endl;
- assert(caps);
+ assert(caps > 0);
+
return caps;
}
// -- file i/o --
- int issue_file_caps(CInode *in, int mode, Context *onwait);
+ int issue_file_caps(CInode *in, int mode, MClientRequest *req);
void eval_file_caps(CInode *in);
void handle_client_file_caps(class MClientFileCaps *m);
// can we issue the caps they want?
- int caps = mdcache->issue_file_caps(cur, mode,
- new C_MDS_RetryRequest(this, req, cur));
+ int caps = mdcache->issue_file_caps(cur, mode, req);
if (!caps) return; // can't issue (yet), so wait!
// create fh
__uint64_t version;
public:
- bufferlist bl;
-
MDDoCommitDirContext(MDStore *ms, CDir *dir, Context *c, int w) : Context() {
this->ms = ms;
this->dir = dir;
dout(14) << "num " << num << endl;
// put count in buffer
+ bufferlist bl;
size_t size = sizeof(num) + dirdata.length();
- fin->bl.append((char*)&size, sizeof(size));
- fin->bl.append((char*)&num, sizeof(num));
- fin->bl.append(dirdata.c_str(), dirdata.length());
- assert(fin->bl.length() == size + sizeof(size));
+ bl.append((char*)&size, sizeof(size));
+ bl.append((char*)&num, sizeof(num));
+ bl.append(dirdata.c_str(), dirdata.length());
+ assert(bl.length() == size + sizeof(size));
// pin inode
dir->auth_pin();
// submit to osd
- int osd;
- object_t oid;
- if (hashcode >= 0) {
- // hashed
- osd = mds->mdcluster->get_hashdir_meta_osd(dir->ino(), hashcode);
- oid = mds->mdcluster->get_hashdir_meta_oid(dir->ino(), hashcode);
- } else {
- // normal
- osd = mds->mdcluster->get_meta_osd(dir->ino());
- oid = mds->mdcluster->get_meta_oid(dir->ino());
- }
-
-
mds->filer->write( dir->ino(),
- fin->bl.length(), 0,
- fin->bl,
+ bl.length(), 0,
+ bl,
0, //OSD_OP_FLAGS_TRUNCATE, // truncate file/object after end of this write
fin );
}
}
lock.Unlock();
- // shutdown underlying messenger.
+ // shutdown and delete underlying messenger.
messenger->shutdown();
+ delete messenger;
}
lock.Lock();
fakemessenger_do_loop_2();
lock.Unlock();
+
+ g_timer.shutdown();
}
cout << "fakemessenger " << whoami << " messenger is " << this << endl;
+ /*
string name;
name = "m.";
name += MSG_ADDR_TYPE(whoami);
if (w >= 10) name += ('0' + ((w/10)%10));
name += ('0' + ((w/1)%10));
- logger = new Logger(name, (LogType*)&fakemsg_logtype);
- loggers[ whoami ] = logger;
+ loggers[ whoami ] = new Logger(name, (LogType*)&fakemsg_logtype);
+ */
}
FakeMessenger::~FakeMessenger()
{
shutdown();
- delete logger;
+ if (loggers[whoami]) {
+ delete loggers[whoami];
+ loggers.erase(whoami);
+ }
}
}
void finish(int r) {
//cout << "HEARTBEAT" << endl;
+ hm->pending_events.erase(this);
hm->initiate_heartbeat();
}
};
}
void finish(int r) {
//cout << "CHECK" << endl;
+ hm->pending_events.erase(this);
hm->check_heartbeat();
}
};
it != pending_events.end();
it++) {
g_timer.cancel_event(*it);
+ delete *it;
}
pending_events.clear();
}
int max_heartbeat_misses; // how many misses before i tell
float notify_retry_interval; // how often to retry failure notification
+ public:
set<Context*> pending_events;
+ private:
void schedule_heartbeat();
public:
dout(5) << "finishing async sends" << endl;
mpi_finish_sends();
+ g_timer.shutdown();
+
dout(5) << "mpimessenger_loop exiting loop" << endl;
}
{
DIR *dir = opendir(mydir.c_str());
if (dir) {
- dout(1) << "wiping " << mydir << endl;
+ dout(10) << "wiping " << mydir << endl;
struct dirent *ent = 0;
while (ent = readdir(dir)) {
// make sure my dir exists
r = ::stat(mydir.c_str(), &st);
if (r != 0) {
- dout(1) << "creating " << mydir << endl;
+ dout(10) << "creating " << mydir << endl;
mkdir(mydir.c_str(), 0755);
r = ::stat(mydir.c_str(), &st);
if (r != 0) {
OSD::~OSD()
{
if (osdcluster) { delete osdcluster; osdcluster = 0; }
+ if (monitor) { delete monitor; monitor = 0; }
if (messenger) { delete messenger; messenger = 0; }
if (logger) { delete logger; logger = 0; }
if (store) { delete store; store = 0; }