From: sage Date: Tue, 14 Jun 2005 01:56:34 +0000 (+0000) Subject: rewrote mds log X-Git-Tag: v0.1~2073 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cd9c673d79277ba472b51a9e17c16aae3b65230b;p=ceph.git rewrote mds log git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@308 29311d96-e01e-0410-9327-a35deaab8ce9 --- diff --git a/ceph/config.cc b/ceph/config.cc index c3fa8e4a46f..ab98f9a891a 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -25,6 +25,8 @@ md_config_t g_conf = { fakemessenger_serialize: true, debug: 15, + debug_mds_balancer: 1, + debug_mds_log: 1, // --- client --- client_cache_size: 400, @@ -38,14 +40,14 @@ md_config_t g_conf = { mds_log: true, mds_log_max_len: 10000,//MDS_CACHE_SIZE / 3, - mds_log_max_trimming: 16, + mds_log_max_trimming: 32, mds_log_read_inc: 65536, mds_log_before_reply: true, mds_log_flush_on_shutdown: true, mds_bal_replicate_threshold: 500, mds_bal_unreplicate_threshold: 200, - mds_bal_interval: 200, + mds_bal_interval: 500, mds_commit_on_shutdown: true, @@ -114,6 +116,10 @@ void parse_config_options(int argc, char **argv, else if (strcmp(argv[i], "--debug") == 0) g_conf.debug = atoi(argv[++i]); + else if (strcmp(argv[i], "--debug_mds_balancer") == 0) + g_conf.debug_mds_balancer = atoi(argv[++i]); + else if (strcmp(argv[i], "--debug_mds_log") == 0) + g_conf.debug_mds_log = atoi(argv[++i]); else if (strcmp(argv[i], "--log") == 0) g_conf.log_name = argv[++i]; @@ -126,6 +132,8 @@ void parse_config_options(int argc, char **argv, g_conf.mds_log_before_reply = atoi(argv[++i]); else if (strcmp(argv[i], "--mds_log_max_len") == 0) g_conf.mds_log_max_len = atoi(argv[++i]); + else if (strcmp(argv[i], "--mds_log_read_inc") == 0) + g_conf.mds_log_read_inc = atoi(argv[++i]); else if (strcmp(argv[i], "--mds_log_max_trimming") == 0) g_conf.mds_log_max_trimming = atoi(argv[++i]); else if (strcmp(argv[i], "--mds_commit_on_shutdown") == 0) diff --git a/ceph/config.h b/ceph/config.h index 3f4cbdb213f..2715abfb82e 100644 --- a/ceph/config.h +++ b/ceph/config.h @@ -15,8 +15,9 @@ struct md_config_t { bool fake_clock; bool fakemessenger_serialize; - int debug; - + int debug; + int debug_mds_balancer; + int debug_mds_log; // client int client_cache_size; diff --git a/ceph/fakesyn.cc b/ceph/fakesyn.cc index 03592a3fcff..e7e05094ac8 100644 --- a/ceph/fakesyn.cc +++ b/ceph/fakesyn.cc @@ -144,7 +144,7 @@ int main(int oargc, char **oargv) { cout << "mounting" << endl; client[i]->mount(mkfs); - cout << "starting synthatic client " << endl; + cout << "starting synthetic client " << endl; syn[i] = new SyntheticClient(client[i]); char s[20]; diff --git a/ceph/include/buffer.h b/ceph/include/buffer.h index cabfa9baf44..f2a3f902855 100644 --- a/ceph/include/buffer.h +++ b/ceph/include/buffer.h @@ -177,11 +177,14 @@ class bufferptr { int length() { return _len; } + int offset() { + return _off; + } // modifiers void set_offset(int off) { - assert(off <= _len); + assert(off <= _buffer->_alloc_len); _off = off; } void set_length(int len) { @@ -201,11 +204,16 @@ class bufferptr { _buffer->_len += len; _len += len; } - void copy(int off, int len, char *dest) { + void copy_out(int off, int len, char *dest) { assert(off >= 0 && off <= _len); assert(len >= 0 && off + len <= _len); memcpy(dest, c_str() + off, len); } + void copy_in(int off, int len, char *src) { + assert(off >= 0 && off <= _len); + assert(len >= 0 && off + len <= _len); + memcpy(c_str() + off, src, len); + } friend ostream& operator<<(ostream& out, bufferptr& bp); }; @@ -213,8 +221,8 @@ class bufferptr { inline ostream& operator<<(ostream& out, bufferptr& bp) { return out << "bufferptr(len=" << bp._len << ", off=" << bp._off - << ", " << bp.c_str() - //<< ", " << *bp._buffer + //<< ", " << bp.c_str() + << ", " << *bp._buffer << ")"; } diff --git a/ceph/include/bufferlist.h b/ceph/include/bufferlist.h index 35512395968..1af1eebf2f3 100644 --- a/ceph/include/bufferlist.h +++ b/ceph/include/bufferlist.h @@ -99,13 +99,13 @@ class bufferlist { while (len > 0) { // is the rest ALL in this buffer? if (off + len <= (*curbuf).length()) { - (*curbuf).copy(off, len, dest); // yup, last bit! + (*curbuf).copy_out(off, len, dest); // yup, last bit! break; } // get as much as we can from this buffer. int howmuch = (*curbuf).length() - off; - (*curbuf).copy(off, howmuch, dest); + (*curbuf).copy_out(off, howmuch, dest); dest += howmuch; len -= howmuch; @@ -114,6 +114,48 @@ class bufferlist { assert(curbuf != _buffers.end()); } } + + void copy_in(int off, int len, char *src) { + assert(off >= 0); + assert(off + len <= length()); + + // advance to off + list::iterator curbuf = _buffers.begin(); + + // skip off + while (off > 0) { + assert(curbuf != _buffers.end()); + if (off >= (*curbuf).length()) { + // skip this buffer + off -= (*curbuf).length(); + curbuf++; + } else { + // somewhere in this buffer! + break; + } + } + + // copy + while (len > 0) { + // is the rest ALL in this buffer? + if (off + len <= (*curbuf).length()) { + (*curbuf).copy_in(off, len, src); // yup, last bit! + break; + } + + // get as much as we can from this buffer. + int howmuch = (*curbuf).length() - off; + (*curbuf).copy_in(off, howmuch, src); + + src += howmuch; + len -= howmuch; + off = 0; + curbuf++; + assert(curbuf != _buffers.end()); + } + } + + void append(const char *data, int len) { if (len == 0) return; @@ -228,7 +270,7 @@ class bufferlist { //cout << "keeping end of " << *curbuf << ", losing first " << off+len << endl; if (claim_by) claim_by->append( *curbuf, len, off ); - (*curbuf).set_offset( off + len ); // ignore beginning big + (*curbuf).set_offset( off + len + (*curbuf).offset() ); // ignore beginning big (*curbuf).set_length( (*curbuf).length() - len - off ); //cout << " now " << *curbuf << endl; break; diff --git a/ceph/mds/IdAllocator.cc b/ceph/mds/IdAllocator.cc index 3eea14a1bc9..c609cb62646 100644 --- a/ceph/mds/IdAllocator.cc +++ b/ceph/mds/IdAllocator.cc @@ -3,6 +3,9 @@ #include "IdAllocator.h" #include "MDS.h" +#include "MDLog.h" +#include "events/EAlloc.h" + #include "osd/Filer.h" #include "osd/OSDCluster.h" @@ -23,8 +26,10 @@ idno_t IdAllocator::get_id(int type) idno_t id = free[type].first(); free[type].erase(id); dout(DBLEVEL) << "idalloc " << this << ": getid type " << type << " is " << id << endl; - //free[type].dump(); - //save(); + + mds->mdlog->submit_entry(new EAlloc(type, id, EALLOC_EV_ALLOC)); + dirty[type].insert(id); + return id; } @@ -35,14 +40,16 @@ void IdAllocator::reclaim_id(int type, idno_t id) dout(DBLEVEL) << "idalloc " << this << ": reclaim type " << type << " id " << id << endl; free[type].insert(id); //free[type].dump(); - //save(); + + mds->mdlog->submit_entry(new EAlloc(type, id, EALLOC_EV_FREE)); + dirty[type].insert(id); } -void IdAllocator::save() +void IdAllocator::save(Context *onfinish) { crope data; @@ -73,6 +80,9 @@ void IdAllocator::save() assert(mapsize == 0); } + // reset dirty list .. FIXME this is optimistic, i'm assuming the write succeeds. + dirty.clear(); + // turn into bufferlist bufferlist bl; bl.append(data.c_str(), data.length()); @@ -83,7 +93,7 @@ void IdAllocator::save() 0, bl, 0, - NULL); + onfinish); } diff --git a/ceph/mds/IdAllocator.h b/ceph/mds/IdAllocator.h index 93f1e587e43..15dadd56953 100644 --- a/ceph/mds/IdAllocator.h +++ b/ceph/mds/IdAllocator.h @@ -16,8 +16,9 @@ typedef __uint64_t idno_t; class IdAllocator { MDS *mds; - map< int, rangeset > free; // type -> rangeset - + map< int, rangeset > free; // type -> rangeset + map< int, set > dirty; // dirty ids + bool opened, opening; public: @@ -36,13 +37,17 @@ class IdAllocator { bool is_open() { return opened; } bool is_opening() { return opening; } + bool is_dirty(int type, idno_t id) { + return dirty[type].count(id) ? true:false; + } + void reset(); + void save(Context *onfinish=0); void shutdown() { - if (is_open()) save(); + if (is_open()) save(0); } - void save(); void load(Context *onfinish); void load_2(int, bufferlist&, Context *onfinish); diff --git a/ceph/mds/LogEvent.h b/ceph/mds/LogEvent.h index 18f15e7ba4b..b2ad458e8ec 100644 --- a/ceph/mds/LogEvent.h +++ b/ceph/mds/LogEvent.h @@ -10,6 +10,7 @@ using namespace std; #define EVENT_STRING 1 #define EVENT_INODEUPDATE 2 #define EVENT_UNLINK 3 +#define EVENT_ALLOC 4 // generic log event class LogEvent { @@ -23,21 +24,22 @@ class LogEvent { int get_type() { return type; } - virtual crope get_payload() = 0; // children overload this - - crope get_serialized() { - crope s; + virtual void encode_payload(bufferlist& bl) = 0; + virtual void decode_payload(bufferlist& bl, int& off) = 0; + void encode(bufferlist& bl) { // type - __uint32_t ptype = type; - s.append((char*)&ptype, sizeof(ptype)); - - // len+payload - crope payload = get_payload(); - __uint32_t plen = payload.length(); - s.append((char*)&plen, sizeof(plen)); - s.append(payload); - return s; + bl.append((char*)&type, sizeof(type)); + + // len placeholder + int len = 0; // we don't know just yet... + int off = bl.length(); + bl.append((char*)&type, sizeof(len)); + + // payload + encode_payload(bl); + len = bl.length() - off - sizeof(len); + bl.copy_in(off, sizeof(len), (char*)&len); } virtual bool obsolete(MDS *m) { diff --git a/ceph/mds/LogStream.cc b/ceph/mds/LogStream.cc index e24016cf5d8..306597b92f4 100644 --- a/ceph/mds/LogStream.cc +++ b/ceph/mds/LogStream.cc @@ -8,182 +8,220 @@ #include "events/EString.h" #include "events/EInodeUpdate.h" #include "events/EUnlink.h" +#include "events/EAlloc.h" #include using namespace std; #include "include/config.h" #undef dout -#define dout(l) if (l<=g_conf.debug) cout << "mds" << mds->get_nodeid() << ".logstream " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mds_log) cout << "mds" << mds->get_nodeid() << ".logstream " + +// ---------------------------- // writing -int LogStream::append(LogEvent *e, Context *c) -{ - // serialize - crope buffer = e->get_serialized(); - size_t buflen = buffer.length(); +class C_LS_Append : public Context { + LogStream *ls; + off_t off; +public: + C_LS_Append(LogStream *ls, off_t off) { + this->ls = ls; + this->off = off; + } + void finish(int r) { + ls->_append_2(off); + } +}; + - // into a bufferlist.. FIXME +off_t LogStream::append(LogEvent *e) +{ + // serialize FIXME ******** bufferlist bl; - bl.append(buffer.c_str(), buffer.length()); + e->encode(bl); + size_t elen = bl.length(); + + // append + dout(10) << "append event type " << e->get_type() << " size " << elen << " at log offset " << append_pos << endl; + + off_t off = append_pos; + append_pos += elen; + dout(15) << "write buf was " << write_buf.length() << endl; + write_buf.claim_append(bl); + dout(15) << "write buf now " << write_buf.length() << endl; + + return off; +} +void LogStream::_append_2(off_t off) +{ + dout(10) << "sync_pos now " << off << endl; + sync_pos = off; + + // wake up waiters + map< off_t, list >::iterator it = waiting_for_sync.begin(); + while (it != waiting_for_sync.end()) { + if (it->first > sync_pos) break; + + // wake them up! + dout(15) << it->second.size() << " waiters at offset " << it->first << " <= " << sync_pos << endl; + for (list::iterator cit = it->second.begin(); + cit != it->second.end(); + cit++) + mds->finished_queue.push_back(*cit); + + // continue + waiting_for_sync.erase(it++); + } +} - dout(10) << "append event type " << e->get_type() << " size " << buflen << " at log offset " << append_pos << endl; +void LogStream::wait_for_sync(Context *c, off_t off) +{ + if (off == 0) off = append_pos; + assert(off > sync_pos); - // advance ptr for later - append_pos += buffer.length(); + dout(15) << "sync " << c << " waiting for " << off << " (sync_pos currently " << sync_pos << ")" << endl; + waiting_for_sync[off].push_back(c); - // submit write - mds->filer->write(log_ino, // FIXME - buflen, append_pos-buflen, - bl, - 0, - c); - return 0; + // initiate flush now? (since we have a waiter...) + if (autoflush) flush(); +} + +void LogStream::flush() +{ + // write to disk + if (write_buf.length()) { + dout(15) << "flush flush_pos " << flush_pos << " < append_pos " << append_pos << ", writing " << write_buf.length() << " bytes" << endl; + + assert(write_buf.length() == append_pos - flush_pos); + + mds->filer->write(log_ino, + write_buf.length(), flush_pos, + write_buf, + 0, + new C_LS_Append(this, append_pos)); + + write_buf.clear(); + flush_pos = append_pos; + } else { + dout(15) << "flush flush_pos " << flush_pos << " == append_pos " << append_pos << ", nothing to do" << endl; + } + } + + + +// ------------------------------------------------- // reading -/* -class C_LS_ReadNext : public Context { - LogStream *ls; - LogEvent **le; - Context *c; -public: - C_LS_ReadNext(LogStream *ls, LogEvent **le, Context *c) { - this->ls = ls; - this->le = le; - this->c = c; - } - void finish(int result) { - ls->read_next(le,c,2); + +LogEvent *LogStream::get_next_event() +{ + if (read_buf.length() < 2*sizeof(__uint32_t)) + return 0; + + // parse type, length + int off = 0; + __uint32_t type, length; + read_buf.copy(off, sizeof(__uint32_t), (char*)&type); + off += sizeof(__uint32_t); + read_buf.copy(off, sizeof(__uint32_t), (char*)&length); + off += sizeof(__uint32_t); + + dout(10) << "getting next event from " << read_pos << ", type " << type << ", size " << length << endl; + + assert(type > 0); + + if (read_buf.length() < off + length) + return 0; + + // create event + LogEvent *le; + switch (type) { + case EVENT_STRING: // string + le = new EString(); + break; + + case EVENT_INODEUPDATE: + le = new EInodeUpdate(); + break; + + case EVENT_UNLINK: + le = new EUnlink(); + break; + + case EVENT_ALLOC: + le = new EAlloc(); + break; + + default: + dout(1) << "uh oh, unknown event type " << type << endl; + assert(0); } -}; -*/ + + // decode + le->decode_payload(read_buf, off); + + // discard front of read_buf + read_pos += off; + read_buf.splice(0, off); + + dout(15) << "get_next_event got event, read_pos now " << read_pos << " (append_pos is " << append_pos << ")" << endl; + + // ok! + return le; +} + + class C_LS_ReadChunk : public Context { public: bufferlist bl; LogStream *ls; - LogEvent **le; - Context *c; - C_LS_ReadChunk(LogStream *ls, LogEvent **le, Context *c) { + + C_LS_ReadChunk(LogStream *ls) { this->ls = ls; - this->le = le; - this->c = c; } void finish(int result) { - crope next_bit; - next_bit.append(bl.c_str(), bl.length()); - ls->did_read_bit(next_bit, le, c); + ls->_did_read(bl); } }; -void LogStream::did_read_bit(crope& next_bit, LogEvent **le, Context *c) -{ - // add to our buffer - buffer.append(next_bit); - reading_block = false; - - // throw off beginning part - if (buffer.length() > g_conf.mds_log_read_inc*2) { - int trim = buffer.length() - g_conf.mds_log_read_inc*2; - buf_start += trim; - buffer = buffer.substr(trim, buffer.length() - trim); - dout(10) << "did_read_bit adjusting buf_start now +" << trim << " = " << buf_start << " len " << buffer.length() << endl; - } - - // continue at step 2 - read_next(le, c, 2); -} -int LogStream::read_next(LogEvent **le, Context *c, int step) +void LogStream::wait_for_next_event(Context *c) { - if (step == 1) { - // does buffer have what we want? - //if (buf_start > cur_pos || - //buf_start+buffer.length() < cur_pos+4) { - if (buf_start+buffer.length() < cur_pos+ g_conf.mds_log_read_inc/2) { - - // make sure block is being read - if (reading_block) { - dout(10) << "read_next already reading log head from disk, offset " << cur_pos << endl; - assert(0); - } else { - off_t start = buf_start+buffer.length(); - dout(10) << "read_next reading log head from disk, offset " << start << " len " << g_conf.mds_log_read_inc << endl; - // nope. read a chunk - C_LS_ReadChunk *readc = new C_LS_ReadChunk(this, le, c); - reading_block = true; - mds->filer->read(log_ino, // FIXME - g_conf.mds_log_read_inc, start, - &readc->bl, - readc); - } - return 0; - } - step = 2; + // add waiter + if (c) waiting_for_read.push_back(c); + + // issue read + off_t tail = read_pos + read_buf.length(); + size_t size = g_conf.mds_log_read_inc; + if (tail + size > sync_pos) { + size = sync_pos - tail; + assert(size > 0); // bleh, wait for sync, etc. } + dout(15) << "wait_for_next_event reading from pos " << tail << " len " << size << endl; + C_LS_ReadChunk *readc = new C_LS_ReadChunk(this); + mds->filer->read(log_ino, + g_conf.mds_log_read_inc, tail, + &readc->bl, + readc); +} - if (step == 2) { - reading_block = false; - - // decode event - unsigned off = cur_pos-buf_start; - __uint32_t type, length; - buffer.copy(off, sizeof(__uint32_t), (char*)&type); - buffer.copy(off+sizeof(__uint32_t), sizeof(__uint32_t), (char*)&length); - off += sizeof(type) + sizeof(length); - - dout(10) << "read_next got event type " << type << " size " << length << " at log offset " << cur_pos << endl; - cur_pos += sizeof(type) + sizeof(length) + length; - - switch (type) { - - case EVENT_STRING: // string - *le = new EString(buffer.substr(off,length)); - break; - - case EVENT_INODEUPDATE: - *le = new EInodeUpdate(buffer.substr(off,length)); - break; - - case EVENT_UNLINK: - *le = new EUnlink(buffer.substr(off,length)); - break; - +void LogStream::_did_read(bufferlist& blist) +{ + dout(15) << "_did_read got " << blist.length() << " bytes" << endl; + read_buf.claim_append(blist); - default: - dout(1) << "uh oh, unknown event type " << type << endl; - assert(0); - } - - // finish - if (c) { - c->finish(0); - delete c; - } - - /* - // any other waiters too! - list finished = waiting_for_read_block; - waiting_for_read_block.clear(); - for (list::iterator it = finished.begin(); - it != finished.end(); - it++) { - Context *c = *it; - if (c) { - c->finish(0); - delete c; - } - } - */ - - } + list finished; + finished.splice(finished.begin(), waiting_for_read); + finish_contexts(finished, 0); } + diff --git a/ceph/mds/LogStream.h b/ceph/mds/LogStream.h index 70881a05deb..2aecc9dbe2a 100644 --- a/ceph/mds/LogStream.h +++ b/ceph/mds/LogStream.h @@ -4,47 +4,77 @@ #include "include/types.h" #include "include/Context.h" -#include -using namespace __gnu_cxx; +#include "include/buffer.h" +#include "include/bufferlist.h" + +#include +#include class LogEvent; +class Filer; class MDS; class LogStream { protected: MDS *mds; - off_t cur_pos, append_pos; + Filer *filer; + inodeno_t log_ino; - object_t oid; - bool reading_block; - //list waiting_for_read_block; + // writing + off_t sync_pos; // first non-written byte + off_t flush_pos; // first non-writing byte, beginning of write_buf + off_t append_pos; // where next event will be written + bufferlist write_buf; // unwritten (between flush_pos and append_pos) + + // reading + off_t read_pos; // abs position in file + //off_t read_buf_start; // where read buf begins + bufferlist read_buf; + bool reading; + + std::map< off_t, std::list > waiting_for_sync; + std::list waiting_for_read; + - crope buffer; - off_t buf_start; + bool autoflush; + public: - LogStream(MDS *mds, inodeno_t log_ino) { + LogStream(MDS *mds, Filer *filer, inodeno_t log_ino) { this->mds = mds; + this->filer = filer; this->log_ino = log_ino; - cur_pos = 0; - append_pos = 0; // fixme - buf_start = 0; - reading_block = false; - } - off_t seek(off_t offset) { - cur_pos = offset; - } + // wr + sync_pos = flush_pos = append_pos = 0; + autoflush = true; - off_t get_pos() { - return cur_pos; + // rd + read_pos = 0; + reading = false; } + // write (append to end) + off_t append(LogEvent *e); // returns offset it will be written to + void _append_2(off_t off); + void wait_for_sync(Context *c, off_t off=0); // wait for flush + void flush(); // initiate flush + + // read (from front) + //bool has_next_event(); + LogEvent *get_next_event(); + void wait_for_next_event(Context *c); + void _did_read(bufferlist& blist); + + + // old interface + /* // WARNING: non-reentrant; single reader only int read_next(LogEvent **le, Context *c, int step=1); void did_read_bit(crope& next_bit, LogEvent **le, Context *c) ; int append(LogEvent *e, Context *c); // append at cur_pos, mind you! + */ }; #endif diff --git a/ceph/mds/MDBalancer.cc b/ceph/mds/MDBalancer.cc index 1733bccf00c..24df995b4c9 100644 --- a/ceph/mds/MDBalancer.cc +++ b/ceph/mds/MDBalancer.cc @@ -17,7 +17,7 @@ using namespace std; #include "include/config.h" #undef dout -#define dout(l) if (l<=g_conf.debug) cout << "mds" << mds->get_nodeid() << ".bal " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mds_balancer) cout << "mds" << mds->get_nodeid() << ".bal " #define MIN_LOAD 50 // ?? #define MIN_REEXPORT 5 // will automatically reexport @@ -101,7 +101,7 @@ void MDBalancer::send_heartbeat() void MDBalancer::handle_heartbeat(MHeartbeat *m) { - dout(5) << " got heartbeat " << m->get_beat() << " from " << m->get_source() << " " << m->get_load() << endl; + dout(5) << "=== got heartbeat " << m->get_beat() << " from " << m->get_source() << " " << m->get_load() << endl; if (!mds->mdcache->get_root()) { dout(10) << "no root on handle" << endl; @@ -114,7 +114,7 @@ void MDBalancer::handle_heartbeat(MHeartbeat *m) beat_epoch = m->get_beat(); send_heartbeat(); - mds->mdcache->show_imports(); + show_imports(); } mds_load[ m->get_source() ] = m->get_load(); @@ -173,7 +173,7 @@ void MDBalancer::do_rebalance() if (my_load < target_load.root_pop) { dout(5) << " i am underloaded, doing nothing." << endl; - mds->mdcache->show_imports(); + show_imports(); return; } @@ -241,7 +241,7 @@ void MDBalancer::do_rebalance() dout(5) << " sending " << amount << " to " << target << endl; - mds->mdcache->show_imports(); + show_imports(); // search imports from target if (import_from_map.count(target)) { @@ -310,7 +310,7 @@ void MDBalancer::do_rebalance() } dout(5) << "rebalance done" << endl; - mds->mdcache->show_imports(); + show_imports(); } @@ -511,6 +511,64 @@ void MDBalancer::add_import(CDir *dir) + + + +void MDBalancer::show_imports(bool external) +{ + int db = 7; //debug level + + if (mds->mdcache->imports.size() == 0) { + dout(db) << "no imports/exports" << endl; + return; + } + dout(db) << "imports/exports:" << endl; + + set ecopy = mds->mdcache->exports; + + timepair_t now = g_clock.gettimepair(); + + for (set::iterator it = mds->mdcache->imports.begin(); + it != mds->mdcache->imports.end(); + it++) { + CDir *im = *it; + dout(db) << " + import (" << im->popularity[MDS_POP_CURDOM].get(now) << "/" << im->popularity[MDS_POP_ANYDOM].get(now) << ") " << *im << endl; + assert( im->is_import() ); + assert( im->is_auth() ); + + for (set::iterator p = mds->mdcache->nested_exports[im].begin(); + p != mds->mdcache->nested_exports[im].end(); + p++) { + CDir *exp = *p; + dout(db) << " - ex (" << exp->popularity[MDS_POP_NESTED].get(now) << ", " << exp->popularity[MDS_POP_ANYDOM].get(now) << ")" << *exp << " to " << exp->dir_auth << endl; + assert( exp->is_export() ); + assert( !exp->is_auth() ); + + if ( mds->mdcache->get_containing_import(exp) != im ) { + dout(1) << "uh oh, containing import is " << mds->mdcache->get_containing_import(exp) << endl; + dout(1) << "uh oh, containing import is " << *mds->mdcache->get_containing_import(exp) << endl; + assert( mds->mdcache->get_containing_import(exp) == im ); + } + + if (ecopy.count(exp) != 1) { + dout(1) << " nested_export " << *exp << " not in exports" << endl; + assert(0); + } + ecopy.erase(exp); + } + } + + if (ecopy.size()) { + for (set::iterator it = ecopy.begin(); + it != ecopy.end(); + it++) + dout(1) << " stray item in exports: " << **it << endl; + assert(ecopy.size() == 0); + } +} + + + /* replicate? float dir_pop = dir->get_popularity(); diff --git a/ceph/mds/MDBalancer.h b/ceph/mds/MDBalancer.h index c5e2e3405e9..4e1022d8e15 100644 --- a/ceph/mds/MDBalancer.h +++ b/ceph/mds/MDBalancer.h @@ -53,6 +53,7 @@ class MDBalancer { void hit_recursive(class CDir *dir, timepair_t& now); + void show_imports(bool external=false); }; diff --git a/ceph/mds/MDCache.cc b/ceph/mds/MDCache.cc index 660316ada06..eca7000c7cd 100644 --- a/ceph/mds/MDCache.cc +++ b/ceph/mds/MDCache.cc @@ -1465,6 +1465,13 @@ void MDCache::request_cleanup(Message *req) // remove from map active_requests.erase(req); + + + // log some stats ***** + mds->logger->set("c", lru.lru_get_size()); + mds->logger->set("cmax", lru.lru_get_max()); + + } void MDCache::request_finish(Message *req) @@ -1473,7 +1480,7 @@ void MDCache::request_finish(Message *req) request_cleanup(req); delete req; // delete req - mds->logger->inc("rep"); + mds->logger->inc("reply"); //dump(); } @@ -7224,55 +7231,7 @@ void MDCache::handle_unhash_dir_finish(CDir *dir, int auth) void MDCache::show_imports() { - if (imports.size() == 0) { - dout(7) << "no imports/exports" << endl; - return; - } - dout(7) << "imports/exports:" << endl; - - set ecopy = exports; - - timepair_t now = g_clock.gettimepair(); - - for (set::iterator it = imports.begin(); - it != imports.end(); - it++) { - CDir *im = *it; - dout(7) << " + import (" << im->popularity[MDS_POP_CURDOM].get(now) << "/" << im->popularity[MDS_POP_ANYDOM].get(now) << ") " << *im << endl; - assert( im->is_import() ); - assert( im->is_auth() ); - - for (set::iterator p = nested_exports[im].begin(); - p != nested_exports[im].end(); - p++) { - CDir *exp = *p; - dout(7) << " - ex (" << exp->popularity[MDS_POP_NESTED].get(now) << ", " << exp->popularity[MDS_POP_ANYDOM].get(now) << ")" << *exp << " to " << exp->dir_auth << endl; - assert( exp->is_export() ); - assert( !exp->is_auth() ); - - if ( get_containing_import(exp) != im ) { - dout(7) << "uh oh, containing import is " << get_containing_import(exp) << endl; - dout(7) << "uh oh, containing import is " << *get_containing_import(exp) << endl; - assert( get_containing_import(exp) == im ); - } - - if (ecopy.count(exp) != 1) { - dout(7) << " nested_export " << *exp << " not in exports" << endl; - assert(0); - } - ecopy.erase(exp); - } - } - - if (ecopy.size()) { - for (set::iterator it = ecopy.begin(); - it != ecopy.end(); - it++) - dout(7) << " stray item in exports: " << **it << endl; - assert(ecopy.size() == 0); - } - - + //mds->balancer->show_imports(true); } diff --git a/ceph/mds/MDLog.cc b/ceph/mds/MDLog.cc index 4bbf1e05c08..2cf8c67716d 100644 --- a/ceph/mds/MDLog.cc +++ b/ceph/mds/MDLog.cc @@ -14,7 +14,7 @@ LogType mdlog_logtype; #include "include/config.h" #undef dout -#define dout(l) if (l<=g_conf.debug) cout << "mds" << mds->get_nodeid() << ".log " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mds_log) cout << "mds" << mds->get_nodeid() << ".log " // cons/des @@ -23,203 +23,159 @@ MDLog::MDLog(MDS *m) mds = m; num_events = 0; max_events = 0; - trim_reading = false; - reader = new LogStream(mds, - 100 + mds->get_nodeid()); - writer = new LogStream(mds, - 100 + mds->get_nodeid()); - - string name; - name = "mds"; - int w = mds->get_nodeid(); - if (w >= 1000) name += ('0' + ((w/1000)%10)); - if (w >= 100) name += ('0' + ((w/100)%10)); - if (w >= 10) name += ('0' + ((w/10)%10)); - name += ('0' + ((w/1)%10)); - name += ".log"; + + logstream = new LogStream(mds, mds->filer, MDS_INO_LOG_OFFSET + mds->get_nodeid()); + + char name[80]; + sprintf(name, "mds%d.log", mds->get_nodeid()); logger = new Logger(name, (LogType*)&mdlog_logtype); } MDLog::~MDLog() { - if (reader) { delete reader; reader = 0; } - if (writer) { delete writer; writer = 0; } + if (logstream) { delete logstream; logstream = 0; } if (logger) { delete logger; logger = 0; } } -class C_MDL_SubmitEntry : public Context { -protected: - MDLog *mdl; -public: - Context *c; - LogEvent *le; - - C_MDL_SubmitEntry(MDLog *m, LogEvent *e, Context *c) { - mdl = m; - le = e; - this->c = c; - } - void finish(int res) { - mdl->submit_entry_2(le,c); - } -}; int MDLog::submit_entry( LogEvent *e, Context *c ) { - if (!g_conf.mds_log) { + dout(5) << "submit_entry at " << num_events << endl; + + if (g_conf.mds_log) { + off_t off = logstream->append(e); + delete e; + num_events++; + + logger->inc("add"); + logger->set("size", num_events); + + if (c) + logstream->wait_for_sync(c, off); + } else { + // hack: log is disabled.. if (c) { c->finish(0); delete c; } - return 0; } - - dout(5) << "submit_entry" << endl; - - // write it - writer->append(e, new C_MDL_SubmitEntry(this, e, c)); - logger->inc("add"); + } -int MDLog::submit_entry_2( LogEvent *e, - Context *c ) +void MDLog::wait_for_sync( Context *c ) { - dout(5) << "submit_entry done, log size " << num_events << endl; - - // written! - num_events++; - delete e; - logger->set("len", num_events); - - if (c) { + if (g_conf.mds_log) { + // wait + logstream->wait_for_sync(c); + } else { + // hack: bypass c->finish(0); delete c; } - +} + +void MDLog::flush() +{ + logstream->flush(); + // trim - trim(NULL); // FIXME probably not every time? + trim(NULL); } -class C_MDL_Trim : public Context { -protected: - MDLog *mdl; + + + +// trim + +class C_MDL_Trimmed : public Context { public: + MDLog *mdl; LogEvent *le; - int step; - C_MDL_Trim(MDLog *m, LogEvent *e = 0, int s=2) { + C_MDL_Trimmed(MDLog *mdl, LogEvent *le) { + this->mdl = mdl; + this->le = le; + } + void finish(int res) { + mdl->_trimmed(le); + } +}; + +class C_MDL_Reading : public Context { +public: + MDLog *mdl; + C_MDL_Reading(MDLog *m) { mdl = m; - step = s; le = e; } void finish(int res) { - if (step == 2) - mdl->trim_2_didread(le); - else if (step == 3) - mdl->trim_3_didretire(le); + mdl->waiting_for_read = false; + mdl->trim(0); } }; -int MDLog::trim(Context *c) +void MDLog::trim(Context *c) { - if (num_events - trimming.size() > max_events) { - dout(5) << "trimming. num_events " << num_events << ", trimming " << trimming.size() << " max " << max_events << endl; + // add waiter + if (c) trim_waiters.push_back(c); - // add this waiter - if (c) trim_waiters.push_back(c); + // trim! + while (num_events - trimming.size() > max_events) { + dout(5) << "trim: num_events " << num_events << " - trimming " << trimming.size() << " > max " << max_events << endl; - trim_readnext(); // read next event off end of log. - return 0; - } - - // no trimming to be done. - if (c) { - c->finish(0); - delete c; - } -} - -void MDLog::trim_readnext() -{ - if (trim_reading) { - //dout(10) << "trim_readnext already reading." << endl; - return; + LogEvent *le = logstream->get_next_event(); + + if (le) { + // we just read an event. + if (le->obsolete(mds) == true) { + // obsolete + dout(7) << " obsolete " << le << endl; + delete le; + num_events--; + logger->inc("obs"); + } else { + if (trimming.size() < g_conf.mds_log_max_trimming) { + // trim! + dout(7) << " trimming " << le << endl; + trimming.insert(le); + le->retire(mds, new C_MDL_Trimmed(this, le)); + logger->inc("retire"); + } else { + dout(7) << " already trimming max, waiting" << endl; + return; + } + } + } else { + // need to read! + if (!waiting_for_read) { + waiting_for_read = true; + dout(7) << " waiting for read" << endl; + logstream->wait_for_next_event(new C_MDL_Reading(this)); + } else { + dout(7) << " already waiting for read" << endl; + } + return; + } } - - dout(10) << "trim_readnext" << endl; - trim_reading = true; - C_MDL_Trim *readfin = new C_MDL_Trim(this); - reader->read_next(&readfin->le, readfin); - logger->inc("read"); + + // done! + list finished; + finished.splice(finished.begin(), trim_waiters); + + finish_contexts(finished, 0); } - -// trim_2 : just read an event -int MDLog::trim_2_didread(LogEvent *le) +void MDLog::_trimmed(LogEvent *le) { - dout(10) << "trim_2_didread " << le << endl; - - trim_reading = false; + dout(7) << " trimmed " << le << endl; + trimming.erase(le); + delete le; + num_events--; - // we just read an event. - if (le->obsolete(mds) == true) { - trim_3_didretire(le); // we can discard this event and be done. - logger->inc("obs"); - } else { - dout(10) << "retire " << le << " "; - trimming.push_back(le); // add to limbo list - le->retire(mds, new C_MDL_Trim(this, le, 3)); // retire entry - logger->inc("retire"); - } - - // read another event? FIXME: max_trimming maybe? would need to restructure this again. - if (num_events - trimming.size() > max_events && - trimming.size() < g_conf.mds_log_max_trimming) { - trim_readnext(); - } + trim(0); } -int MDLog::trim_3_didretire(LogEvent *le) -{ - //dout(10) << "trim_2_didretire " << le << endl; - - // done with this le. - if (le) { - num_events--; - trimming.remove(le); - delete le; - } - - // read more? - if (trim_reading == false && - num_events - trimming.size() > max_events) { - trim_readnext(); - } - - // last one? - if (trimming.size() == 0 && // none mid-retire, - trim_reading == false) { // and not mid-read - - dout(5) << "retired " << le << ", trim done, log size now " << num_events << endl; - - // we're done. - list finished = trim_waiters; - trim_waiters.clear(); - - list::iterator it = finished.begin(); - while (it != finished.end()) { - Context *c = *it; - if (c) { - c->finish(0); - delete c; - } - } - } else { - dout(5) << "retired " << le << ", still trimming, log size now " << num_events << endl; - } -} - diff --git a/ceph/mds/MDLog.h b/ceph/mds/MDLog.h index 486e08e7424..6420422954c 100644 --- a/ceph/mds/MDLog.h +++ b/ceph/mds/MDLog.h @@ -39,13 +39,15 @@ class MDLog { size_t num_events; // in events size_t max_events; - LogStream *reader; - LogStream *writer; + LogStream *logstream; - list trimming; // events currently being trimmed + set trimming; // events currently being trimmed list trim_waiters; // contexts waiting for trim bool trim_reading; + bool waiting_for_read; + friend class C_MDL_Reading; + Logger *logger; public: @@ -63,16 +65,12 @@ class MDLog { } int submit_entry( LogEvent *e, - Context *c ); - int submit_entry_2( LogEvent *e, - Context *c ); - - int trim(Context *c); // want to trim - void trim_readnext(); // read next event - int trim_2_didread(LogEvent *e); // read log event - int trim_3_didretire(LogEvent *e); // finished retiring log event - + Context *c = 0 ); + void wait_for_sync( Context *c ); + void flush(); + void trim(Context *c); + void _trimmed(LogEvent *le); }; #endif diff --git a/ceph/mds/MDS.cc b/ceph/mds/MDS.cc index b3a0b7716cc..faba9facc81 100644 --- a/ceph/mds/MDS.cc +++ b/ceph/mds/MDS.cc @@ -258,6 +258,7 @@ public: void MDS::proc_message(Message *m) { + switch (m->get_type()) { // OSD =============== case MSG_OSD_OPREPLY: @@ -387,6 +388,10 @@ void MDS::my_dispatch(Message *m) did_heartbeat_hack = true; } */ + + + // flush log + mdlog->flush(); // finish any triggered contexts @@ -534,32 +539,33 @@ class C_MDS_CommitRequest : public Context { MClientRequest *req; MClientReply *reply; CInode *tracei; // inode to include a trace for - bool pinned; LogEvent *event; + public: C_MDS_CommitRequest(MDS *mds, - MClientRequest *req, MClientReply *reply, CInode *tracei, - LogEvent *event = 0, - bool pinned=false) { + MClientRequest *req, MClientReply *reply, CInode *tracei, + LogEvent *event=0) { this->mds = mds; this->req = req; this->tracei = tracei; - this->pinned = pinned; this->reply = reply; this->event = event; } void finish(int r) { - if (r == 0) { - // success. log and reply. - mds->commit_request(req, reply, tracei, event); - } else { + if (r != 0) { // failure. set failure code and reply. reply->set_result(r); + } + if (event) { + mds->commit_request(req, reply, tracei, event); + } else { + // reply. mds->reply_request(req, reply, tracei); } } }; + /* * send generic response (just and error code) */ @@ -568,6 +574,7 @@ void MDS::reply_request(MClientRequest *req, int r, CInode *tracei) reply_request(req, new MClientReply(req, r), tracei); } + /* * send given reply * include a trace to tracei @@ -591,6 +598,7 @@ void MDS::reply_request(MClientRequest *req, MClientReply *reply, CInode *tracei stat_ops++; } + /* * commit event(s) to the metadata journal, then reply. * or, be sloppy and do it concurrently (see g_conf.mds_log_before_reply) @@ -601,37 +609,22 @@ void MDS::commit_request(MClientRequest *req, LogEvent *event, LogEvent *event2) { - if (g_conf.mds_log_before_reply) { + // log + if (event) mdlog->submit_entry(event); + if (event2) mdlog->submit_entry(event2); + + if (g_conf.mds_log_before_reply && g_conf.mds_log) { // SAFE mode! - - if (event) { - // log, then reply - // pin inode so it doesn't go away! - if (tracei) mdcache->request_pin_inode(req, tracei); - - // pass event2 as event1 (so we chain together!) - /* - WARNING: by chaining back to CommitRequest we may get - something not quite right if the log commit fails. what - happens (to the whole system!) then? ** FIXME ** - */ - dout(10) << "commit_request submitting log entry" << endl; - mdlog->submit_entry(event, - new C_MDS_CommitRequest(this, req, reply, tracei, event2, true)); // inode is pinned - } - else { - // just reply, no log entry (anymore). - reply_request(req, reply, tracei); - } - } else { - // SLOPPY mode! - - // log - if (event) mdlog->submit_entry(event, NULL); - if (event2) mdlog->submit_entry(event2, NULL); + // pin inode so it doesn't go away! + if (tracei) mdcache->request_pin_inode(req, tracei); - // reply + // wait for log sync + mdlog->wait_for_sync(new C_MDS_CommitRequest(this, req, reply, tracei)); + return; + } + else { + // just reply reply_request(req, reply, tracei); } } diff --git a/ceph/mds/events/EAlloc.h b/ceph/mds/events/EAlloc.h new file mode 100644 index 00000000000..f686204c0d4 --- /dev/null +++ b/ceph/mds/events/EAlloc.h @@ -0,0 +1,59 @@ +#ifndef __EALLOC_H +#define __EALLOC_H + +#include +#include "include/config.h" +#include "include/types.h" + +#include "../LogEvent.h" +#include "../IdAllocator.h" + +#define EALLOC_EV_ALLOC 1 +#define EALLOC_EV_FREE 2 + +class EAlloc : public LogEvent { + protected: + int type; + idno_t id; + int what; + + public: + EAlloc(int type, idno_t id, int what) : + LogEvent(EVENT_ALLOC) { + this->type = type; + this->id = id; + this->what = what; + } + EAlloc() : + LogEvent(EVENT_ALLOC) { + } + + virtual void encode_payload(bufferlist& bl) { + bl.append((char*)&type, sizeof(type)); + bl.append((char*)&id, sizeof(id)); + bl.append((char*)&what, sizeof(what)); + } + void decode_payload(bufferlist& bl, int& off) { + bl.copy(off, sizeof(type), (char*)&type); + off += sizeof(type); + bl.copy(off, sizeof(id), (char*)&id); + off += sizeof(id); + bl.copy(off, sizeof(what), (char*)&what); + off += sizeof(what); + } + + + virtual bool obsolete(MDS *mds) { + if (mds->idalloc->is_dirty(type,id)) + return false; // still dirty + else + return true; // already flushed + } + + virtual void retire(MDS *mds, Context *c) { + mds->idalloc->save(c); + } + +}; + +#endif diff --git a/ceph/mds/events/EInodeUpdate.h b/ceph/mds/events/EInodeUpdate.h index e819f7a1dec..a09f6e5cdbb 100644 --- a/ceph/mds/events/EInodeUpdate.h +++ b/ceph/mds/events/EInodeUpdate.h @@ -22,18 +22,21 @@ class EInodeUpdate : public LogEvent { this->inode = in->inode; version = in->get_version(); } - EInodeUpdate(crope s) : + EInodeUpdate() : LogEvent(EVENT_INODEUPDATE) { - s.copy(0, sizeof(version), (char*)&version); - s.copy(sizeof(version), sizeof(inode), (char*)&inode); } - virtual crope get_payload() { - crope r; - r.append((char*)&version, sizeof(version)); - r.append((char*)&inode, sizeof(inode)); - return r; + virtual void encode_payload(bufferlist& bl) { + bl.append((char*)&version, sizeof(version)); + bl.append((char*)&inode, sizeof(inode)); } + void decode_payload(bufferlist& bl, int& off) { + bl.copy(off, sizeof(version), (char*)&version); + off += sizeof(version); + bl.copy(off, sizeof(inode), (char*)&inode); + off += sizeof(inode); + } + virtual bool obsolete(MDS *mds) { // am i obsolete? diff --git a/ceph/mds/events/EString.h b/ceph/mds/events/EString.h index 3c9c2749ff7..face114bfea 100644 --- a/ceph/mds/events/EString.h +++ b/ceph/mds/events/EString.h @@ -18,14 +18,17 @@ class EString : public LogEvent { LogEvent(EVENT_STRING) { event = e; } - EString(crope s) : + EString() : LogEvent(EVENT_STRING) { - event = s.c_str(); + } + + void decode_payload(bufferlist& bl, int& off) { + event = bl.c_str() + off; + off += event.length() + 1; } - // note: LogEvent owns serialized buffer - virtual crope get_payload() { - return crope(event.c_str()); + void encode_payload(bufferlist& bl) { + bl.append(event.c_str(), event.length()+1); } }; diff --git a/ceph/mds/events/EUnlink.h b/ceph/mds/events/EUnlink.h index 7b57007b4b8..051b92cd95d 100644 --- a/ceph/mds/events/EUnlink.h +++ b/ceph/mds/events/EUnlink.h @@ -24,19 +24,22 @@ class EUnlink : public LogEvent { this->dname = dn->get_name(); this->version = dir->get_version(); } - EUnlink(crope s) : + EUnlink() : LogEvent(EVENT_UNLINK) { - s.copy(0, sizeof(dir_ino), (char*)&dir_ino); - s.copy(sizeof(dir_ino), sizeof(version), (char*)&version); - dname = s.c_str() + sizeof(dir_ino) + sizeof(version); } - virtual crope get_payload() { - crope r; - r.append((char*)&dir_ino, sizeof(dir_ino)); - r.append((char*)&version, sizeof(version)); - r.append((char*)dname.c_str(), dname.length() + 1); - return r; + virtual void encode_payload(bufferlist& bl) { + bl.append((char*)&dir_ino, sizeof(dir_ino)); + bl.append((char*)&version, sizeof(version)); + bl.append((char*)dname.c_str(), dname.length() + 1); + } + void decode_payload(bufferlist& bl, int& off) { + bl.copy(off, sizeof(dir_ino), (char*)&dir_ino); + off += sizeof(dir_ino); + bl.copy(off, sizeof(version), (char*)&version); + off += sizeof(version); + dname = bl.c_str() + off; + off += dname.length() + 1; } virtual bool obsolete(MDS *mds) {