dout(1) << "mounted" << endl;
mounted = true;
+ delete reply;
+
client_lock.Unlock();
}
mounted = false;
dout(1) << "unmounted" << endl;
+ delete reply;
+
client_lock.Unlock();
}
decay(now);
val += a;
}
+ void adjust_down(const DecayCounter& other) {
+ // assume other has same time stamp as us...
+ val -= other.val;
+ }
void set_halflife(double hl) {
half_life = hl;
wrote_header = -1;
open = false;
this->type = type;
+ wrote_header_last = 0;
}
Logger::~Logger()
}
// header?
- if (wrote_header != type->version) {
+ wrote_header_last++;
+ if (wrote_header != type->version ||
+ wrote_header_last > 10) {
out << "#";
for (vector<string>::iterator it = type->keys.begin(); it != type->keys.end(); it++) {
out << "\t" << *it;
}
out << endl;
wrote_header = type->version;
+ wrote_header_last = 0;
}
// write line to log
int last_logged;
int interval;
int wrote_header;
+ int wrote_header_last;
string filename;
g_conf.mds_log_read_inc = atoi(argv[++i]);
else if (strcmp(argv[i], "--mds_log_max_trimming") == 0)
g_conf.mds_log_max_trimming = atoi(argv[++i]);
+
else if (strcmp(argv[i], "--mds_commit_on_shutdown") == 0)
g_conf.mds_commit_on_shutdown = atoi(argv[++i]);
else if (strcmp(argv[i], "--mds_log_flush_on_shutdown") == 0)
g_conf.mds_log_flush_on_shutdown = atoi(argv[++i]);
+
else if (strcmp(argv[i], "--mds_bal_interval") == 0)
g_conf.mds_bal_interval = atoi(argv[++i]);
-
else if (strcmp(argv[i], "--osd_fsync") == 0)
g_conf.osd_fsync = atoi(argv[++i]);
}
}
- fakemessenger_startthread();
-
MDCluster *mdc = new MDCluster(NUMMDS, NUMOSD);
client[i]->init();
// use my argc, argv (make sure you pass a mount point!)
- cout << "mounting" << endl;
+ //cout << "mounting" << endl;
client[i]->mount(mkfs);
- cout << "starting synthetic client " << endl;
+ //cout << "starting synthetic client " << endl;
syn[i] = new SyntheticClient(client[i]);
char s[20];
}
for (int i=0; i<NUMCLIENT; i++) {
- cout << "waiting for synthetic client to finish" << endl;
+ cout << "waiting for synthetic client " << i << " to finish" << endl;
syn[i]->join_thread();
delete syn[i];
client[i]->unmount();
- cout << "unmounted" << endl;
+ //cout << "unmounted" << endl;
client[i]->shutdown();
}
inline ostream& operator<<(ostream& out, bufferptr& bp) {
return out << "bufferptr(len=" << bp._len << ", off=" << bp._off
- //<< ", " << bp.c_str()
+ << ", int=" << *(int*)(bp.c_str())
<< ", " << *bp._buffer
<< ")";
}
st.popularity_justme.take( dir->popularity[MDS_POP_JUSTME] );
st.popularity_curdom.take( dir->popularity[MDS_POP_CURDOM] );
+ dir->popularity[MDS_POP_ANYDOM].adjust_down(st.popularity_curdom);
rep_by = dir->dir_rep_by;
open_by = dir->open_by;
inodeno_t get_ino() { return st.ino; }
__uint64_t get_nden() { return st.nden; }
- void update_dir(CDir *dir) {
+ void update_dir(CDir *dir, timepair_t& now) {
assert(dir->ino() == st.ino);
//dir->nitems = st.nitems;
dir->dir_auth = st.dir_auth;
dir->dir_rep = st.dir_rep;
+ double newcurdom = st.popularity_curdom.get(now) - dir->popularity[MDS_POP_CURDOM].get(now);
dir->popularity[MDS_POP_JUSTME].take( st.popularity_justme );
dir->popularity[MDS_POP_CURDOM].take( st.popularity_curdom );
+ dir->popularity[MDS_POP_ANYDOM].adjust(now, newcurdom);
dir->dir_rep_by = rep_by;
dir->open_by = open_by;
st.popularity_justme.take( in->popularity[MDS_POP_JUSTME] );
st.popularity_curdom.take( in->popularity[MDS_POP_CURDOM] );
+ in->popularity[MDS_POP_ANYDOM].adjust_down(st.popularity_curdom);
// suck up fh's from inode
for (map<fileh_t, CFile*>::iterator it = in->fh_map.begin();
inodeno_t get_ino() { return st.inode.ino; }
- int update_inode(CInode *in) {
+ int update_inode(CInode *in, timepair_t& now) {
in->inode = st.inode;
in->version = st.version;
+
+ double newcurdom = st.popularity_curdom.get(now) - in->popularity[MDS_POP_CURDOM].get(now);
in->popularity[MDS_POP_JUSTME].take( st.popularity_justme );
in->popularity[MDS_POP_CURDOM].take( st.popularity_curdom );
+ in->popularity[MDS_POP_ANYDOM].adjust(now, newcurdom);
if (st.is_dirty)
in->mark_dirty();
// generic log event
class LogEvent {
- protected:
- int type;
+ private:
+ int _type;
public:
- LogEvent(int t) {
- type = t;
- }
+ LogEvent(int t) : _type(t) { }
- int get_type() { return type; }
+ int get_type() { return _type; }
virtual void encode_payload(bufferlist& bl) = 0;
virtual void decode_payload(bufferlist& bl, int& off) = 0;
void encode(bufferlist& bl) {
// type
- bl.append((char*)&type, sizeof(type));
+ assert(_type > 0);
+ bl.append((char*)&_type, sizeof(_type));
// len placeholder
int len = 0; // we don't know just yet...
int off = bl.length();
- bl.append((char*)&type, sizeof(len));
+ bl.append((char*)&len, sizeof(len));
// payload
encode_payload(bl);
size_t elen = bl.length();
// append
- dout(10) << "append event type " << e->get_type() << " size " << elen << " at log offset " << append_pos << endl;
+ dout(15) << "append event type " << e->get_type() << " size " << elen << " at log offset " << append_pos << endl;
off_t off = append_pos;
append_pos += elen;
- dout(15) << "write buf was " << write_buf.length() << endl;
+ //dout(15) << "write buf was " << write_buf.length() << " bl " << write_buf << endl;
write_buf.claim_append(bl);
- dout(15) << "write buf now " << write_buf.length() << endl;
+ //dout(15) << "write buf now " << write_buf.length() << " bl " << write_buf << endl;
return off;
}
void LogStream::_append_2(off_t off)
{
- dout(10) << "sync_pos now " << off << endl;
+ dout(15) << "sync_pos now " << off << endl;
sync_pos = off;
// wake up waiters
read_buf.copy(off, sizeof(__uint32_t), (char*)&length);
off += sizeof(__uint32_t);
- dout(10) << "getting next event from " << read_pos << ", type " << type << ", size " << length << endl;
+ dout(15) << "getting next event from " << read_pos << ", type " << type << ", size " << length << endl;
assert(type > 0);
*/
void MDBalancer::subtract_export(CDir *dir)
{
-
- // FIXME this is totally wrong
-
timepair_t now = g_clock.gettimepair();
double curdom = -dir->popularity[MDS_POP_CURDOM].get(now);
- double anydom = -dir->popularity[MDS_POP_ANYDOM].get(now);
bool in_domain = !dir->is_import();
while (true) {
CInode *in = dir->inode;
- if (in_domain)
- in->popularity[MDS_POP_CURDOM].adjust(now, curdom);
- if (in->is_auth())
- in->popularity[MDS_POP_ANYDOM].adjust(now, anydom);
+ in->popularity[MDS_POP_ANYDOM].adjust(now, curdom);
+ if (in_domain) in->popularity[MDS_POP_CURDOM].adjust(now, curdom);
dir = in->get_parent_dir();
if (!dir) break;
-
+
if (dir->is_import()) in_domain = false;
- if (in_domain)
- dir->popularity[MDS_POP_CURDOM].adjust(now, curdom);
- if (dir->is_auth())
- dir->popularity[MDS_POP_ANYDOM].adjust(now, anydom);
+ dir->popularity[MDS_POP_ANYDOM].adjust(now, curdom);
+ if (in_domain) dir->popularity[MDS_POP_CURDOM].adjust(now, curdom);
}
}
void MDBalancer::add_import(CDir *dir)
{
+ timepair_t now = g_clock.gettimepair();
+ double curdom = dir->popularity[MDS_POP_CURDOM].get(now);
+
+ bool in_domain = !dir->is_import();
+ while (true) {
+ CInode *in = dir->inode;
+
+ in->popularity[MDS_POP_ANYDOM].adjust(now, curdom);
+ if (in_domain) in->popularity[MDS_POP_CURDOM].adjust(now, curdom);
+
+ dir = in->get_parent_dir();
+ if (!dir) break;
+
+ if (dir->is_import()) in_domain = false;
+
+ dir->popularity[MDS_POP_ANYDOM].adjust(now, curdom);
+ if (in_domain) dir->popularity[MDS_POP_CURDOM].adjust(now, curdom);
+ }
+
}
bool MDCache::shutdown_pass()
{
- static bool did_inode_updates = false;
-
dout(7) << "shutdown_pass" << endl;
//assert(mds->is_shutting_down());
if (mds->is_shut_down()) {
// decode + import inode (into new location start)
int off = 0;
- decode_import_inode(destdn, m->get_inode_state(), off, m->get_source());
+ timepair_t now = g_clock.gettimepair();
+ decode_import_inode(destdn, m->get_inode_state(), off, m->get_source(), now);
CInode *in = destdn->inode;
assert(in);
mds->messenger->send_message(new MExportDirDiscover(dir->inode),
dest, MDS_PORT_CACHE, MDS_PORT_CACHE);
dir->auth_pin(); // pin dir, to hang up our freeze
- mds->logger->inc("ex");
// take away popularity (and pass it on to the context, MExportDir request later)
mds->balancer->subtract_export(dir);
// fill export message with cache data
C_MDS_ExportFinish *fin = new C_MDS_ExportFinish(mds, dir, dest);
- export_dir_walk( req,
- fin,
- dir, // base
- dir, // recur start point
- dest );
+ int num_exported_inodes = export_dir_walk( req,
+ fin,
+ dir, // base
+ dir, // recur start point
+ dest );
// send the export data!
mds->messenger->send_message(req,
// queue up the finisher
dir->add_waiter( CDIR_WAIT_UNFREEZE, fin );
+
+ // stats
+ mds->logger->inc("ex");
+ mds->logger->inc("iex", num_exported_inodes);
+
show_imports();
}
}
-void MDCache::export_dir_walk(MExportDir *req,
+int MDCache::export_dir_walk(MExportDir *req,
C_MDS_ExportFinish *fin,
CDir *basedir,
CDir *dir,
int newauth)
{
+ int num_exported = 0;
+
dout(7) << "export_dir_walk " << *dir << " " << dir->nitems << " items" << endl;
// dir
CDentry *dn = it->second;
CInode *in = dn->inode;
+ num_exported++;
+
// -- dentry
dout(7) << "export_dir_walk exporting " << *dn << endl;
dir_rope.append( it->first.c_str(), it->first.length()+1 );
// subdirs
for (list<CDir*>::iterator it = subdirs.begin(); it != subdirs.end(); it++)
- export_dir_walk(req, fin, basedir, *it, newauth);
+ num_exported += export_dir_walk(req, fin, basedir, *it, newauth);
+
+ return num_exported;
}
show_imports();
- mds->logger->inc("im");
// note new authority (locally) in inode
if (dir->inode->is_auth())
imports.erase(ex);
ex->state_clear(CDIR_STATE_IMPORT);
- mds->logger->inc("immyex");
+ mds->logger->inc("imex");
// move nested exports under containing_import
for (set<CDir*>::iterator it = nested_exports[ex].begin();
list<inodeno_t> imported_subdirs;
crope dir_state = m->get_state();
int off = 0;
+ int num_imported_inodes = 0;
+ timepair_t now = g_clock.gettimepair(); // for popularity adjustments
+
for (int i = 0; i < m->get_ndirs(); i++) {
- import_dir_block(dir_state,
- off,
- oldauth,
- dir, // import root
- imported_subdirs);
+ num_imported_inodes +=
+ import_dir_block(dir_state,
+ off,
+ oldauth,
+ dir, // import root
+ imported_subdirs,
+ now);
}
dout(10) << " " << imported_subdirs.size() << " imported subdirs" << endl;
dout(10) << " " << m->get_exports().size() << " imported nested exports" << endl;
}
+ // some stats
+ mds->logger->inc("im");
+ mds->logger->inc("iim", num_imported_inodes);
+ mds->logger->set("nex", exports.size());
+ mds->logger->set("nim", imports.size());
+
+
// FIXME LOG IT
/*
}
-void MDCache::decode_import_inode(CDentry *dn, crope& r, int& off, int oldauth)
+void MDCache::decode_import_inode(CDentry *dn, crope& r, int& off, int oldauth, timepair_t& now)
{
CInodeExport istate;
}
// state
- istate.update_inode(in);
+ istate.update_inode(in, now);
// add inode?
if (added) {
}
-void MDCache::import_dir_block(crope& r,
- int& off,
- int oldauth,
- CDir *import_root,
- list<inodeno_t>& imported_subdirs)
+int MDCache::import_dir_block(crope& r,
+ int& off,
+ int oldauth,
+ CDir *import_root,
+ list<inodeno_t>& imported_subdirs,
+ timepair_t& now)
{
// set up dir
CDirExport dstate;
imported_subdirs.push_back(dir->ino());
// assimilate state
- dstate.update_dir( dir );
+ dstate.update_dir( dir, now );
if (diri->is_auth()) dir->dir_auth = CDIR_AUTH_PARENT; // update_dir may hose dir_auth
// mark (may already be marked from get_or_open_dir() above)
dout(15) << "doing contents" << endl;
// contents
+ int num_imported = 0;
for (long nden = dstate.get_nden(); nden>0; nden--) {
+
+ num_imported++;
+
// dentry
string dname = r.c_str() + off;
off += dname.length()+1;
}
else if (icode == 'I') {
// inode
- decode_import_inode(dn, r, off, oldauth);
+ decode_import_inode(dn, r, off, oldauth, now);
}
// mark dentry dirty? (only _after_ we link the inode!)
}
+ return num_imported;
}
void handle_export_dir_prep_ack(MExportDirPrepAck *m);
void export_dir_go(CDir *dir,
int dest);
- void export_dir_walk(MExportDir *req,
- class C_MDS_ExportFinish *fin,
- CDir *basedir,
- CDir *dir,
- int newauth);
+ int export_dir_walk(MExportDir *req,
+ class C_MDS_ExportFinish *fin,
+ CDir *basedir,
+ CDir *dir,
+ int newauth);
void export_dir_finish(CDir *dir);
void handle_export_dir_notify_ack(MExportDirNotifyAck *m);
void handle_export_dir(MExportDir *m);
void import_dir_finish(CDir *dir);
void handle_export_dir_finish(MExportDirFinish *m);
- void import_dir_block(crope& r,
- int& off,
- int oldauth,
- CDir *import_root,
- list<inodeno_t>& imported_subdirs);
+ int import_dir_block(crope& r,
+ int& off,
+ int oldauth,
+ CDir *import_root,
+ list<inodeno_t>& imported_subdirs,
+ timepair_t& now);
void got_hashed_replica(CDir *import,
inodeno_t dir_ino,
inodeno_t replica_ino);
- void decode_import_inode(CDentry *dn, crope& r, int &off, int oldauth);
+ void decode_import_inode(CDentry *dn, crope& r, int &off, int oldauth, timepair_t& now);
// bystander
void handle_export_dir_warning(MExportDirWarning *m);
num_events = 0;
max_events = 0;
+ waiting_for_read = false;
+
logstream = new LogStream(mds, mds->filer, MDS_INO_LOG_OFFSET + mds->get_nodeid());
char name[80];
shutting_down = false;
shut_down = false;
+ mds_paused = false;
stat_ops = 0;
last_heartbeat = 0;
if (mdstore) { delete mdstore; mdstore = NULL; }
if (mdlog) { delete mdlog; mdlog = NULL; }
if (balancer) { delete balancer; balancer = NULL; }
+ if (osdmonitor) { delete osdmonitor; osdmonitor = 0; }
+ if (idalloc) { delete idalloc; idalloc = NULL; }
+ if (anchormgr) { delete anchormgr; anchormgr = NULL; }
+ if (osdcluster) { delete osdcluster; osdcluster = 0; }
- if (logger) { delete logger; logger = 0; }
-
+ if (filer) { delete filer; filer = 0; }
if (messenger) { delete messenger; messenger = NULL; }
- if (idalloc) { delete idalloc; idalloc = NULL; }
+ if (logger) { delete logger; logger = 0; }
+
}
// flush log
mdlog->flush();
+
+ // trim cache
+ mdcache->trim();
// finish any triggered contexts
dout(7) << *mds << "do_fetch_dir_2 hashcode " << hashcode << " dir " << *dir << endl;
// parse buffer contents into cache
- cout << "bl is " << bl << endl;
+ dout(15) << "bl is " << bl << endl;
size_t size;
bl.copy(0, sizeof(size), (char*)&size);
assert(bl.length() >= size + sizeof(size));
class EAlloc : public LogEvent {
protected:
- int type;
+ int idtype;
idno_t id;
int what;
public:
- EAlloc(int type, idno_t id, int what) :
+ EAlloc(int idtype, idno_t id, int what) :
LogEvent(EVENT_ALLOC) {
- this->type = type;
+ this->idtype = idtype;
this->id = id;
this->what = what;
}
LogEvent(EVENT_ALLOC) {
}
- virtual void encode_payload(bufferlist& bl) {
- bl.append((char*)&type, sizeof(type));
+ void encode_payload(bufferlist& bl) {
+ bl.append((char*)&idtype, sizeof(idtype));
bl.append((char*)&id, sizeof(id));
bl.append((char*)&what, sizeof(what));
}
void decode_payload(bufferlist& bl, int& off) {
- bl.copy(off, sizeof(type), (char*)&type);
- off += sizeof(type);
+ bl.copy(off, sizeof(idtype), (char*)&idtype);
+ off += sizeof(idtype);
bl.copy(off, sizeof(id), (char*)&id);
off += sizeof(id);
bl.copy(off, sizeof(what), (char*)&what);
virtual bool obsolete(MDS *mds) {
- if (mds->idalloc->is_dirty(type,id))
+ if (mds->idalloc->is_dirty(idtype, id))
return false; // still dirty
else
return true; // already flushed
{
int fromport = 0;
- Cond *cond = new Cond();
+ Cond cond;
// make up a pcid that is unique (to me!)
/* NOTE: since request+replies are matched up on pcid's alone, it means that
// add call records
assert(call_cond.count(pcid) == 0); // pcid should be UNIQUE
- call_cond[pcid] = cond;
+ call_cond[pcid] = &cond;
call_reply[pcid] = 0; // no reply yet
// send. drop locks in case send_message is bad and blocks
dout(DEBUGLVL) << "sendrecv waiting for reply on pcid " << pcid << endl;
//cout << "wait start, value = " << sem->Value() << endl;
- cond->Wait(lock);
+ cond.Wait(lock);
} else {
dout(DEBUGLVL) << "sendrecv reply is already here on pcid " << pcid << endl;
}
void HostMonitor::handle_failure_ack(MFailureAck *m)
{
+
+ // FIXME: this doesn't handle failed -> alive transitions gracefully at all..
+
// the higher-up's acknowledged our failure notification, we can stop resending it.
msg_addr_t failed = m->get_failed();
dout(DBL) << "handle_failure_ack " << failed << endl;
unacked_failures.erase(failed);
acked_failures.insert(failed);
- // FIXME: this doesn't handle failed -> alive transitions gracefully at all..
+ delete m;
}
if (shadow) {
is_shadow = true;
shadowdir = shadow;
- }
+ } else
+ is_shadow = false;
}
////
void FakeStore::get_dir(string& dir) {
- static char s[30];
+ char s[30];
sprintf(s, "%d", whoami);
dir = basedir + "/" + s;
}
void FakeStore::get_oname(object_t oid, string& fn, bool shadow) {
- static char s[100];
+ char s[100];
sprintf(s, "%d/%02lld/%lld", whoami, HASH_FUNC(oid), oid);
if (shadow)
fn = shadowdir + "/" + s;
off_t actual = lseek(fd, offset, SEEK_SET);
size_t did = 0;
- if (actual == offset) {
- did = ::write(fd, buffer, len);
- }
+ assert(actual == offset);
+ did = ::write(fd, buffer, len);
// sync to to disk?
if (do_fsync) fsync(fd); // should this be fsync?
messenger = m;
messenger->set_dispatcher(this);
+ osdcluster = 0;
+
// use fake store
store = new FakeStore(osd_base_path, whoami);
OSD::~OSD()
{
+ if (osdcluster) { delete osdcluster; osdcluster = 0; }
if (messenger) { delete messenger; messenger = 0; }
if (logger) { delete logger; logger = 0; }
+ if (store) { delete store; store = 0; }
}
int OSD::init()
// osd
case MSG_SHUTDOWN:
shutdown();
+ delete m;
break;
case MSG_OSD_GETCLUSTERACK:
bptr.set_length(got); // properly size the buffer
// give it to the reply in a bufferlist
- bufferlist bl;
- bl.push_back( bptr );
+ reply->get_data().push_back( bptr );
reply->set_result(0);
reply->set_data(bl);
for (list<bufferptr>::iterator it = bl.buffers().begin();
it != bl.buffers().end();
it++) {
+
int r = store->write(m->get_oid(),
(*it).length(), off,
(*it).c_str(),
public:
OSDCluster() : version(0), rush(0) { }
+ ~OSDCluster() {
+ if (rush) { delete rush; rush = 0; }
+ }
__uint64_t get_version() { return version; }
// finish, clean up
Context *onfinish = p->onfinish;
- delete p; // del pendingOsdRead_t
-
int result = p->read_result->length(); // assume success
+
dout(7) << "read " << result << " bytes " << p->read_result->length() << endl;
// done
+ delete p; // del pendingOsdRead_t
if (onfinish) {
onfinish->finish(result);
delete onfinish;