fakemessenger_serialize: true,
debug: 15,
+ debug_mds_balancer: 1,
+ debug_mds_log: 1,
// --- client ---
client_cache_size: 400,
mds_log: true,
mds_log_max_len: 10000,//MDS_CACHE_SIZE / 3,
- mds_log_max_trimming: 16,
+ mds_log_max_trimming: 32,
mds_log_read_inc: 65536,
mds_log_before_reply: true,
mds_log_flush_on_shutdown: true,
mds_bal_replicate_threshold: 500,
mds_bal_unreplicate_threshold: 200,
- mds_bal_interval: 200,
+ mds_bal_interval: 500,
mds_commit_on_shutdown: true,
else if (strcmp(argv[i], "--debug") == 0)
g_conf.debug = atoi(argv[++i]);
+ else if (strcmp(argv[i], "--debug_mds_balancer") == 0)
+ g_conf.debug_mds_balancer = atoi(argv[++i]);
+ else if (strcmp(argv[i], "--debug_mds_log") == 0)
+ g_conf.debug_mds_log = atoi(argv[++i]);
else if (strcmp(argv[i], "--log") == 0)
g_conf.log_name = argv[++i];
g_conf.mds_log_before_reply = atoi(argv[++i]);
else if (strcmp(argv[i], "--mds_log_max_len") == 0)
g_conf.mds_log_max_len = atoi(argv[++i]);
+ else if (strcmp(argv[i], "--mds_log_read_inc") == 0)
+ g_conf.mds_log_read_inc = atoi(argv[++i]);
else if (strcmp(argv[i], "--mds_log_max_trimming") == 0)
g_conf.mds_log_max_trimming = atoi(argv[++i]);
else if (strcmp(argv[i], "--mds_commit_on_shutdown") == 0)
bool fake_clock;
bool fakemessenger_serialize;
- int debug;
-
+ int debug;
+ int debug_mds_balancer;
+ int debug_mds_log;
// client
int client_cache_size;
cout << "mounting" << endl;
client[i]->mount(mkfs);
- cout << "starting synthatic client " << endl;
+ cout << "starting synthetic client " << endl;
syn[i] = new SyntheticClient(client[i]);
char s[20];
int length() {
return _len;
}
+ int offset() {
+ return _off;
+ }
// modifiers
void set_offset(int off) {
- assert(off <= _len);
+ assert(off <= _buffer->_alloc_len);
_off = off;
}
void set_length(int len) {
_buffer->_len += len;
_len += len;
}
- void copy(int off, int len, char *dest) {
+ void copy_out(int off, int len, char *dest) {
assert(off >= 0 && off <= _len);
assert(len >= 0 && off + len <= _len);
memcpy(dest, c_str() + off, len);
}
+ void copy_in(int off, int len, char *src) {
+ assert(off >= 0 && off <= _len);
+ assert(len >= 0 && off + len <= _len);
+ memcpy(c_str() + off, src, len);
+ }
friend ostream& operator<<(ostream& out, bufferptr& bp);
};
inline ostream& operator<<(ostream& out, bufferptr& bp) {
return out << "bufferptr(len=" << bp._len << ", off=" << bp._off
- << ", " << bp.c_str()
- //<< ", " << *bp._buffer
+ //<< ", " << bp.c_str()
+ << ", " << *bp._buffer
<< ")";
}
while (len > 0) {
// is the rest ALL in this buffer?
if (off + len <= (*curbuf).length()) {
- (*curbuf).copy(off, len, dest); // yup, last bit!
+ (*curbuf).copy_out(off, len, dest); // yup, last bit!
break;
}
// get as much as we can from this buffer.
int howmuch = (*curbuf).length() - off;
- (*curbuf).copy(off, howmuch, dest);
+ (*curbuf).copy_out(off, howmuch, dest);
dest += howmuch;
len -= howmuch;
assert(curbuf != _buffers.end());
}
}
+
+ void copy_in(int off, int len, char *src) {
+ assert(off >= 0);
+ assert(off + len <= length());
+
+ // advance to off
+ list<bufferptr>::iterator curbuf = _buffers.begin();
+
+ // skip off
+ while (off > 0) {
+ assert(curbuf != _buffers.end());
+ if (off >= (*curbuf).length()) {
+ // skip this buffer
+ off -= (*curbuf).length();
+ curbuf++;
+ } else {
+ // somewhere in this buffer!
+ break;
+ }
+ }
+
+ // copy
+ while (len > 0) {
+ // is the rest ALL in this buffer?
+ if (off + len <= (*curbuf).length()) {
+ (*curbuf).copy_in(off, len, src); // yup, last bit!
+ break;
+ }
+
+ // get as much as we can from this buffer.
+ int howmuch = (*curbuf).length() - off;
+ (*curbuf).copy_in(off, howmuch, src);
+
+ src += howmuch;
+ len -= howmuch;
+ off = 0;
+ curbuf++;
+ assert(curbuf != _buffers.end());
+ }
+ }
+
+
void append(const char *data, int len) {
if (len == 0) return;
//cout << "keeping end of " << *curbuf << ", losing first " << off+len << endl;
if (claim_by)
claim_by->append( *curbuf, len, off );
- (*curbuf).set_offset( off + len ); // ignore beginning big
+ (*curbuf).set_offset( off + len + (*curbuf).offset() ); // ignore beginning big
(*curbuf).set_length( (*curbuf).length() - len - off );
//cout << " now " << *curbuf << endl;
break;
#include "IdAllocator.h"
#include "MDS.h"
+#include "MDLog.h"
+#include "events/EAlloc.h"
+
#include "osd/Filer.h"
#include "osd/OSDCluster.h"
idno_t id = free[type].first();
free[type].erase(id);
dout(DBLEVEL) << "idalloc " << this << ": getid type " << type << " is " << id << endl;
- //free[type].dump();
- //save();
+
+ mds->mdlog->submit_entry(new EAlloc(type, id, EALLOC_EV_ALLOC));
+ dirty[type].insert(id);
+
return id;
}
dout(DBLEVEL) << "idalloc " << this << ": reclaim type " << type << " id " << id << endl;
free[type].insert(id);
//free[type].dump();
- //save();
+
+ mds->mdlog->submit_entry(new EAlloc(type, id, EALLOC_EV_FREE));
+ dirty[type].insert(id);
}
-void IdAllocator::save()
+void IdAllocator::save(Context *onfinish)
{
crope data;
assert(mapsize == 0);
}
+ // reset dirty list .. FIXME this is optimistic, i'm assuming the write succeeds.
+ dirty.clear();
+
// turn into bufferlist
bufferlist bl;
bl.append(data.c_str(), data.length());
0,
bl,
0,
- NULL);
+ onfinish);
}
class IdAllocator {
MDS *mds;
- map< int, rangeset<idno_t> > free; // type -> rangeset
-
+ map< int, rangeset<idno_t> > free; // type -> rangeset
+ map< int, set<idno_t> > dirty; // dirty ids
+
bool opened, opening;
public:
bool is_open() { return opened; }
bool is_opening() { return opening; }
+ // True if id (of allocator type `type`) has been allocated or reclaimed
+ // since the dirty set was last cleared by save().
+ bool is_dirty(int type, idno_t id) {
+ // use find() rather than operator[] so that a pure query never
+ // inserts an empty set for a type we have not seen yet.
+ map< int, set<idno_t> >::iterator p = dirty.find(type);
+ return p != dirty.end() && p->second.count(id) > 0;
+ }
+
void reset();
+ void save(Context *onfinish=0);
void shutdown() {
- if (is_open()) save();
+ if (is_open()) save(0);
}
- void save();
void load(Context *onfinish);
void load_2(int, bufferlist&, Context *onfinish);
#define EVENT_STRING 1
#define EVENT_INODEUPDATE 2
#define EVENT_UNLINK 3
+#define EVENT_ALLOC 4
// generic log event
class LogEvent {
int get_type() { return type; }
- virtual crope get_payload() = 0; // children overload this
-
- crope get_serialized() {
- crope s;
+ virtual void encode_payload(bufferlist& bl) = 0;
+ virtual void decode_payload(bufferlist& bl, int& off) = 0;
+ void encode(bufferlist& bl) {
// type
- __uint32_t ptype = type;
- s.append((char*)&ptype, sizeof(ptype));
-
- // len+payload
- crope payload = get_payload();
- __uint32_t plen = payload.length();
- s.append((char*)&plen, sizeof(plen));
- s.append(payload);
- return s;
+ // Serialized form is: __uint32_t type, __uint32_t payload length, payload.
+ // Both header fields are written fixed-width because the reader
+ // (LogStream::get_next_event) parses them back as __uint32_t.
+ __uint32_t ptype = type;
+ bl.append((char*)&ptype, sizeof(ptype));
+
+ // len placeholder; patched in place below once the payload size is known
+ __uint32_t plen = 0;
+ int off = bl.length();
+ bl.append((char*)&plen, sizeof(plen));
+
+ // payload
+ encode_payload(bl);
+ plen = bl.length() - off - sizeof(plen);
+ bl.copy_in(off, sizeof(plen), (char*)&plen);
}
virtual bool obsolete(MDS *m) {
#include "events/EString.h"
#include "events/EInodeUpdate.h"
#include "events/EUnlink.h"
+#include "events/EAlloc.h"
#include <iostream>
using namespace std;
#include "include/config.h"
#undef dout
-#define dout(l) if (l<=g_conf.debug) cout << "mds" << mds->get_nodeid() << ".logstream "
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mds_log) cout << "mds" << mds->get_nodeid() << ".logstream "
+
+// ----------------------------
// writing
-int LogStream::append(LogEvent *e, Context *c)
-{
- // serialize
- crope buffer = e->get_serialized();
- size_t buflen = buffer.length();
+class C_LS_Append : public Context {
+ LogStream *ls;
+ off_t off;
+public:
+ C_LS_Append(LogStream *ls, off_t off) {
+ this->ls = ls;
+ this->off = off;
+ }
+ void finish(int r) {
+ ls->_append_2(off);
+ }
+};
+
- // into a bufferlist.. FIXME
+off_t LogStream::append(LogEvent *e)
+{
+ // serialize FIXME ********
bufferlist bl;
- bl.append(buffer.c_str(), buffer.length());
+ e->encode(bl);
+ size_t elen = bl.length();
+
+ // append
+ dout(10) << "append event type " << e->get_type() << " size " << elen << " at log offset " << append_pos << endl;
+
+ off_t off = append_pos;
+ append_pos += elen;
+ dout(15) << "write buf was " << write_buf.length() << endl;
+ write_buf.claim_append(bl);
+ dout(15) << "write buf now " << write_buf.length() << endl;
+
+ return off;
+}
+void LogStream::_append_2(off_t off)
+{
+ dout(10) << "sync_pos now " << off << endl;
+ sync_pos = off;
+
+ // wake up waiters
+ map< off_t, list<Context*> >::iterator it = waiting_for_sync.begin();
+ while (it != waiting_for_sync.end()) {
+ if (it->first > sync_pos) break;
+
+ // wake them up!
+ dout(15) << it->second.size() << " waiters at offset " << it->first << " <= " << sync_pos << endl;
+ for (list<Context*>::iterator cit = it->second.begin();
+ cit != it->second.end();
+ cit++)
+ mds->finished_queue.push_back(*cit);
+
+ // continue
+ waiting_for_sync.erase(it++);
+ }
+}
- dout(10) << "append event type " << e->get_type() << " size " << buflen << " at log offset " << append_pos << endl;
+void LogStream::wait_for_sync(Context *c, off_t off)
+{
+ if (off == 0) off = append_pos;
+ assert(off > sync_pos);
- // advance ptr for later
- append_pos += buffer.length();
+ dout(15) << "sync " << c << " waiting for " << off << " (sync_pos currently " << sync_pos << ")" << endl;
+ waiting_for_sync[off].push_back(c);
- // submit write
- mds->filer->write(log_ino, // FIXME
- buflen, append_pos-buflen,
- bl,
- 0,
- c);
- return 0;
+ // initiate flush now? (since we have a waiter...)
+ if (autoflush) flush();
+}
+
+void LogStream::flush()
+{
+ // write to disk
+ if (write_buf.length()) {
+ dout(15) << "flush flush_pos " << flush_pos << " < append_pos " << append_pos << ", writing " << write_buf.length() << " bytes" << endl;
+
+ assert(write_buf.length() == append_pos - flush_pos);
+
+ mds->filer->write(log_ino,
+ write_buf.length(), flush_pos,
+ write_buf,
+ 0,
+ new C_LS_Append(this, append_pos));
+
+ write_buf.clear();
+ flush_pos = append_pos;
+ } else {
+ dout(15) << "flush flush_pos " << flush_pos << " == append_pos " << append_pos << ", nothing to do" << endl;
+ }
+
}
+
+
+
+// -------------------------------------------------
// reading
-/*
-class C_LS_ReadNext : public Context {
- LogStream *ls;
- LogEvent **le;
- Context *c;
-public:
- C_LS_ReadNext(LogStream *ls, LogEvent **le, Context *c) {
- this->ls = ls;
- this->le = le;
- this->c = c;
- }
- void finish(int result) {
- ls->read_next(le,c,2);
+
+LogEvent *LogStream::get_next_event()
+{
+ if (read_buf.length() < 2*sizeof(__uint32_t))
+ return 0;
+
+ // parse type, length
+ int off = 0;
+ __uint32_t type, length;
+ read_buf.copy(off, sizeof(__uint32_t), (char*)&type);
+ off += sizeof(__uint32_t);
+ read_buf.copy(off, sizeof(__uint32_t), (char*)&length);
+ off += sizeof(__uint32_t);
+
+ dout(10) << "getting next event from " << read_pos << ", type " << type << ", size " << length << endl;
+
+ assert(type > 0);
+
+ if (read_buf.length() < off + length)
+ return 0;
+
+ // create event
+ LogEvent *le;
+ switch (type) {
+ case EVENT_STRING: // string
+ le = new EString();
+ break;
+
+ case EVENT_INODEUPDATE:
+ le = new EInodeUpdate();
+ break;
+
+ case EVENT_UNLINK:
+ le = new EUnlink();
+ break;
+
+ case EVENT_ALLOC:
+ le = new EAlloc();
+ break;
+
+ default:
+ dout(1) << "uh oh, unknown event type " << type << endl;
+ assert(0);
}
-};
-*/
+
+ // decode
+ le->decode_payload(read_buf, off);
+
+ // discard front of read_buf
+ read_pos += off;
+ read_buf.splice(0, off);
+
+ dout(15) << "get_next_event got event, read_pos now " << read_pos << " (append_pos is " << append_pos << ")" << endl;
+
+ // ok!
+ return le;
+}
+
+
class C_LS_ReadChunk : public Context {
public:
bufferlist bl;
LogStream *ls;
- LogEvent **le;
- Context *c;
- C_LS_ReadChunk(LogStream *ls, LogEvent **le, Context *c) {
+
+ C_LS_ReadChunk(LogStream *ls) {
this->ls = ls;
- this->le = le;
- this->c = c;
}
void finish(int result) {
- crope next_bit;
- next_bit.append(bl.c_str(), bl.length());
- ls->did_read_bit(next_bit, le, c);
+ ls->_did_read(bl);
}
};
-void LogStream::did_read_bit(crope& next_bit, LogEvent **le, Context *c)
-{
- // add to our buffer
- buffer.append(next_bit);
- reading_block = false;
-
- // throw off beginning part
- if (buffer.length() > g_conf.mds_log_read_inc*2) {
- int trim = buffer.length() - g_conf.mds_log_read_inc*2;
- buf_start += trim;
- buffer = buffer.substr(trim, buffer.length() - trim);
- dout(10) << "did_read_bit adjusting buf_start now +" << trim << " = " << buf_start << " len " << buffer.length() << endl;
- }
-
- // continue at step 2
- read_next(le, c, 2);
-}
-int LogStream::read_next(LogEvent **le, Context *c, int step)
+void LogStream::wait_for_next_event(Context *c)
{
- if (step == 1) {
- // does buffer have what we want?
- //if (buf_start > cur_pos ||
- //buf_start+buffer.length() < cur_pos+4) {
- if (buf_start+buffer.length() < cur_pos+ g_conf.mds_log_read_inc/2) {
-
- // make sure block is being read
- if (reading_block) {
- dout(10) << "read_next already reading log head from disk, offset " << cur_pos << endl;
- assert(0);
- } else {
- off_t start = buf_start+buffer.length();
- dout(10) << "read_next reading log head from disk, offset " << start << " len " << g_conf.mds_log_read_inc << endl;
- // nope. read a chunk
- C_LS_ReadChunk *readc = new C_LS_ReadChunk(this, le, c);
- reading_block = true;
- mds->filer->read(log_ino, // FIXME
- g_conf.mds_log_read_inc, start,
- &readc->bl,
- readc);
- }
- return 0;
- }
- step = 2;
+ // Queue c (if any) to be completed by _did_read(), then read the next
+ // chunk of the log that follows what read_buf already holds.
+
+ // add waiter
+ if (c) waiting_for_read.push_back(c);
+
+ // issue read
+ off_t tail = read_pos + read_buf.length();
+ size_t size = g_conf.mds_log_read_inc;
+ if (tail + size > sync_pos) {
+ // clamp to sync_pos. check before subtracting: size is unsigned, so
+ // tail > sync_pos would wrap instead of tripping an assert afterwards.
+ assert(sync_pos > tail); // bleh, wait for sync, etc.
+ size = sync_pos - tail;
}
+ dout(15) << "wait_for_next_event reading from pos " << tail << " len " << size << endl;
+ C_LS_ReadChunk *readc = new C_LS_ReadChunk(this);
+ mds->filer->read(log_ino,
+ size, tail, // the clamped length, not the raw read increment
+ &readc->bl,
+ readc);
+}
- if (step == 2) {
- reading_block = false;
-
- // decode event
- unsigned off = cur_pos-buf_start;
- __uint32_t type, length;
- buffer.copy(off, sizeof(__uint32_t), (char*)&type);
- buffer.copy(off+sizeof(__uint32_t), sizeof(__uint32_t), (char*)&length);
- off += sizeof(type) + sizeof(length);
-
- dout(10) << "read_next got event type " << type << " size " << length << " at log offset " << cur_pos << endl;
- cur_pos += sizeof(type) + sizeof(length) + length;
-
- switch (type) {
-
- case EVENT_STRING: // string
- *le = new EString(buffer.substr(off,length));
- break;
-
- case EVENT_INODEUPDATE:
- *le = new EInodeUpdate(buffer.substr(off,length));
- break;
-
- case EVENT_UNLINK:
- *le = new EUnlink(buffer.substr(off,length));
- break;
-
+void LogStream::_did_read(bufferlist& blist)
+{
+ dout(15) << "_did_read got " << blist.length() << " bytes" << endl;
+ read_buf.claim_append(blist);
- default:
- dout(1) << "uh oh, unknown event type " << type << endl;
- assert(0);
- }
-
- // finish
- if (c) {
- c->finish(0);
- delete c;
- }
-
- /*
- // any other waiters too!
- list<Context*> finished = waiting_for_read_block;
- waiting_for_read_block.clear();
- for (list<Context*>::iterator it = finished.begin();
- it != finished.end();
- it++) {
- Context *c = *it;
- if (c) {
- c->finish(0);
- delete c;
- }
- }
- */
-
- }
+ list<Context*> finished;
+ finished.splice(finished.begin(), waiting_for_read);
+ finish_contexts(finished, 0);
}
+
#include "include/types.h"
#include "include/Context.h"
-#include <ext/rope>
-using namespace __gnu_cxx;
+#include "include/buffer.h"
+#include "include/bufferlist.h"
+
+#include <map>
+#include <list>
class LogEvent;
+class Filer;
class MDS;
class LogStream {
protected:
MDS *mds;
- off_t cur_pos, append_pos;
+ Filer *filer;
+
inodeno_t log_ino;
- object_t oid;
- bool reading_block;
- //list<Context*> waiting_for_read_block;
+ // writing
+ off_t sync_pos; // first non-written byte
+ off_t flush_pos; // first non-writing byte, beginning of write_buf
+ off_t append_pos; // where next event will be written
+ bufferlist write_buf; // unwritten (between flush_pos and append_pos)
+
+ // reading
+ off_t read_pos; // abs position in file
+ //off_t read_buf_start; // where read buf begins
+ bufferlist read_buf;
+ bool reading;
+
+ std::map< off_t, std::list<Context*> > waiting_for_sync;
+ std::list<Context*> waiting_for_read;
+
- crope buffer;
- off_t buf_start;
+ bool autoflush;
+
public:
- LogStream(MDS *mds, inodeno_t log_ino) {
+ LogStream(MDS *mds, Filer *filer, inodeno_t log_ino) {
this->mds = mds;
+ this->filer = filer;
this->log_ino = log_ino;
- cur_pos = 0;
- append_pos = 0; // fixme
- buf_start = 0;
- reading_block = false;
- }
- off_t seek(off_t offset) {
- cur_pos = offset;
- }
+ // wr
+ sync_pos = flush_pos = append_pos = 0;
+ autoflush = true;
- off_t get_pos() {
- return cur_pos;
+ // rd
+ read_pos = 0;
+ reading = false;
}
+ // write (append to end)
+ off_t append(LogEvent *e); // returns offset it will be written to
+ void _append_2(off_t off);
+ void wait_for_sync(Context *c, off_t off=0); // wait for flush
+ void flush(); // initiate flush
+
+ // read (from front)
+ //bool has_next_event();
+ LogEvent *get_next_event();
+ void wait_for_next_event(Context *c);
+ void _did_read(bufferlist& blist);
+
+
+ // old interface
+ /*
// WARNING: non-reentrant; single reader only
int read_next(LogEvent **le, Context *c, int step=1);
void did_read_bit(crope& next_bit, LogEvent **le, Context *c) ;
int append(LogEvent *e, Context *c); // append at cur_pos, mind you!
+ */
};
#endif
#include "include/config.h"
#undef dout
-#define dout(l) if (l<=g_conf.debug) cout << "mds" << mds->get_nodeid() << ".bal "
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mds_balancer) cout << "mds" << mds->get_nodeid() << ".bal "
#define MIN_LOAD 50 // ??
#define MIN_REEXPORT 5 // will automatically reexport
void MDBalancer::handle_heartbeat(MHeartbeat *m)
{
- dout(5) << " got heartbeat " << m->get_beat() << " from " << m->get_source() << " " << m->get_load() << endl;
+ dout(5) << "=== got heartbeat " << m->get_beat() << " from " << m->get_source() << " " << m->get_load() << endl;
if (!mds->mdcache->get_root()) {
dout(10) << "no root on handle" << endl;
beat_epoch = m->get_beat();
send_heartbeat();
- mds->mdcache->show_imports();
+ show_imports();
}
mds_load[ m->get_source() ] = m->get_load();
if (my_load < target_load.root_pop) {
dout(5) << " i am underloaded, doing nothing." << endl;
- mds->mdcache->show_imports();
+ show_imports();
return;
}
dout(5) << " sending " << amount << " to " << target << endl;
- mds->mdcache->show_imports();
+ show_imports();
// search imports from target
if (import_from_map.count(target)) {
}
dout(5) << "rebalance done" << endl;
- mds->mdcache->show_imports();
+ show_imports();
}
+
+
+
+void MDBalancer::show_imports(bool external)
+{
+ int db = 7; //debug level
+
+ if (mds->mdcache->imports.size() == 0) {
+ dout(db) << "no imports/exports" << endl;
+ return;
+ }
+ dout(db) << "imports/exports:" << endl;
+
+ set<CDir*> ecopy = mds->mdcache->exports;
+
+ timepair_t now = g_clock.gettimepair();
+
+ for (set<CDir*>::iterator it = mds->mdcache->imports.begin();
+ it != mds->mdcache->imports.end();
+ it++) {
+ CDir *im = *it;
+ dout(db) << " + import (" << im->popularity[MDS_POP_CURDOM].get(now) << "/" << im->popularity[MDS_POP_ANYDOM].get(now) << ") " << *im << endl;
+ assert( im->is_import() );
+ assert( im->is_auth() );
+
+ for (set<CDir*>::iterator p = mds->mdcache->nested_exports[im].begin();
+ p != mds->mdcache->nested_exports[im].end();
+ p++) {
+ CDir *exp = *p;
+ dout(db) << " - ex (" << exp->popularity[MDS_POP_NESTED].get(now) << ", " << exp->popularity[MDS_POP_ANYDOM].get(now) << ")" << *exp << " to " << exp->dir_auth << endl;
+ assert( exp->is_export() );
+ assert( !exp->is_auth() );
+
+ if ( mds->mdcache->get_containing_import(exp) != im ) {
+ dout(1) << "uh oh, containing import is " << mds->mdcache->get_containing_import(exp) << endl;
+ dout(1) << "uh oh, containing import is " << *mds->mdcache->get_containing_import(exp) << endl;
+ assert( mds->mdcache->get_containing_import(exp) == im );
+ }
+
+ if (ecopy.count(exp) != 1) {
+ dout(1) << " nested_export " << *exp << " not in exports" << endl;
+ assert(0);
+ }
+ ecopy.erase(exp);
+ }
+ }
+
+ if (ecopy.size()) {
+ for (set<CDir*>::iterator it = ecopy.begin();
+ it != ecopy.end();
+ it++)
+ dout(1) << " stray item in exports: " << **it << endl;
+ assert(ecopy.size() == 0);
+ }
+}
+
+
+
/* replicate?
float dir_pop = dir->get_popularity();
void hit_recursive(class CDir *dir, timepair_t& now);
+ void show_imports(bool external=false);
};
// remove from map
active_requests.erase(req);
+
+
+ // log some stats *****
+ mds->logger->set("c", lru.lru_get_size());
+ mds->logger->set("cmax", lru.lru_get_max());
+
+
}
void MDCache::request_finish(Message *req)
request_cleanup(req);
delete req; // delete req
- mds->logger->inc("rep");
+ mds->logger->inc("reply");
//dump();
}
void MDCache::show_imports()
{
- if (imports.size() == 0) {
- dout(7) << "no imports/exports" << endl;
- return;
- }
- dout(7) << "imports/exports:" << endl;
-
- set<CDir*> ecopy = exports;
-
- timepair_t now = g_clock.gettimepair();
-
- for (set<CDir*>::iterator it = imports.begin();
- it != imports.end();
- it++) {
- CDir *im = *it;
- dout(7) << " + import (" << im->popularity[MDS_POP_CURDOM].get(now) << "/" << im->popularity[MDS_POP_ANYDOM].get(now) << ") " << *im << endl;
- assert( im->is_import() );
- assert( im->is_auth() );
-
- for (set<CDir*>::iterator p = nested_exports[im].begin();
- p != nested_exports[im].end();
- p++) {
- CDir *exp = *p;
- dout(7) << " - ex (" << exp->popularity[MDS_POP_NESTED].get(now) << ", " << exp->popularity[MDS_POP_ANYDOM].get(now) << ")" << *exp << " to " << exp->dir_auth << endl;
- assert( exp->is_export() );
- assert( !exp->is_auth() );
-
- if ( get_containing_import(exp) != im ) {
- dout(7) << "uh oh, containing import is " << get_containing_import(exp) << endl;
- dout(7) << "uh oh, containing import is " << *get_containing_import(exp) << endl;
- assert( get_containing_import(exp) == im );
- }
-
- if (ecopy.count(exp) != 1) {
- dout(7) << " nested_export " << *exp << " not in exports" << endl;
- assert(0);
- }
- ecopy.erase(exp);
- }
- }
-
- if (ecopy.size()) {
- for (set<CDir*>::iterator it = ecopy.begin();
- it != ecopy.end();
- it++)
- dout(7) << " stray item in exports: " << **it << endl;
- assert(ecopy.size() == 0);
- }
-
-
+ //mds->balancer->show_imports(true);
}
#include "include/config.h"
#undef dout
-#define dout(l) if (l<=g_conf.debug) cout << "mds" << mds->get_nodeid() << ".log "
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mds_log) cout << "mds" << mds->get_nodeid() << ".log "
// cons/des
mds = m;
num_events = 0;
max_events = 0;
- trim_reading = false;
- reader = new LogStream(mds,
- 100 + mds->get_nodeid());
- writer = new LogStream(mds,
- 100 + mds->get_nodeid());
-
- string name;
- name = "mds";
- int w = mds->get_nodeid();
- if (w >= 1000) name += ('0' + ((w/1000)%10));
- if (w >= 100) name += ('0' + ((w/100)%10));
- if (w >= 10) name += ('0' + ((w/10)%10));
- name += ('0' + ((w/1)%10));
- name += ".log";
+
+ logstream = new LogStream(mds, mds->filer, MDS_INO_LOG_OFFSET + mds->get_nodeid());
+
+ char name[80];
+ sprintf(name, "mds%d.log", mds->get_nodeid());
logger = new Logger(name, (LogType*)&mdlog_logtype);
}
MDLog::~MDLog()
{
- if (reader) { delete reader; reader = 0; }
- if (writer) { delete writer; writer = 0; }
+ if (logstream) { delete logstream; logstream = 0; }
if (logger) { delete logger; logger = 0; }
}
-class C_MDL_SubmitEntry : public Context {
-protected:
- MDLog *mdl;
-public:
- Context *c;
- LogEvent *le;
-
- C_MDL_SubmitEntry(MDLog *m, LogEvent *e, Context *c) {
- mdl = m;
- le = e;
- this->c = c;
- }
- void finish(int res) {
- mdl->submit_entry_2(le,c);
- }
-};
int MDLog::submit_entry( LogEvent *e,
Context *c )
{
- if (!g_conf.mds_log) {
+ // Appends e to the log stream and deletes it (this function owns e on
+ // every path). c may be null; if given, it is queued until the log has
+ // synced past this entry, or finished immediately when logging is off.
+ dout(5) << "submit_entry at " << num_events << endl;
+
+ if (g_conf.mds_log) {
+ logstream->append(e);
+ delete e;
+ num_events++;
+
+ logger->inc("add");
+ logger->set("size", num_events);
+
+ // wait on the END of the entry (wait_for_sync defaults to the current
+ // append_pos). waiting on the entry's start offset would trip
+ // assert(off > sync_pos) whenever the previous flush ended exactly
+ // there, and could wake the waiter before this entry is written.
+ if (c)
+ logstream->wait_for_sync(c);
+ } else {
+ // hack: log is disabled..
+ delete e; // don't leak the event on the bypass path
if (c) {
c->finish(0);
delete c;
}
- return 0;
}
-
- dout(5) << "submit_entry" << endl;
-
- // write it
- writer->append(e, new C_MDL_SubmitEntry(this, e, c));
- logger->inc("add");
+
+ return 0; // declared int; flowing off the end is undefined behaviour
}
-int MDLog::submit_entry_2( LogEvent *e,
- Context *c )
+void MDLog::wait_for_sync( Context *c )
{
- dout(5) << "submit_entry done, log size " << num_events << endl;
-
- // written!
- num_events++;
- delete e;
- logger->set("len", num_events);
-
- if (c) {
+ if (g_conf.mds_log) {
+ // wait
+ logstream->wait_for_sync(c);
+ } else {
+ // hack: bypass
c->finish(0);
delete c;
}
-
+}
+
+void MDLog::flush()
+{
+ logstream->flush();
+
// trim
- trim(NULL); // FIXME probably not every time?
+ trim(NULL);
}
-class C_MDL_Trim : public Context {
-protected:
- MDLog *mdl;
+
+
+
+// trim
+
+class C_MDL_Trimmed : public Context {
public:
+ MDLog *mdl;
LogEvent *le;
- int step;
- C_MDL_Trim(MDLog *m, LogEvent *e = 0, int s=2) {
+ C_MDL_Trimmed(MDLog *mdl, LogEvent *le) {
+ this->mdl = mdl;
+ this->le = le;
+ }
+ void finish(int res) {
+ mdl->_trimmed(le);
+ }
+};
+
+class C_MDL_Reading : public Context {
+public:
+ MDLog *mdl;
+ C_MDL_Reading(MDLog *m) {
mdl = m;
- step = s; le = e;
}
void finish(int res) {
- if (step == 2)
- mdl->trim_2_didread(le);
- else if (step == 3)
- mdl->trim_3_didretire(le);
+ mdl->waiting_for_read = false;
+ mdl->trim(0);
}
};
-int MDLog::trim(Context *c)
+void MDLog::trim(Context *c)
{
- if (num_events - trimming.size() > max_events) {
- dout(5) << "trimming. num_events " << num_events << ", trimming " << trimming.size() << " max " << max_events << endl;
+ // Retire events from the front of the log until at most max_events remain
+ // (not counting those already mid-retire). c, if given, is completed once
+ // the loop condition no longer holds. Re-entered from _trimmed() and
+ // C_MDL_Reading as retires and reads complete.
+
+ // add waiter
+ if (c) trim_waiters.push_back(c);
- // add this waiter
- if (c) trim_waiters.push_back(c);
+ // trim!
+ while (num_events - trimming.size() > max_events) {
+ dout(5) << "trim: num_events " << num_events << " - trimming " << trimming.size() << " > max " << max_events << endl;
- trim_readnext(); // read next event off end of log.
- return 0;
- }
-
- // no trimming to be done.
- if (c) {
- c->finish(0);
- delete c;
- }
-}
-
-void MDLog::trim_readnext()
-{
- if (trim_reading) {
- //dout(10) << "trim_readnext already reading." << endl;
- return;
+
+ // check capacity BEFORE pulling an event: get_next_event() consumes it
+ // from the stream, so an event we cannot start retiring right now
+ // would be dropped (leaked, never retired, never counted down).
+ if (trimming.size() >= g_conf.mds_log_max_trimming) {
+ dout(7) << " already trimming max, waiting" << endl;
+ return; // _trimmed() calls trim() again when a retire completes
+ }
+
+ LogEvent *le = logstream->get_next_event();
+ if (!le) {
+ // need to read!
+ if (!waiting_for_read) {
+ waiting_for_read = true;
+ dout(7) << " waiting for read" << endl;
+ logstream->wait_for_next_event(new C_MDL_Reading(this));
+ } else {
+ dout(7) << " already waiting for read" << endl;
+ }
+ return;
+ }
+
+ // we just read an event.
+ if (le->obsolete(mds) == true) {
+ // obsolete
+ dout(7) << " obsolete " << le << endl;
+ delete le;
+ num_events--;
+ logger->inc("obs");
+ } else {
+ // trim!
+ dout(7) << " trimming " << le << endl;
+ trimming.insert(le);
+ le->retire(mds, new C_MDL_Trimmed(this, le));
+ logger->inc("retire");
+ }
}
-
- dout(10) << "trim_readnext" << endl;
- trim_reading = true;
- C_MDL_Trim *readfin = new C_MDL_Trim(this);
- reader->read_next(&readfin->le, readfin);
- logger->inc("read");
+
+ // done!
+ list<Context*> finished;
+ finished.splice(finished.begin(), trim_waiters);
+
+ finish_contexts(finished, 0);
}
-
-// trim_2 : just read an event
-int MDLog::trim_2_didread(LogEvent *le)
+void MDLog::_trimmed(LogEvent *le)
{
- dout(10) << "trim_2_didread " << le << endl;
-
- trim_reading = false;
+ dout(7) << " trimmed " << le << endl;
+ trimming.erase(le);
+ delete le;
+ num_events--;
- // we just read an event.
- if (le->obsolete(mds) == true) {
- trim_3_didretire(le); // we can discard this event and be done.
- logger->inc("obs");
- } else {
- dout(10) << "retire " << le << " ";
- trimming.push_back(le); // add to limbo list
- le->retire(mds, new C_MDL_Trim(this, le, 3)); // retire entry
- logger->inc("retire");
- }
-
- // read another event? FIXME: max_trimming maybe? would need to restructure this again.
- if (num_events - trimming.size() > max_events &&
- trimming.size() < g_conf.mds_log_max_trimming) {
- trim_readnext();
- }
+ trim(0);
}
-int MDLog::trim_3_didretire(LogEvent *le)
-{
- //dout(10) << "trim_2_didretire " << le << endl;
-
- // done with this le.
- if (le) {
- num_events--;
- trimming.remove(le);
- delete le;
- }
-
- // read more?
- if (trim_reading == false &&
- num_events - trimming.size() > max_events) {
- trim_readnext();
- }
-
- // last one?
- if (trimming.size() == 0 && // none mid-retire,
- trim_reading == false) { // and not mid-read
-
- dout(5) << "retired " << le << ", trim done, log size now " << num_events << endl;
-
- // we're done.
- list<Context*> finished = trim_waiters;
- trim_waiters.clear();
-
- list<Context*>::iterator it = finished.begin();
- while (it != finished.end()) {
- Context *c = *it;
- if (c) {
- c->finish(0);
- delete c;
- }
- }
- } else {
- dout(5) << "retired " << le << ", still trimming, log size now " << num_events << endl;
- }
-}
-
size_t num_events; // in events
size_t max_events;
- LogStream *reader;
- LogStream *writer;
+ LogStream *logstream;
- list<LogEvent*> trimming; // events currently being trimmed
+ set<LogEvent*> trimming; // events currently being trimmed
list<Context*> trim_waiters; // contexts waiting for trim
bool trim_reading;
+ bool waiting_for_read;
+ friend class C_MDL_Reading;
+
Logger *logger;
public:
}
int submit_entry( LogEvent *e,
- Context *c );
- int submit_entry_2( LogEvent *e,
- Context *c );
-
- int trim(Context *c); // want to trim
- void trim_readnext(); // read next event
- int trim_2_didread(LogEvent *e); // read log event
- int trim_3_didretire(LogEvent *e); // finished retiring log event
-
+ Context *c = 0 );
+ void wait_for_sync( Context *c );
+ void flush();
+ void trim(Context *c);
+ void _trimmed(LogEvent *le);
};
#endif
void MDS::proc_message(Message *m)
{
+
switch (m->get_type()) {
// OSD ===============
case MSG_OSD_OPREPLY:
did_heartbeat_hack = true;
}
*/
+
+
+ // flush log
+ mdlog->flush();
// finish any triggered contexts
MClientRequest *req;
MClientReply *reply;
CInode *tracei; // inode to include a trace for
- bool pinned;
LogEvent *event;
+
public:
C_MDS_CommitRequest(MDS *mds,
- MClientRequest *req, MClientReply *reply, CInode *tracei,
- LogEvent *event = 0,
- bool pinned=false) {
+ MClientRequest *req, MClientReply *reply, CInode *tracei,
+ LogEvent *event=0) {
this->mds = mds;
this->req = req;
this->tracei = tracei;
- this->pinned = pinned;
this->reply = reply;
this->event = event;
}
void finish(int r) {
- if (r == 0) {
- // success. log and reply.
- mds->commit_request(req, reply, tracei, event);
- } else {
+ if (r != 0) {
// failure. set failure code and reply.
reply->set_result(r);
+ }
+ if (event) {
+ mds->commit_request(req, reply, tracei, event);
+ } else {
+ // reply.
mds->reply_request(req, reply, tracei);
}
}
};
+
/*
* send generic response (just and error code)
*/
reply_request(req, new MClientReply(req, r), tracei);
}
+
/*
* send given reply
* include a trace to tracei
stat_ops++;
}
+
/*
* commit event(s) to the metadata journal, then reply.
* or, be sloppy and do it concurrently (see g_conf.mds_log_before_reply)
LogEvent *event,
LogEvent *event2)
{
- if (g_conf.mds_log_before_reply) {
+ // log
+ if (event) mdlog->submit_entry(event);
+ if (event2) mdlog->submit_entry(event2);
+
+ if (g_conf.mds_log_before_reply && g_conf.mds_log) {
// SAFE mode!
-
- if (event) {
- // log, then reply
- // pin inode so it doesn't go away!
- if (tracei) mdcache->request_pin_inode(req, tracei);
-
- // pass event2 as event1 (so we chain together!)
- /*
- WARNING: by chaining back to CommitRequest we may get
- something not quite right if the log commit fails. what
- happens (to the whole system!) then? ** FIXME **
- */
- dout(10) << "commit_request submitting log entry" << endl;
- mdlog->submit_entry(event,
- new C_MDS_CommitRequest(this, req, reply, tracei, event2, true)); // inode is pinned
- }
- else {
- // just reply, no log entry (anymore).
- reply_request(req, reply, tracei);
- }
- } else {
- // SLOPPY mode!
-
- // log
- if (event) mdlog->submit_entry(event, NULL);
- if (event2) mdlog->submit_entry(event2, NULL);
+ // pin inode so it doesn't go away!
+ if (tracei) mdcache->request_pin_inode(req, tracei);
- // reply
+ // wait for log sync
+ mdlog->wait_for_sync(new C_MDS_CommitRequest(this, req, reply, tracei));
+ return;
+ }
+ else {
+ // just reply
reply_request(req, reply, tracei);
}
}
--- /dev/null
+#ifndef __EALLOC_H
+#define __EALLOC_H
+
+#include <assert.h>
+#include "include/config.h"
+#include "include/types.h"
+
+#include "../LogEvent.h"
+#include "../IdAllocator.h"
+
+#define EALLOC_EV_ALLOC 1
+#define EALLOC_EV_FREE 2
+
+// Log event recording one IdAllocator change: id `id` of allocator type
+// `type` was allocated (EALLOC_EV_ALLOC) or freed (EALLOC_EV_FREE).
+class EAlloc : public LogEvent {
+ protected:
+ int type; // allocator type (note: distinct from the LogEvent event type)
+ idno_t id;
+ int what; // EALLOC_EV_ALLOC or EALLOC_EV_FREE
+
+ public:
+ EAlloc(int type, idno_t id, int what) :
+ LogEvent(EVENT_ALLOC),
+ type(type), id(id), what(what) {
+ }
+ // used when reading the log back; fields are filled in by decode_payload().
+ // zero-initialize so the object is never observed with indeterminate values.
+ EAlloc() :
+ LogEvent(EVENT_ALLOC),
+ type(0), id(0), what(0) {
+ }
+
+ // payload layout: type, id, what (raw host-order bytes)
+ virtual void encode_payload(bufferlist& bl) {
+ bl.append((char*)&type, sizeof(type));
+ bl.append((char*)&id, sizeof(id));
+ bl.append((char*)&what, sizeof(what));
+ }
+ // reads the fields back in the same order, advancing off past each one
+ virtual void decode_payload(bufferlist& bl, int& off) {
+ bl.copy(off, sizeof(type), (char*)&type);
+ off += sizeof(type);
+ bl.copy(off, sizeof(id), (char*)&id);
+ off += sizeof(id);
+ bl.copy(off, sizeof(what), (char*)&what);
+ off += sizeof(what);
+ }
+
+
+ // obsolete once the allocator no longer reports this id as dirty
+ virtual bool obsolete(MDS *mds) {
+ if (mds->idalloc->is_dirty(type,id))
+ return false; // still dirty
+ else
+ return true; // already flushed
+ }
+
+ // retiring means saving the allocator state; c is passed to save()
+ virtual void retire(MDS *mds, Context *c) {
+ mds->idalloc->save(c);
+ }
+
+};
+
+#endif
this->inode = in->inode;
version = in->get_version();
}
- EInodeUpdate(crope s) :
+ EInodeUpdate() :
LogEvent(EVENT_INODEUPDATE) {
- s.copy(0, sizeof(version), (char*)&version);
- s.copy(sizeof(version), sizeof(inode), (char*)&inode);
}
- virtual crope get_payload() {
- crope r;
- r.append((char*)&version, sizeof(version));
- r.append((char*)&inode, sizeof(inode));
- return r;
+ virtual void encode_payload(bufferlist& bl) {
+ bl.append((char*)&version, sizeof(version));
+ bl.append((char*)&inode, sizeof(inode));
}
+ void decode_payload(bufferlist& bl, int& off) {
+ bl.copy(off, sizeof(version), (char*)&version);
+ off += sizeof(version);
+ bl.copy(off, sizeof(inode), (char*)&inode);
+ off += sizeof(inode);
+ }
+
virtual bool obsolete(MDS *mds) {
// am i obsolete?
LogEvent(EVENT_STRING) {
event = e;
}
- EString(crope s) :
+ EString() :
LogEvent(EVENT_STRING) {
- event = s.c_str();
+ }
+
+ void decode_payload(bufferlist& bl, int& off) {
+ event = bl.c_str() + off;
+ off += event.length() + 1;
}
- // note: LogEvent owns serialized buffer
- virtual crope get_payload() {
- return crope(event.c_str());
+ void encode_payload(bufferlist& bl) {
+ bl.append(event.c_str(), event.length()+1);
}
};
this->dname = dn->get_name();
this->version = dir->get_version();
}
- EUnlink(crope s) :
+ EUnlink() :
LogEvent(EVENT_UNLINK) {
- s.copy(0, sizeof(dir_ino), (char*)&dir_ino);
- s.copy(sizeof(dir_ino), sizeof(version), (char*)&version);
- dname = s.c_str() + sizeof(dir_ino) + sizeof(version);
}
- virtual crope get_payload() {
- crope r;
- r.append((char*)&dir_ino, sizeof(dir_ino));
- r.append((char*)&version, sizeof(version));
- r.append((char*)dname.c_str(), dname.length() + 1);
- return r;
+ virtual void encode_payload(bufferlist& bl) {
+ bl.append((char*)&dir_ino, sizeof(dir_ino));
+ bl.append((char*)&version, sizeof(version));
+ bl.append((char*)dname.c_str(), dname.length() + 1);
+ }
+ void decode_payload(bufferlist& bl, int& off) {
+ bl.copy(off, sizeof(dir_ino), (char*)&dir_ino);
+ off += sizeof(dir_ino);
+ bl.copy(off, sizeof(version), (char*)&version);
+ off += sizeof(version);
+ dname = bl.c_str() + off;
+ off += dname.length() + 1;
}
virtual bool obsolete(MDS *mds) {