]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rewrote mds log
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 14 Jun 2005 01:56:34 +0000 (01:56 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 14 Jun 2005 01:56:34 +0000 (01:56 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@308 29311d96-e01e-0410-9327-a35deaab8ce9

20 files changed:
ceph/config.cc
ceph/config.h
ceph/fakesyn.cc
ceph/include/buffer.h
ceph/include/bufferlist.h
ceph/mds/IdAllocator.cc
ceph/mds/IdAllocator.h
ceph/mds/LogEvent.h
ceph/mds/LogStream.cc
ceph/mds/LogStream.h
ceph/mds/MDBalancer.cc
ceph/mds/MDBalancer.h
ceph/mds/MDCache.cc
ceph/mds/MDLog.cc
ceph/mds/MDLog.h
ceph/mds/MDS.cc
ceph/mds/events/EAlloc.h [new file with mode: 0644]
ceph/mds/events/EInodeUpdate.h
ceph/mds/events/EString.h
ceph/mds/events/EUnlink.h

index c3fa8e4a46f02d2537d3ea8d111d168145b7a34d..ab98f9a891a4485406caccfc6a4c908c706e34c6 100644 (file)
@@ -25,6 +25,8 @@ md_config_t g_conf = {
   fakemessenger_serialize: true,
 
   debug: 15,
+  debug_mds_balancer: 1,
+  debug_mds_log: 1,
   
   // --- client ---
   client_cache_size: 400,
@@ -38,14 +40,14 @@ md_config_t g_conf = {
 
   mds_log: true,
   mds_log_max_len:  10000,//MDS_CACHE_SIZE / 3,
-  mds_log_max_trimming: 16,
+  mds_log_max_trimming: 32,
   mds_log_read_inc: 65536,
   mds_log_before_reply: true,
   mds_log_flush_on_shutdown: true,
 
   mds_bal_replicate_threshold: 500,
   mds_bal_unreplicate_threshold: 200,
-  mds_bal_interval: 200,
+  mds_bal_interval: 500,
 
   mds_commit_on_shutdown: true,
 
@@ -114,6 +116,10 @@ void parse_config_options(int argc, char **argv,
 
        else if (strcmp(argv[i], "--debug") == 0) 
          g_conf.debug = atoi(argv[++i]);
+       else if (strcmp(argv[i], "--debug_mds_balancer") == 0) 
+         g_conf.debug_mds_balancer = atoi(argv[++i]);
+       else if (strcmp(argv[i], "--debug_mds_log") == 0) 
+         g_conf.debug_mds_log = atoi(argv[++i]);
        else if (strcmp(argv[i], "--log") == 0) 
          g_conf.log_name = argv[++i];
 
@@ -126,6 +132,8 @@ void parse_config_options(int argc, char **argv,
          g_conf.mds_log_before_reply = atoi(argv[++i]);
        else if (strcmp(argv[i], "--mds_log_max_len") == 0) 
          g_conf.mds_log_max_len = atoi(argv[++i]);
+       else if (strcmp(argv[i], "--mds_log_read_inc") == 0) 
+         g_conf.mds_log_read_inc = atoi(argv[++i]);
        else if (strcmp(argv[i], "--mds_log_max_trimming") == 0) 
          g_conf.mds_log_max_trimming = atoi(argv[++i]);
        else if (strcmp(argv[i], "--mds_commit_on_shutdown") == 0) 
index 3f4cbdb213f0fcbe6c1f62ee6accb831ff4d5e37..2715abfb82ef23d336c9962013579cc69fafb401 100644 (file)
@@ -15,8 +15,9 @@ struct md_config_t {
   bool fake_clock;
   bool fakemessenger_serialize;
 
-  int  debug;
+  int debug;
+  int debug_mds_balancer;
+  int debug_mds_log;
 
   // client
   int      client_cache_size;
index 03592a3fcff40a557f049aee47c2356e4e781ecd..e7e05094ac8723a544d90b4d915f2161df4df1f0 100644 (file)
@@ -144,7 +144,7 @@ int main(int oargc, char **oargv) {
        cout << "mounting" << endl;
        client[i]->mount(mkfs);
        
-       cout << "starting synthatic client  " << endl;
+       cout << "starting synthetic client  " << endl;
        syn[i] = new SyntheticClient(client[i]);
 
        char s[20];
index cabfa9baf4439951c97d561a44fa3e235412c0f9..f2a3f9028551dca112429305a65a55101c8a26e5 100644 (file)
@@ -177,11 +177,14 @@ class bufferptr {
   int length() {
        return _len;
   }
+  int offset() {
+       return _off;
+  }
 
 
   // modifiers
   void set_offset(int off) {
-       assert(off <= _len);
+       assert(off <= _buffer->_alloc_len);
        _off = off;
   }
   void set_length(int len) {
@@ -201,11 +204,16 @@ class bufferptr {
        _buffer->_len += len;
        _len += len;
   }
-  void copy(int off, int len, char *dest) {
+  void copy_out(int off, int len, char *dest) {
        assert(off >= 0 && off <= _len);
        assert(len >= 0 && off + len <= _len);
        memcpy(dest, c_str() + off, len);
   }
+  void copy_in(int off, int len, char *src) {
+       assert(off >= 0 && off <= _len);
+       assert(len >= 0 && off + len <= _len);
+       memcpy(c_str() + off, src, len);
+  }
 
   friend ostream& operator<<(ostream& out, bufferptr& bp);
 };
@@ -213,8 +221,8 @@ class bufferptr {
 
 inline ostream& operator<<(ostream& out, bufferptr& bp) {
   return out << "bufferptr(len=" << bp._len << ", off=" << bp._off 
-                        << ", " << bp.c_str() 
-       //<< ", " << *bp._buffer 
+       //<< ", " << bp.c_str() 
+                        << ", " << *bp._buffer 
                         << ")";
 }
 
index 355123959686855a24e5c6bf263235874e8d104e..1af1eebf2f3432785bcd0f92cdeecfbc6e6c6b61 100644 (file)
@@ -99,13 +99,13 @@ class bufferlist {
        while (len > 0) {
          // is the rest ALL in this buffer?
          if (off + len <= (*curbuf).length()) {
-               (*curbuf).copy(off, len, dest);        // yup, last bit!
+               (*curbuf).copy_out(off, len, dest);        // yup, last bit!
                break;
          }
 
          // get as much as we can from this buffer.
          int howmuch = (*curbuf).length() - off;
-         (*curbuf).copy(off, howmuch, dest);
+         (*curbuf).copy_out(off, howmuch, dest);
 
          dest += howmuch;
          len -= howmuch;
@@ -114,6 +114,48 @@ class bufferlist {
          assert(curbuf != _buffers.end());
        }
   }
+
+  void copy_in(int off, int len, char *src) {
+       assert(off >= 0);
+       assert(off + len <= length());
+
+       // advance to off
+       list<bufferptr>::iterator curbuf = _buffers.begin();
+
+       // skip off
+       while (off > 0) {
+         assert(curbuf != _buffers.end());
+         if (off >= (*curbuf).length()) {
+               // skip this buffer
+               off -= (*curbuf).length();
+               curbuf++;
+         } else {
+               // somewhere in this buffer!
+               break;
+         }
+       }
+       
+       // copy
+       while (len > 0) {
+         // is the rest ALL in this buffer?
+         if (off + len <= (*curbuf).length()) {
+               (*curbuf).copy_in(off, len, src);        // yup, last bit!
+               break;
+         }
+
+         // get as much as we can from this buffer.
+         int howmuch = (*curbuf).length() - off;
+         (*curbuf).copy_in(off, howmuch, src);
+
+         src += howmuch;
+         len -= howmuch;
+         off = 0;
+         curbuf++;
+         assert(curbuf != _buffers.end());
+       }
+  }
+
+
   void append(const char *data, int len) {
        if (len == 0) return;
        
@@ -228,7 +270,7 @@ class bufferlist {
                //cout << "keeping end of " << *curbuf << ", losing first " << off+len << endl;
                if (claim_by) 
                  claim_by->append( *curbuf, len, off );
-               (*curbuf).set_offset( off + len );    // ignore beginning big
+               (*curbuf).set_offset( off + len + (*curbuf).offset() );    // ignore beginning big
                (*curbuf).set_length( (*curbuf).length() - len - off );
                //cout << " now " << *curbuf << endl;
                break;
index 3eea14a1bc9f111496e821fb399b7ea35c657c24..c609cb62646b4273042d4e460b72832444e283b8 100644 (file)
@@ -3,6 +3,9 @@
 
 #include "IdAllocator.h"
 #include "MDS.h"
+#include "MDLog.h"
+#include "events/EAlloc.h"
+
 #include "osd/Filer.h"
 #include "osd/OSDCluster.h"
 
@@ -23,8 +26,10 @@ idno_t IdAllocator::get_id(int type)
   idno_t id = free[type].first();
   free[type].erase(id);
   dout(DBLEVEL) << "idalloc " << this << ": getid type " << type << " is " << id << endl;
-  //free[type].dump();
-  //save();
+
+  mds->mdlog->submit_entry(new EAlloc(type, id, EALLOC_EV_ALLOC));
+  dirty[type].insert(id);
+
   return id;
 }
 
@@ -35,14 +40,16 @@ void IdAllocator::reclaim_id(int type, idno_t id)
   dout(DBLEVEL) << "idalloc " << this << ": reclaim type " << type << " id " << id << endl;
   free[type].insert(id);
   //free[type].dump();
-  //save();
+
+  mds->mdlog->submit_entry(new EAlloc(type, id, EALLOC_EV_FREE));
+  dirty[type].insert(id);
 }
 
 
 
 
 
-void IdAllocator::save()
+void IdAllocator::save(Context *onfinish)
 {
   crope data;
 
@@ -73,6 +80,9 @@ void IdAllocator::save()
        assert(mapsize == 0);
   }
 
+  // reset dirty list   .. FIXME this is optimistic, i'm assuming the write succeeds.
+  dirty.clear();
+
   // turn into bufferlist
   bufferlist bl;
   bl.append(data.c_str(), data.length());
@@ -83,7 +93,7 @@ void IdAllocator::save()
                                        0,
                                        bl,
                                        0,
-                                       NULL);
+                                       onfinish);
 }
 
 
index 93f1e587e43e946939a39a0b20016168c8b5732a..15dadd56953c0ba10b0a781a9743f02c700e4efc 100644 (file)
@@ -16,8 +16,9 @@ typedef __uint64_t idno_t;
 class IdAllocator {
   MDS *mds;
 
-  map< int, rangeset<idno_t> > free;  // type -> rangeset
-
+  map< int, rangeset<idno_t> > free;   // type -> rangeset
+  map< int, set<idno_t> >      dirty;  // dirty ids
+  
   bool opened, opening;
   
  public:
@@ -36,13 +37,17 @@ class IdAllocator {
   bool is_open() { return opened; }
   bool is_opening() { return opening; }
 
+  bool is_dirty(int type, idno_t id) {
+       return dirty[type].count(id) ? true:false;
+  }
+
   void reset();
+  void save(Context *onfinish=0);
 
   void shutdown() {
-       if (is_open()) save();
+       if (is_open()) save(0);
   }
 
-  void save();
   void load(Context *onfinish);
   void load_2(int, bufferlist&, Context *onfinish);
 
index 18f15e7ba4bed28f6d6683c259a2633ebfadca20..b2ad458e8ec584105d394bb0aa5aac212925e330 100644 (file)
@@ -10,6 +10,7 @@ using namespace std;
 #define EVENT_STRING       1
 #define EVENT_INODEUPDATE  2
 #define EVENT_UNLINK       3
+#define EVENT_ALLOC        4
 
 // generic log event
 class LogEvent {
@@ -23,21 +24,22 @@ class LogEvent {
   
   int get_type() { return type; }
 
-  virtual crope get_payload() = 0;   // children overload this
-
-  crope get_serialized() {
-       crope s;
+  virtual void encode_payload(bufferlist& bl) = 0;
+  virtual void decode_payload(bufferlist& bl, int& off) = 0;
 
+  void encode(bufferlist& bl) {
        // type
-       __uint32_t ptype = type;
-       s.append((char*)&ptype, sizeof(ptype));
-
-       // len+payload
-       crope payload = get_payload();
-       __uint32_t plen = payload.length();
-       s.append((char*)&plen, sizeof(plen));
-       s.append(payload);
-       return s;
+       bl.append((char*)&type, sizeof(type));
+
+       // len placeholder
+       int len = 0;   // we don't know just yet...
+       int off = bl.length();
+       bl.append((char*)&type, sizeof(len)); 
+
+       // payload
+       encode_payload(bl);
+       len = bl.length() - off - sizeof(len);
+       bl.copy_in(off, sizeof(len), (char*)&len);
   }
   
   virtual bool obsolete(MDS *m) {
index e24016cf5d874ede39d65fb7682beb1c0bd7bd54..306597b92f4c489638b1bd758278726243891fc2 100644 (file)
 #include "events/EString.h"
 #include "events/EInodeUpdate.h"
 #include "events/EUnlink.h"
+#include "events/EAlloc.h"
 
 #include <iostream>
 using namespace std;
 
 #include "include/config.h"
 #undef dout
-#define  dout(l)    if (l<=g_conf.debug) cout << "mds" << mds->get_nodeid() << ".logstream "
+#define  dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_mds_log) cout << "mds" << mds->get_nodeid() << ".logstream "
 
+
+// ----------------------------
 // writing
 
-int LogStream::append(LogEvent *e, Context *c)
-{
-  // serialize
-  crope buffer = e->get_serialized();
-  size_t buflen = buffer.length();
+class C_LS_Append : public Context {
+  LogStream *ls;
+  off_t off;
+public:
+  C_LS_Append(LogStream *ls, off_t off) {
+       this->ls = ls;
+       this->off = off;
+  }
+  void finish(int r) {
+       ls->_append_2(off);
+  }
+};
+
 
-  // into a bufferlist.. FIXME
+off_t LogStream::append(LogEvent *e)
+{
+  // serialize FIXME  ********
   bufferlist bl;
-  bl.append(buffer.c_str(), buffer.length());
+  e->encode(bl);
+  size_t elen = bl.length();
+  
+  // append
+  dout(10) << "append event type " << e->get_type() << " size " << elen << " at log offset " << append_pos << endl;
+  
+  off_t off = append_pos;
+  append_pos += elen;
+  dout(15) << "write buf was " << write_buf.length() << endl;
+  write_buf.claim_append(bl);
+  dout(15) << "write buf now " << write_buf.length() << endl;
+
+  return off;
+}
 
+void LogStream::_append_2(off_t off)
+{
+  dout(10) << "sync_pos now " << off << endl;
+  sync_pos = off;
+  
+  // wake up waiters
+  map< off_t, list<Context*> >::iterator it = waiting_for_sync.begin();
+  while (it != waiting_for_sync.end()) {
+       if (it->first > sync_pos) break;
+       
+       // wake them up!
+       dout(15) << it->second.size() << " waiters at offset " << it->first << " <= " << sync_pos << endl;
+       for (list<Context*>::iterator cit = it->second.begin();
+                cit != it->second.end();
+                cit++) 
+         mds->finished_queue.push_back(*cit);
+       
+       // continue
+       waiting_for_sync.erase(it++);
+  }
+}
 
-  dout(10) << "append event type " << e->get_type() << " size " << buflen << " at log offset " << append_pos << endl;
 
+void LogStream::wait_for_sync(Context *c, off_t off) 
+{
+  if (off == 0) off = append_pos;
+  assert(off > sync_pos);
   
-  // advance ptr for later
-  append_pos += buffer.length();
+  dout(15) << "sync " << c << " waiting for " << off << "  (sync_pos currently " << sync_pos << ")" << endl;
+  waiting_for_sync[off].push_back(c);
   
-  // submit write
-  mds->filer->write(log_ino,   // FIXME
-                                       buflen, append_pos-buflen,
-                                       bl,
-                                       0,
-                                       c);
-  return 0;
+  // initiate flush now?  (since we have a waiter...)
+  if (autoflush) flush();
+}
+
+void LogStream::flush()
+{
+  // write to disk
+  if (write_buf.length()) {
+       dout(15) << "flush flush_pos " << flush_pos << " < append_pos " << append_pos << ", writing " << write_buf.length() << " bytes" << endl;
+       
+       assert(write_buf.length() == append_pos - flush_pos);
+       
+       mds->filer->write(log_ino, 
+                                         write_buf.length(), flush_pos,
+                                         write_buf,
+                                         0,
+                                         new C_LS_Append(this, append_pos));
+       
+       write_buf.clear();
+       flush_pos = append_pos;
+  } else {
+       dout(15) << "flush flush_pos " << flush_pos << " == append_pos " << append_pos << ", nothing to do" << endl;
+  }
+
 }
 
 
 
+
+
+
+// -------------------------------------------------
 // reading
 
-/*
-class C_LS_ReadNext : public Context {
-  LogStream *ls;
-  LogEvent **le;
-  Context *c;
-public:
-  C_LS_ReadNext(LogStream *ls, LogEvent **le, Context *c) {
-       this->ls = ls;
-       this->le = le;
-       this->c = c;
-  }
-  void finish(int result) {
-       ls->read_next(le,c,2);
+
+LogEvent *LogStream::get_next_event()
+{
+  if (read_buf.length() < 2*sizeof(__uint32_t)) 
+       return 0;
+  
+  // parse type, length
+  int off = 0;
+  __uint32_t type, length;
+  read_buf.copy(off, sizeof(__uint32_t), (char*)&type);
+  off += sizeof(__uint32_t);
+  read_buf.copy(off, sizeof(__uint32_t), (char*)&length);
+  off += sizeof(__uint32_t);
+
+  dout(10) << "getting next event from " << read_pos << ", type " << type << ", size " << length << endl;
+
+  assert(type > 0);
+
+  if (read_buf.length() < off + length)
+       return 0;
+  
+  // create event
+  LogEvent *le;
+  switch (type) {
+  case EVENT_STRING:  // string
+       le = new EString();
+       break;
+       
+  case EVENT_INODEUPDATE:
+       le = new EInodeUpdate();
+       break;
+       
+  case EVENT_UNLINK:
+       le = new EUnlink();
+       break;
+       
+  case EVENT_ALLOC:
+       le = new EAlloc();
+       break;
+
+  default:
+       dout(1) << "uh oh, unknown event type " << type << endl;
+       assert(0);
   }
-};
-*/
+
+  // decode
+  le->decode_payload(read_buf, off);
+
+  // discard front of read_buf
+  read_pos += off;
+  read_buf.splice(0, off);  
+
+  dout(15) << "get_next_event got event, read_pos now " << read_pos << " (append_pos is " << append_pos << ")" << endl;
+
+  // ok!
+  return le;
+}
+
+
 
 class C_LS_ReadChunk : public Context {
 public:
   bufferlist bl;
   LogStream *ls;
-  LogEvent **le;
-  Context *c;
-  C_LS_ReadChunk(LogStream *ls, LogEvent **le, Context *c) {
+
+  C_LS_ReadChunk(LogStream *ls) {
        this->ls = ls;
-       this->le = le;
-       this->c = c;
   }
   void finish(int result) {
-       crope next_bit;
-       next_bit.append(bl.c_str(), bl.length());
-       ls->did_read_bit(next_bit, le, c);
+       ls->_did_read(bl);
   }
 };
 
-void LogStream::did_read_bit(crope& next_bit, LogEvent **le, Context *c) 
-{
-  // add to our buffer
-  buffer.append(next_bit);
-  reading_block = false;
-  
-  // throw off beginning part
-  if (buffer.length() > g_conf.mds_log_read_inc*2) {
-       int trim = buffer.length() - g_conf.mds_log_read_inc*2;
-       buf_start += trim;
-       buffer = buffer.substr(trim, buffer.length() - trim);
-       dout(10) << "did_read_bit adjusting buf_start now +" << trim << " = " << buf_start << " len " << buffer.length() << endl;
-  }
-  
-  // continue at step 2
-  read_next(le, c, 2);
-}
 
-int LogStream::read_next(LogEvent **le, Context *c, int step) 
+void LogStream::wait_for_next_event(Context *c)
 {
-  if (step == 1) {
-       // does buffer have what we want?
-       //if (buf_start > cur_pos ||
-       //buf_start+buffer.length() < cur_pos+4) {
-       if (buf_start+buffer.length() < cur_pos+ g_conf.mds_log_read_inc/2) {
-
-         // make sure block is being read
-         if (reading_block) {
-               dout(10) << "read_next already reading log head from disk, offset " << cur_pos << endl;
-               assert(0);  
-         } else {
-               off_t start = buf_start+buffer.length();
-               dout(10) << "read_next reading log head from disk, offset " << start << " len " << g_conf.mds_log_read_inc << endl;
-               // nope.  read a chunk
-               C_LS_ReadChunk *readc = new C_LS_ReadChunk(this, le, c);
-               reading_block = true;
-               mds->filer->read(log_ino,  // FIXME
-                                                g_conf.mds_log_read_inc, start,
-                                                &readc->bl,
-                                                readc);
-         }
-         return 0;
-       }
-       step = 2;
+  // add waiter
+  if (c) waiting_for_read.push_back(c);
+
+  // issue read
+  off_t tail = read_pos + read_buf.length();
+  size_t size = g_conf.mds_log_read_inc;
+  if (tail + size > sync_pos) {
+       size = sync_pos - tail;
+       assert(size > 0);   // bleh, wait for sync, etc.
   }
 
+  dout(15) << "wait_for_next_event reading from pos " << tail << " len " << size << endl;
+  C_LS_ReadChunk *readc = new C_LS_ReadChunk(this);
+  mds->filer->read(log_ino,  
+                                  g_conf.mds_log_read_inc, tail,
+                                  &readc->bl,
+                                  readc);
+}
 
-  if (step == 2) {
-       reading_block = false;
-
-       // decode event
-       unsigned off = cur_pos-buf_start;
-       __uint32_t type, length;
-       buffer.copy(off, sizeof(__uint32_t), (char*)&type);
-       buffer.copy(off+sizeof(__uint32_t), sizeof(__uint32_t), (char*)&length);
-       off += sizeof(type) + sizeof(length);
-
-       dout(10) << "read_next got event type " << type << " size " << length << " at log offset " << cur_pos << endl;
-       cur_pos += sizeof(type) + sizeof(length) + length;
-
-       switch (type) {
-         
-       case EVENT_STRING:  // string
-         *le = new EString(buffer.substr(off,length));
-         break;
-         
-       case EVENT_INODEUPDATE:
-         *le = new EInodeUpdate(buffer.substr(off,length));
-         break;
-
-       case EVENT_UNLINK:
-         *le = new EUnlink(buffer.substr(off,length));
-         break;
 
-         
+void LogStream::_did_read(bufferlist& blist)
+{
+  dout(15) << "_did_read got " << blist.length() << " bytes" << endl;
+  read_buf.claim_append(blist);
 
-       default:
-         dout(1) << "uh oh, unknown event type " << type << endl;
-         assert(0);
-       }
-       
-       // finish
-       if (c) {
-         c->finish(0);
-         delete c;
-       }
-
-       /*
-       // any other waiters too!
-       list<Context*> finished = waiting_for_read_block;
-       waiting_for_read_block.clear();
-       for (list<Context*>::iterator it = finished.begin();
-                it != finished.end();
-                it++) {
-         Context *c = *it;
-         if (c) {
-               c->finish(0);
-               delete c;
-         }
-       }
-       */
-       
-  }
+  list<Context*> finished;
+  finished.splice(finished.begin(), waiting_for_read);
+  finish_contexts(finished, 0);
 }
+
index 70881a05deb2f596360bf3279d440162cd768c91..2aecc9dbe2a54eb000f1162aced424deaca40d91 100644 (file)
@@ -4,47 +4,77 @@
 #include "include/types.h"
 #include "include/Context.h"
 
-#include <ext/rope>
-using namespace __gnu_cxx;
+#include "include/buffer.h"
+#include "include/bufferlist.h"
+
+#include <map>
+#include <list>
 
 class LogEvent;
+class Filer;
 class MDS;
 
 class LogStream {
  protected:
   MDS *mds;
-  off_t cur_pos, append_pos;
+  Filer *filer;
+
   inodeno_t log_ino;
-  object_t oid;
 
-  bool reading_block;
-  //list<Context*> waiting_for_read_block;
+  // writing
+  off_t sync_pos;        // first non-written byte
+  off_t flush_pos;       // first non-writing byte, beginning of write_buf
+  off_t append_pos;      // where next event will be written
+  bufferlist write_buf;  // unwritten (between flush_pos and append_pos)
+
+  // reading
+  off_t read_pos;        // abs position in file
+  //off_t read_buf_start;  // where read buf begins
+  bufferlist read_buf;
+  bool reading;
+
+  std::map< off_t, std::list<Context*> > waiting_for_sync;
+  std::list<Context*>                    waiting_for_read;
+
 
-  crope buffer;
-  off_t buf_start;
+  bool autoflush;
+  
  public:
-  LogStream(MDS *mds, inodeno_t log_ino) {
+  LogStream(MDS *mds, Filer *filer, inodeno_t log_ino) {
        this->mds = mds;
+       this->filer = filer;
        this->log_ino = log_ino;
-       cur_pos = 0;
-       append_pos = 0; // fixme
-       buf_start = 0;
-       reading_block = false;
-  }
 
-  off_t seek(off_t offset) {
-       cur_pos = offset;
-  }
+       // wr
+       sync_pos = flush_pos = append_pos = 0;
+       autoflush = true;
 
-  off_t get_pos() {
-       return cur_pos;
+       // rd
+       read_pos = 0;
+       reading = false;
   }
 
+  // write (append to end)
+  off_t append(LogEvent *e);          // returns offset it will be written to
+  void  _append_2(off_t off);     
+  void  wait_for_sync(Context *c, off_t off=0);  // wait for flush
+  void  flush();                                 // initiate flush
+  
+  // read (from front)
+  //bool      has_next_event();
+  LogEvent *get_next_event();
+  void      wait_for_next_event(Context *c);
+  void      _did_read(bufferlist& blist);
+
+
+  // old interface
+  /*
   // WARNING: non-reentrant; single reader only
   int read_next(LogEvent **le, Context *c, int step=1);
   void did_read_bit(crope& next_bit, LogEvent **le, Context *c) ;
 
   int append(LogEvent *e, Context *c);  // append at cur_pos, mind you!
+  */
 };
 
 #endif
index 1733bccf00c0e6fccacbae9d0ba28709b6ff59f6..24df995b4c91b25d03780a13c88a4cc9cb418aa3 100644 (file)
@@ -17,7 +17,7 @@ using namespace std;
 
 #include "include/config.h"
 #undef dout
-#define  dout(l)    if (l<=g_conf.debug) cout << "mds" << mds->get_nodeid() << ".bal "
+#define  dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_mds_balancer) cout << "mds" << mds->get_nodeid() << ".bal "
 
 #define MIN_LOAD    50   //  ??
 #define MIN_REEXPORT 5  // will automatically reexport
@@ -101,7 +101,7 @@ void MDBalancer::send_heartbeat()
 
 void MDBalancer::handle_heartbeat(MHeartbeat *m)
 {
-  dout(5) << " got heartbeat " << m->get_beat() << " from " << m->get_source() << " " << m->get_load() << endl;
+  dout(5) << "=== got heartbeat " << m->get_beat() << " from " << m->get_source() << " " << m->get_load() << endl;
   
   if (!mds->mdcache->get_root()) {
        dout(10) << "no root on handle" << endl;
@@ -114,7 +114,7 @@ void MDBalancer::handle_heartbeat(MHeartbeat *m)
        beat_epoch = m->get_beat();
        send_heartbeat();
 
-       mds->mdcache->show_imports();
+       show_imports();
   }
   
   mds_load[ m->get_source() ] = m->get_load();
@@ -173,7 +173,7 @@ void MDBalancer::do_rebalance()
   
   if (my_load < target_load.root_pop) {
        dout(5) << "  i am underloaded, doing nothing." << endl;
-       mds->mdcache->show_imports();
+       show_imports();
        return;
   }  
 
@@ -241,7 +241,7 @@ void MDBalancer::do_rebalance()
 
        dout(5) << " sending " << amount << " to " << target << endl;
        
-       mds->mdcache->show_imports();
+       show_imports();
 
        // search imports from target
        if (import_from_map.count(target)) {
@@ -310,7 +310,7 @@ void MDBalancer::do_rebalance()
   }
 
   dout(5) << "rebalance done" << endl;
-  mds->mdcache->show_imports();
+  show_imports();
   
 }
 
@@ -511,6 +511,64 @@ void MDBalancer::add_import(CDir *dir)
 
 
 
+
+
+
+void MDBalancer::show_imports(bool external)
+{
+  int db = 7; //debug level
+
+  if (mds->mdcache->imports.size() == 0) {
+       dout(db) << "no imports/exports" << endl;
+       return;
+  }
+  dout(db) << "imports/exports:" << endl;
+
+  set<CDir*> ecopy = mds->mdcache->exports;
+
+  timepair_t now = g_clock.gettimepair();
+
+  for (set<CDir*>::iterator it = mds->mdcache->imports.begin();
+          it != mds->mdcache->imports.end();
+          it++) {
+       CDir *im = *it;
+       dout(db) << "  + import (" << im->popularity[MDS_POP_CURDOM].get(now) << "/" << im->popularity[MDS_POP_ANYDOM].get(now) << ")  " << *im << endl;
+       assert( im->is_import() );
+       assert( im->is_auth() );
+       
+       for (set<CDir*>::iterator p = mds->mdcache->nested_exports[im].begin();
+                p != mds->mdcache->nested_exports[im].end();
+                p++) {
+         CDir *exp = *p;
+         dout(db) << "      - ex (" << exp->popularity[MDS_POP_NESTED].get(now) << ", " << exp->popularity[MDS_POP_ANYDOM].get(now) << ")" << *exp << " to " << exp->dir_auth << endl;
+         assert( exp->is_export() );
+         assert( !exp->is_auth() );
+         
+         if ( mds->mdcache->get_containing_import(exp) != im ) {
+               dout(1) << "uh oh, containing import is " << mds->mdcache->get_containing_import(exp) << endl;
+               dout(1) << "uh oh, containing import is " << *mds->mdcache->get_containing_import(exp) << endl;
+               assert( mds->mdcache->get_containing_import(exp) == im );
+         }
+         
+         if (ecopy.count(exp) != 1) {
+               dout(1) << " nested_export " << *exp << " not in exports" << endl;
+               assert(0);
+         }
+         ecopy.erase(exp);
+       }
+  }
+  
+  if (ecopy.size()) {
+       for (set<CDir*>::iterator it = ecopy.begin();
+                it != ecopy.end();
+                it++) 
+         dout(1) << " stray item in exports: " << **it << endl;
+       assert(ecopy.size() == 0);
+  }
+}
+
+
+
 /*  replicate?
 
          float dir_pop = dir->get_popularity();
index c5e2e3405e94e0ee082b86cd045653259515b85f..4e1022d8e15fa87b524675fcc88e1033a078f151 100644 (file)
@@ -53,6 +53,7 @@ class MDBalancer {
   void hit_recursive(class CDir *dir, timepair_t& now);
 
 
+  void show_imports(bool external=false);
 
 };
 
index 660316ada06e24be3cc1c1c4744d3c445f74cf7f..eca7000c7cd710510adf00a8d47f8317ca7e3433 100644 (file)
@@ -1465,6 +1465,13 @@ void MDCache::request_cleanup(Message *req)
 
   // remove from map
   active_requests.erase(req);
+
+
+  // log some stats *****
+  mds->logger->set("c", lru.lru_get_size());
+  mds->logger->set("cmax", lru.lru_get_max());
+
+
 }
 
 void MDCache::request_finish(Message *req)
@@ -1473,7 +1480,7 @@ void MDCache::request_finish(Message *req)
   request_cleanup(req);
   delete req;  // delete req
   
-  mds->logger->inc("rep");
+  mds->logger->inc("reply");
 
   //dump();
 }
@@ -7224,55 +7231,7 @@ void MDCache::handle_unhash_dir_finish(CDir *dir, int auth)
 
 void MDCache::show_imports()
 {
-  if (imports.size() == 0) {
-       dout(7) << "no imports/exports" << endl;
-       return;
-  }
-  dout(7) << "imports/exports:" << endl;
-
-  set<CDir*> ecopy = exports;
-
-  timepair_t now = g_clock.gettimepair();
-
-  for (set<CDir*>::iterator it = imports.begin();
-          it != imports.end();
-          it++) {
-       CDir *im = *it;
-       dout(7) << "  + import (" << im->popularity[MDS_POP_CURDOM].get(now) << "/" << im->popularity[MDS_POP_ANYDOM].get(now) << ")  " << *im << endl;
-       assert( im->is_import() );
-       assert( im->is_auth() );
-       
-       for (set<CDir*>::iterator p = nested_exports[im].begin();
-                p != nested_exports[im].end();
-                p++) {
-         CDir *exp = *p;
-         dout(7) << "      - ex (" << exp->popularity[MDS_POP_NESTED].get(now) << ", " << exp->popularity[MDS_POP_ANYDOM].get(now) << ")" << *exp << " to " << exp->dir_auth << endl;
-         assert( exp->is_export() );
-         assert( !exp->is_auth() );
-         
-         if ( get_containing_import(exp) != im ) {
-               dout(7) << "uh oh, containing import is " << get_containing_import(exp) << endl;
-               dout(7) << "uh oh, containing import is " << *get_containing_import(exp) << endl;
-               assert( get_containing_import(exp) == im );
-         }
-         
-         if (ecopy.count(exp) != 1) {
-               dout(7) << " nested_export " << *exp << " not in exports" << endl;
-               assert(0);
-         }
-         ecopy.erase(exp);
-       }
-  }
-  
-  if (ecopy.size()) {
-       for (set<CDir*>::iterator it = ecopy.begin();
-                it != ecopy.end();
-                it++) 
-         dout(7) << " stray item in exports: " << **it << endl;
-       assert(ecopy.size() == 0);
-  }
-  
-
+  //mds->balancer->show_imports(true);
 }
 
 
index 4bbf1e05c08baafc333ab88d8fda0cc5c9b1111d..2cf8c67716d88b305ef57680f59b650d4eafd188 100644 (file)
@@ -14,7 +14,7 @@ LogType mdlog_logtype;
 
 #include "include/config.h"
 #undef dout
-#define  dout(l)    if (l<=g_conf.debug) cout << "mds" << mds->get_nodeid() << ".log "
+#define  dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_mds_log) cout << "mds" << mds->get_nodeid() << ".log "
 
 // cons/des
 
@@ -23,203 +23,159 @@ MDLog::MDLog(MDS *m)
   mds = m;
   num_events = 0;
   max_events = 0;
-  trim_reading = false;
-  reader = new LogStream(mds, 
-                                                100 + mds->get_nodeid());
-  writer = new LogStream(mds,
-                                                100 + mds->get_nodeid());
-
-  string name;
-  name = "mds";
-  int w = mds->get_nodeid();
-  if (w >= 1000) name += ('0' + ((w/1000)%10));
-  if (w >= 100) name += ('0' + ((w/100)%10));
-  if (w >= 10) name += ('0' + ((w/10)%10));
-  name += ('0' + ((w/1)%10));
-  name += ".log";
+
+  logstream = new LogStream(mds, mds->filer, MDS_INO_LOG_OFFSET + mds->get_nodeid());
+
+  char name[80];
+  sprintf(name, "mds%d.log", mds->get_nodeid());
   logger = new Logger(name, (LogType*)&mdlog_logtype);
 }
 
 
 MDLog::~MDLog()
 {
-  if (reader) { delete reader; reader = 0; }
-  if (writer) { delete writer; writer = 0; }
+  if (logstream) { delete logstream; logstream = 0; }
   if (logger) { delete logger; logger = 0; }
 }
 
 
-class C_MDL_SubmitEntry : public Context {
-protected:
-  MDLog *mdl;
-public:
-  Context *c;
-  LogEvent *le;
-
-  C_MDL_SubmitEntry(MDLog *m, LogEvent *e, Context *c) {
-       mdl = m; 
-       le = e;
-       this->c = c;
-  }
-  void finish(int res) {
-       mdl->submit_entry_2(le,c);
-  }
-};
 
 int MDLog::submit_entry( LogEvent *e,
                                                 Context *c ) 
 {
-  if (!g_conf.mds_log) {
+  dout(5) << "submit_entry at " << num_events << endl;
+
+  if (g_conf.mds_log) {
+       off_t off = logstream->append(e);
+       delete e;
+       num_events++;
+
+       logger->inc("add");
+       logger->set("size", num_events);
+
+       if (c) 
+         logstream->wait_for_sync(c, off);
+  } else {
+       // hack: log is disabled..
        if (c) {
          c->finish(0);
          delete c;
        }
-       return 0;
   }
-
-  dout(5) << "submit_entry" << endl;
-
-  // write it
-  writer->append(e, new C_MDL_SubmitEntry(this, e, c));
-  logger->inc("add");
+  
 }
 
-int MDLog::submit_entry_2( LogEvent *e,
-                                                  Context *c ) 
+void MDLog::wait_for_sync( Context *c )
 {
-  dout(5) << "submit_entry done, log size " << num_events << endl;
-
-  // written!
-  num_events++;
-  delete e;
-  logger->set("len", num_events);
-
-  if (c) {
+  if (g_conf.mds_log) {
+       // wait
+       logstream->wait_for_sync(c);
+  } else {
+       // hack: bypass
        c->finish(0);
        delete c;
   }
-  
+}
+
+void MDLog::flush()
+{
+  logstream->flush();
+
   // trim
-  trim(NULL);    // FIXME probably not every time?
+  trim(NULL);
 }
 
-class C_MDL_Trim : public Context {
-protected:
-  MDLog *mdl;
+
+
+
+// trim
+
+class C_MDL_Trimmed : public Context {
 public:
+  MDLog *mdl;
   LogEvent *le;
-  int step;
 
-  C_MDL_Trim(MDLog *m, LogEvent *e = 0, int s=2) {
+  C_MDL_Trimmed(MDLog *mdl, LogEvent *le) {
+       this->mdl = mdl; 
+       this->le = le;
+  }
+  void finish(int res) {
+       mdl->_trimmed(le);
+  }
+};
+
+class C_MDL_Reading : public Context {
+public:
+  MDLog *mdl;
+  C_MDL_Reading(MDLog *m) {
        mdl = m; 
-       step = s; le = e;
   }
   void finish(int res) {
-       if (step == 2) 
-         mdl->trim_2_didread(le);
-       else if (step == 3)
-         mdl->trim_3_didretire(le);
+       mdl->waiting_for_read = false;
+       mdl->trim(0);
   }
 };
 
 
-int MDLog::trim(Context *c)
+void MDLog::trim(Context *c)
 {
-  if (num_events - trimming.size() > max_events) {
-       dout(5) << "trimming.  num_events " << num_events << ", trimming " << trimming.size() << " max " << max_events << endl;
+  // add waiter
+  if (c) trim_waiters.push_back(c);
 
-       // add this waiter
-       if (c) trim_waiters.push_back(c);
+  // trim!
+  while (num_events - trimming.size() > max_events) {
+       dout(5) << "trim: num_events " << num_events << " - trimming " << trimming.size() << " > max " << max_events << endl;
        
-       trim_readnext();   // read next event off end of log.
-       return 0;
-  } 
-
-  // no trimming to be done.
-  if (c) {
-       c->finish(0);
-       delete c;
-  }
-}
-
-void MDLog::trim_readnext()
-{
-  if (trim_reading) {
-       //dout(10) << "trim_readnext already reading." << endl;
-       return;
+       LogEvent *le = logstream->get_next_event();
+
+       if (le) {
+         // we just read an event.
+         if (le->obsolete(mds) == true) {
+               // obsolete
+               dout(7) << "  obsolete " << le << endl;
+               delete le;
+               num_events--;
+               logger->inc("obs");
+         } else {
+               if (trimming.size() < g_conf.mds_log_max_trimming) {
+                 // trim!
+                 dout(7) << "  trimming " << le << endl;
+                 trimming.insert(le);
+                 le->retire(mds, new C_MDL_Trimmed(this, le));
+                 logger->inc("retire");
+               } else {
+                 dout(7) << "  already trimming max, waiting" << endl;
+                 return;
+               }
+         }
+       } else {
+         // need to read!
+         if (!waiting_for_read) {
+               waiting_for_read = true;
+               dout(7) << "  waiting for read" << endl;
+               logstream->wait_for_next_event(new C_MDL_Reading(this));
+         } else {
+               dout(7) << "  already waiting for read" << endl;
+         }
+         return;
+       }
   }
-
-  dout(10) << "trim_readnext" << endl;
-  trim_reading = true;
-  C_MDL_Trim *readfin = new C_MDL_Trim(this);
-  reader->read_next(&readfin->le, readfin);
-  logger->inc("read");
+  
+  // done!
+  list<Context*> finished;
+  finished.splice(finished.begin(), trim_waiters);
+  
+  finish_contexts(finished, 0);
 }
 
-
-// trim_2 : just read an event
-int MDLog::trim_2_didread(LogEvent *le)
+void MDLog::_trimmed(LogEvent *le) 
 {
-  dout(10) << "trim_2_didread " << le << endl;
-
-  trim_reading = false;
+  dout(7) << "  trimmed " << le << endl;
+  trimming.erase(le);
+  delete le;
+  num_events--;
   
-  // we just read an event.
-  if (le->obsolete(mds) == true) {
-       trim_3_didretire(le);    // we can discard this event and be done.
-       logger->inc("obs");
-  } else {
-       dout(10) << "retire " << le << " ";
-       trimming.push_back(le);  // add to limbo list
-       le->retire(mds, new C_MDL_Trim(this, le, 3));   // retire entry
-       logger->inc("retire");
-  }
-
-  // read another event?      FIXME: max_trimming maybe?  would need to restructure this again.
-  if (num_events - trimming.size() > max_events &&
-         trimming.size() < g_conf.mds_log_max_trimming) {
-       trim_readnext();
-  }
+  trim(0);
 }
 
 
-int MDLog::trim_3_didretire(LogEvent *le)
-{
-  //dout(10) << "trim_2_didretire " << le << endl;
-
-  // done with this le.
-  if (le) {
-       num_events--;
-       trimming.remove(le);
-       delete le;
-  }  
-
-  // read more?
-  if (trim_reading == false &&
-         num_events - trimming.size() > max_events) {
-       trim_readnext();
-  }
-
-  // last one?
-  if (trimming.size() == 0 &&       // none mid-retire,
-         trim_reading == false) {      // and not mid-read
-       
-       dout(5) << "retired " << le << ", trim done, log size now " << num_events << endl;
-
-       // we're done.
-       list<Context*> finished = trim_waiters;
-       trim_waiters.clear();
-
-       list<Context*>::iterator it = finished.begin();
-       while (it != finished.end()) {
-         Context *c = *it;
-         if (c) {
-               c->finish(0);
-               delete c;
-         }
-       }
-  } else {
-       dout(5) << "retired " << le << ", still trimming, log size now " << num_events << endl;
-  }
-}
-
index 486e08e7424a6598dc0326c54f03d2a458c0d6ad..6420422954cf2a45367b2d529aeb58fbec0bdd2a 100644 (file)
@@ -39,13 +39,15 @@ class MDLog {
   size_t num_events; // in events
   size_t max_events;
 
-  LogStream *reader;
-  LogStream *writer;
+  LogStream *logstream;
   
-  list<LogEvent*> trimming;     // events currently being trimmed
+  set<LogEvent*>  trimming;     // events currently being trimmed
   list<Context*>  trim_waiters; // contexts waiting for trim
   bool            trim_reading;
 
+  bool waiting_for_read;
+  friend class C_MDL_Reading;
+
   Logger *logger;
   
  public:
@@ -63,16 +65,12 @@ class MDLog {
   }
 
   int submit_entry( LogEvent *e,
-                                        Context *c );
-  int submit_entry_2( LogEvent *e,
-                                         Context *c );
-
-  int trim(Context *c);                // want to trim
-  void trim_readnext();                // read next event
-  int trim_2_didread(LogEvent *e);     // read log event
-  int trim_3_didretire(LogEvent *e);   // finished retiring log event
-
+                                       Context *c = 0 );
+  void wait_for_sync( Context *c );
+  void flush();
 
+  void trim(Context *c);
+  void _trimmed(LogEvent *le);
 };
 
 #endif
index b3a0b7716cca589536256f60c643932e6cf555d7..faba9facc8102bcb4f4b492a19dc05e050892dc8 100644 (file)
@@ -258,6 +258,7 @@ public:
 
 void MDS::proc_message(Message *m) 
 {
+  
   switch (m->get_type()) {
        // OSD ===============
   case MSG_OSD_OPREPLY:
@@ -387,6 +388,10 @@ void MDS::my_dispatch(Message *m)
        did_heartbeat_hack = true;
   }
   */
+
+
+  // flush log
+  mdlog->flush();
        
 
   // finish any triggered contexts
@@ -534,32 +539,33 @@ class C_MDS_CommitRequest : public Context {
   MClientRequest *req;
   MClientReply *reply;
   CInode *tracei;    // inode to include a trace for
-  bool pinned;
   LogEvent *event;
+
 public:
   C_MDS_CommitRequest(MDS *mds, 
-                                         MClientRequest *req, MClientReply *reply, CInode *tracei,
-                                         LogEvent *event = 0,
-                                         bool pinned=false) {
+                                         MClientRequest *req, MClientReply *reply, CInode *tracei, 
+                                         LogEvent *event=0) {
        this->mds = mds;
        this->req = req;
        this->tracei = tracei;
-       this->pinned = pinned;
        this->reply = reply;
        this->event = event;
   }
   void finish(int r) {
-       if (r == 0) {
-         // success.  log and reply.
-         mds->commit_request(req, reply, tracei, event);
-       } else {
+       if (r != 0) {
          // failure.  set failure code and reply.
          reply->set_result(r);
+       }
+       if (event) {
+         mds->commit_request(req, reply, tracei, event);
+       } else {
+         // reply.
          mds->reply_request(req, reply, tracei);
        }
   }
 };
 
+
 /*
  * send generic response (just and error code)
  */
@@ -568,6 +574,7 @@ void MDS::reply_request(MClientRequest *req, int r, CInode *tracei)
   reply_request(req, new MClientReply(req, r), tracei);
 }
 
+
 /*
  * send given reply
  * include a trace to tracei
@@ -591,6 +598,7 @@ void MDS::reply_request(MClientRequest *req, MClientReply *reply, CInode *tracei
   stat_ops++;
 }
 
+
 /* 
  * commit event(s) to the metadata journal, then reply.
  * or, be sloppy and do it concurrently (see g_conf.mds_log_before_reply)
@@ -601,37 +609,22 @@ void MDS::commit_request(MClientRequest *req,
                                                 LogEvent *event,
                                                 LogEvent *event2) 
 {        
-  if (g_conf.mds_log_before_reply) {
+  // log
+  if (event) mdlog->submit_entry(event);
+  if (event2) mdlog->submit_entry(event2);
+  
+  if (g_conf.mds_log_before_reply && g_conf.mds_log) {
        // SAFE mode!
-       
-       if (event) {
-         // log, then reply
 
-         // pin inode so it doesn't go away!
-         if (tracei) mdcache->request_pin_inode(req, tracei);
-         
-         // pass event2 as event1 (so we chain together!)
-         /*
-               WARNING: by chaining back to CommitRequest we may get
-               something not quite right if the log commit fails.  what 
-               happens (to the whole system!) then?   ** FIXME **
-         */
-         dout(10) << "commit_request submitting log entry" << endl;
-         mdlog->submit_entry(event, 
-                                                 new C_MDS_CommitRequest(this, req, reply, tracei, event2, true));  // inode is pinned
-       }
-       else {
-         // just reply, no log entry (anymore).
-         reply_request(req, reply, tracei);
-       }
-  } else {
-       // SLOPPY mode!
-
-       // log
-       if (event) mdlog->submit_entry(event, NULL);
-       if (event2) mdlog->submit_entry(event2, NULL);
+       // pin inode so it doesn't go away!
+       if (tracei) mdcache->request_pin_inode(req, tracei);
 
-       // reply
+       // wait for log sync
+       mdlog->wait_for_sync(new C_MDS_CommitRequest(this, req, reply, tracei)); 
+       return;
+  }
+  else {
+       // just reply
        reply_request(req, reply, tracei);
   }
 }
diff --git a/ceph/mds/events/EAlloc.h b/ceph/mds/events/EAlloc.h
new file mode 100644 (file)
index 0000000..f686204
--- /dev/null
@@ -0,0 +1,59 @@
+#ifndef __EALLOC_H
+#define __EALLOC_H
+
+#include <assert.h>
+#include "include/config.h"
+#include "include/types.h"
+
+#include "../LogEvent.h"
+#include "../IdAllocator.h"
+
+#define EALLOC_EV_ALLOC  1
+#define EALLOC_EV_FREE   2
+
+class EAlloc : public LogEvent {
+ protected:
+  int  type;
+  idno_t id;
+  int  what;  
+
+ public:
+  EAlloc(int type, idno_t id, int what) :
+       LogEvent(EVENT_ALLOC) {
+       this->type = type;
+       this->id = id;
+       this->what = what;
+  }
+  EAlloc() :
+       LogEvent(EVENT_ALLOC) {
+  }
+  
+  virtual void encode_payload(bufferlist& bl) {
+       bl.append((char*)&type, sizeof(type));
+       bl.append((char*)&id, sizeof(id));
+       bl.append((char*)&what, sizeof(what));
+  }
+  void decode_payload(bufferlist& bl, int& off) {
+       bl.copy(off, sizeof(type), (char*)&type);
+       off += sizeof(type);
+       bl.copy(off, sizeof(id), (char*)&id);
+       off += sizeof(id);
+       bl.copy(off, sizeof(what), (char*)&what);
+       off += sizeof(what);
+  }
+
+  
+  virtual bool obsolete(MDS *mds) {
+       if (mds->idalloc->is_dirty(type,id))
+         return false;   // still dirty
+       else
+         return true;    // already flushed
+  }
+
+  virtual void retire(MDS *mds, Context *c) {
+       mds->idalloc->save(c);
+  }
+  
+};
+
+#endif
index e819f7a1dec509d57975c08e74a09612e49cefc4..a09f6e5cdbb7599bd8b0d088f16273442fdff4fb 100644 (file)
@@ -22,18 +22,21 @@ class EInodeUpdate : public LogEvent {
        this->inode = in->inode;
        version = in->get_version();
   }
-  EInodeUpdate(crope s) :
+  EInodeUpdate() :
        LogEvent(EVENT_INODEUPDATE) {
-       s.copy(0, sizeof(version), (char*)&version);
-       s.copy(sizeof(version), sizeof(inode), (char*)&inode);
   }
   
-  virtual crope get_payload() {
-       crope r;
-       r.append((char*)&version, sizeof(version));
-       r.append((char*)&inode, sizeof(inode));
-       return r;
+  virtual void encode_payload(bufferlist& bl) {
+       bl.append((char*)&version, sizeof(version));
+       bl.append((char*)&inode, sizeof(inode));
   }
+  void decode_payload(bufferlist& bl, int& off) {
+       bl.copy(off, sizeof(version), (char*)&version);
+       off += sizeof(version);
+       bl.copy(off, sizeof(inode), (char*)&inode);
+       off += sizeof(inode);
+  }
+
   
   virtual bool obsolete(MDS *mds) {
        // am i obsolete?
index 3c9c2749ff7a9625e3a6c1520a08db54db37870d..face114bfea276e2127ecebc074cff61b46240eb 100644 (file)
@@ -18,14 +18,17 @@ class EString : public LogEvent {
        LogEvent(EVENT_STRING) {
        event = e;
   }
-  EString(crope s) :
+  EString() :
        LogEvent(EVENT_STRING) {
-       event = s.c_str();
+  }
+
+  void decode_payload(bufferlist& bl, int& off) {
+       event = bl.c_str() + off;
+       off += event.length() + 1;
   }
   
-  // note: LogEvent owns serialized buffer
-  virtual crope get_payload() {
-       return crope(event.c_str());
+  void encode_payload(bufferlist& bl) {
+       bl.append(event.c_str(), event.length()+1);
   }
 };
 
index 7b57007b4b8eeb73ae5455db6595ffc2985c545a..051b92cd95d0a25a0d53b98d878e685958287fbd 100644 (file)
@@ -24,19 +24,22 @@ class EUnlink : public LogEvent {
        this->dname = dn->get_name();
        this->version = dir->get_version();
   }
-  EUnlink(crope s) :
+  EUnlink() :
        LogEvent(EVENT_UNLINK) {
-       s.copy(0, sizeof(dir_ino), (char*)&dir_ino);
-       s.copy(sizeof(dir_ino), sizeof(version), (char*)&version);
-       dname = s.c_str() + sizeof(dir_ino) + sizeof(version);
   }
   
-  virtual crope get_payload() {
-       crope r;
-       r.append((char*)&dir_ino, sizeof(dir_ino));
-       r.append((char*)&version, sizeof(version));
-       r.append((char*)dname.c_str(), dname.length() + 1);
-       return r;
+  virtual void encode_payload(bufferlist& bl) {
+       bl.append((char*)&dir_ino, sizeof(dir_ino));
+       bl.append((char*)&version, sizeof(version));
+       bl.append((char*)dname.c_str(), dname.length() + 1);
+  }
+  void decode_payload(bufferlist& bl, int& off) {
+       bl.copy(off, sizeof(dir_ino), (char*)&dir_ino);
+       off += sizeof(dir_ino);
+       bl.copy(off, sizeof(version), (char*)&version);
+       off += sizeof(version);
+       dname = bl.c_str() + off;
+       off += dname.length() + 1;
   }
   
   virtual bool obsolete(MDS *mds) {