]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
anchortable separated out anchorclient;
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 12 Oct 2006 23:20:18 +0000 (23:20 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 12 Oct 2006 23:20:18 +0000 (23:20 +0000)
logstream updated;
misc mds cleanup

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@933 29311d96-e01e-0410-9327-a35deaab8ce9

24 files changed:
ceph/Makefile
ceph/config.cc
ceph/config.h
ceph/fakefuse.cc
ceph/mds/Anchor.h [new file with mode: 0644]
ceph/mds/AnchorClient.cc [new file with mode: 0644]
ceph/mds/AnchorClient.h [new file with mode: 0644]
ceph/mds/AnchorTable.cc
ceph/mds/AnchorTable.h
ceph/mds/CDir.cc
ceph/mds/CInode.cc
ceph/mds/LogEvent.cc [new file with mode: 0644]
ceph/mds/LogEvent.h
ceph/mds/LogStream.cc [deleted file]
ceph/mds/LogStream.h [deleted file]
ceph/mds/MDCache.cc
ceph/mds/MDLog.cc
ceph/mds/MDLog.h
ceph/mds/MDS.cc
ceph/mds/MDS.h
ceph/mds/MDSMap.h
ceph/messages/MAnchorReply.h
ceph/messages/MAnchorRequest.h
ceph/osdc/LogStreamer.cc

index bed4d67700b7d02d7cca642f7a89e9df2721f81d..36e05f4a7d14d32e7fa55fc9b5d20ac0d353a111 100644 (file)
@@ -36,8 +36,9 @@ MDS_OBJS= \
        mds/CDir.o\
        mds/CInode.o\
        mds/AnchorTable.o\
+       mds/AnchorClient.o\
        mds/MDStore.o\
-       mds/LogStream.o\
+       mds/LogEvent.o\
        mds/IdAllocator.o\
        mds/MDLog.o
 
@@ -50,7 +51,8 @@ OSD_OBJS= \
 OSDC_OBJS= \
        osdc/Filer.o\
        osdc/ObjectCacher.o\
-       osdc/Objecter.o
+       osdc/Objecter.o\
+       osdc/LogStreamer.o
 
 MON_OBJS= \
        mon/Monitor.o\
@@ -209,8 +211,8 @@ mon.o: ${MON_OBJS}
        ${CC} -fPIC ${CFLAGS} -c $< -o $@
 
 count:
-       cat *.cc */*.cc */*.h */*/*.h | wc -l
-       cat *.cc */*.cc */*.h */*/*.h | grep -c \;
+       cat ${SRCS} | wc -l
+       cat ${SRCS} | grep -c \;
 
 .depend:
        touch .depend
index 98e92222937974250285273048671de0a38cb918..c118eb117076e40f0a625abb9b54c4a8bffb689e 100644 (file)
@@ -48,8 +48,8 @@ FileLayout g_OSD_MDLogLayout( 1<<20, 1, 1<<20, 2 );  // 1M objects
 //FileLayout g_OSD_MDLogLayout( 1<<20, 1, 1<<20 );  // old way
 
 // fake osd failures: osd -> time
-map<int,float> g_fake_osd_down;
-map<int,float> g_fake_osd_out;
+std::map<int,float> g_fake_osd_down;
+std::map<int,float> g_fake_osd_out;
 
 md_config_t g_debug_after_conf;
 
@@ -287,11 +287,9 @@ md_config_t g_conf = {
 
 #include <stdlib.h>
 #include <string.h>
-#include <iostream>
-using namespace std;
 
 
-void env_to_vec(vector<char*>& args) 
+void env_to_vec(std::vector<char*>& args) 
 {
   const char *p = getenv("CEPH_ARGS");
   if (!p) return;
@@ -317,13 +315,13 @@ void env_to_vec(vector<char*>& args)
 
 
 void argv_to_vec(int argc, char **argv,
-                 vector<char*>& args)
+                 std::vector<char*>& args)
 {
   for (int i=1; i<argc; i++)
     args.push_back(argv[i]);
 }
 
-void vec_to_argv(vector<char*>& args,
+void vec_to_argv(std::vector<char*>& args,
                  int& argc, char **&argv)
 {
   argv = (char**)malloc(sizeof(char*) * argc);
@@ -334,9 +332,9 @@ void vec_to_argv(vector<char*>& args,
     argv[argc++] = args[i];
 }
 
-void parse_config_options(vector<char*>& args)
+void parse_config_options(std::vector<char*>& args)
 {
-  vector<char*> nargs;
+  std::vector<char*> nargs;
 
   for (unsigned i=0; i<args.size(); i++) {
     if (strcmp(args[i], "--nummon") == 0) 
index 1ea67092fb4a44311f04d8963bcc883685c41b3d..08a700c2b30bd2f4ec799de7656346fe6a017e94 100644 (file)
@@ -20,10 +20,9 @@ extern class FileLayout g_OSD_MDLogLayout;
 
 #include <vector>
 #include <map>
-using namespace std;
 
-extern map<int,float> g_fake_osd_down;
-extern map<int,float> g_fake_osd_out;
+extern std::map<int,float> g_fake_osd_down;
+extern std::map<int,float> g_fake_osd_out;
 
 #define OSD_REP_PRIMARY 0
 #define OSD_REP_SPLAY   1
@@ -273,15 +272,15 @@ struct md_config_t {
 extern md_config_t g_conf;     
 extern md_config_t g_debug_after_conf;     
 
-#define dout(x)  if ((x) <= g_conf.debug) cout
-#define dout2(x) if ((x) <= g_conf.debug) cout
+#define dout(x)  if ((x) <= g_conf.debug) std::cout
+#define dout2(x) if ((x) <= g_conf.debug) std::cout
 
-void env_to_vec(vector<char*>& args);
+void env_to_vec(std::vector<char*>& args);
 void argv_to_vec(int argc, char **argv,
-                 vector<char*>& args);
-void vec_to_argv(vector<char*>& args,
+                 std::vector<char*>& args);
+void vec_to_argv(std::vector<char*>& args,
                  int& argc, char **&argv);
 
-void parse_config_options(vector<char*>& args);
+void parse_config_options(std::vector<char*>& args);
 
 #endif
index 874234c6545fa7b2db465b5581652b8358fedb4a..f021d83bac0358ab9d77d4dad643cef090d0850f 100644 (file)
@@ -75,8 +75,8 @@ int main(int argc, char **argv) {
   args = nargs;
   vec_to_argv(args, argc, argv);
 
-  MonMap *monmap = new MonMap;
-
+  MonMap *monmap = new MonMap(g_conf.num_mon);
+  
   Monitor *mon[g_conf.num_mon];
   for (int i=0; i<g_conf.num_mon; i++) {
     mon[i] = new Monitor(i, new FakeMessenger(MSG_ADDR_MON(i)), monmap);
diff --git a/ceph/mds/Anchor.h b/ceph/mds/Anchor.h
new file mode 100644 (file)
index 0000000..caabdbe
--- /dev/null
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __ANCHOR_H
+#define __ANCHOR_H
+
+#include <string>
+using std::string;
+
+#include "include/types.h"
+#include "include/bufferlist.h"
+
+class Anchor {
+public:
+  inodeno_t ino;      // my ino
+  inodeno_t dirino;   // containing dir
+  string    ref_dn;   // referring dentry
+  int       nref;     // reference count
+
+  Anchor() {}
+  Anchor(inodeno_t ino, inodeno_t dirino, string& ref_dn, int nref=0) {
+    this->ino = ino;
+    this->dirino = dirino;
+    this->ref_dn = ref_dn;
+    this->nref = nref;
+  }  
+  
+  void _encode(bufferlist &bl) {
+    bl.append((char*)&ino, sizeof(ino));
+    bl.append((char*)&dirino, sizeof(dirino));
+    bl.append((char*)&nref, sizeof(nref));
+    ::_encode(ref_dn, bl);
+  }
+  void _decode(bufferlist& bl, int& off) {
+    bl.copy(off, sizeof(ino), (char*)&ino);
+    off += sizeof(ino);
+    bl.copy(off, sizeof(dirino), (char*)&dirino);
+    off += sizeof(dirino);
+    bl.copy(off, sizeof(nref), (char*)&nref);
+    off += sizeof(nref);
+    ::_decode(ref_dn, bl, off);
+  }
+} ;
+
+#endif
diff --git a/ceph/mds/AnchorClient.cc b/ceph/mds/AnchorClient.cc
new file mode 100644 (file)
index 0000000..b330a93
--- /dev/null
@@ -0,0 +1,149 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include <iostream>
+using std::cout;
+using std::cerr;
+using std::endl;
+
+#include "Anchor.h"
+#include "AnchorClient.h"
+#include "MDSMap.h"
+
+#include "include/Context.h"
+#include "msg/Messenger.h"
+
+#include "MDS.h"
+
+#include "messages/MAnchorRequest.h"
+#include "messages/MAnchorReply.h"
+
+#include "config.h"
+#undef dout
+#define dout(x)  if (x <= g_conf.debug) cout << g_clock.now() << " " << messenger->get_myaddr() << ".anchorclient "
+#define derr(x)  if (x <= g_conf.debug) cout << g_clock.now() << " " << messenger->get_myaddr() << ".anchorclient "
+
+
+void AnchorClient::dispatch(Message *m)
+{
+  switch (m->get_type()) {
+  case MSG_MDS_ANCHORREPLY:
+    handle_anchor_reply((MAnchorReply*)m);
+    break;
+
+  default:
+    assert(0);
+  }
+}
+
+void AnchorClient::handle_anchor_reply(class MAnchorReply *m)
+{
+  switch (m->get_op()) {
+
+  case ANCHOR_OP_LOOKUP:
+    {
+      assert(pending_lookup_trace.count(m->get_ino()) == 1);
+
+      *(pending_lookup_trace[ m->get_ino() ]) = m->get_trace();
+      Context *onfinish = pending_lookup_context[ m->get_ino() ];
+
+      pending_lookup_trace.erase(m->get_ino());
+      pending_lookup_context.erase(m->get_ino());
+
+      if (onfinish) {
+        onfinish->finish(0);
+        delete onfinish;
+      }
+    }
+    break;
+
+  case ANCHOR_OP_UPDATE:
+  case ANCHOR_OP_CREATE:
+  case ANCHOR_OP_DESTROY:
+    {
+      assert(pending_op.count(m->get_ino()) == 1);
+
+      Context *onfinish = pending_op[m->get_ino()];
+      pending_op.erase(m->get_ino());
+
+      if (onfinish) {
+        onfinish->finish(0);
+        delete onfinish;
+      }
+    }
+    break;
+
+  default:
+    assert(0);
+  }
+
+}
+
+
+
+/*
+ * public async interface
+ */
+
+void AnchorClient::lookup(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
+{
+  // send message
+  MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_LOOKUP, ino);
+
+  pending_lookup_trace[ino] = &trace;
+  pending_lookup_context[ino] = onfinish;
+
+  messenger->send_message(req, 
+                         MSG_ADDR_MDS(mdsmap->get_anchortable()), mdsmap->get_inst(mdsmap->get_anchortable()),
+                         MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORCLIENT);
+}
+
+void AnchorClient::create(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
+{
+  // send message
+  MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_CREATE, ino);
+  req->set_trace(trace);
+
+  pending_op[ino] = onfinish;
+
+  messenger->send_message(req, 
+                         MSG_ADDR_MDS(mdsmap->get_anchortable()), mdsmap->get_inst(mdsmap->get_anchortable()),
+                         MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORCLIENT);
+}
+
+void AnchorClient::update(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
+{
+  // send message
+  MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_UPDATE, ino);
+  req->set_trace(trace);
+  
+  pending_op[ino] = onfinish;
+  
+  messenger->send_message(req, 
+                         MSG_ADDR_MDS(mdsmap->get_anchortable()), mdsmap->get_inst(mdsmap->get_anchortable()),
+                         MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORCLIENT);
+}
+
+void AnchorClient::destroy(inodeno_t ino, Context *onfinish)
+{
+  // send message
+  MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_DESTROY, ino);
+
+  pending_op[ino] = onfinish;
+
+  messenger->send_message(req, 
+                         MSG_ADDR_MDS(mdsmap->get_anchortable()), mdsmap->get_inst(mdsmap->get_anchortable()),
+                         MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORCLIENT);
+}
+
+
diff --git a/ceph/mds/AnchorClient.h b/ceph/mds/AnchorClient.h
new file mode 100644 (file)
index 0000000..80b736a
--- /dev/null
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __ANCHORCLIENT_H
+#define __ANCHORCLIENT_H
+
+#include <vector>
+using std::vector;
+#include <ext/hash_map>
+using __gnu_cxx::hash_map;
+
+#include "include/types.h"
+#include "msg/Dispatcher.h"
+
+#include "Anchor.h"
+
+class Messenger;
+class MDSMap;
+class Context;
+
+class AnchorClient : public Dispatcher {
+  Messenger *messenger;
+  MDSMap *mdsmap;
+
+  // remote state
+  hash_map<inodeno_t, Context*>  pending_op;
+  hash_map<inodeno_t, Context*>  pending_lookup_context;
+  hash_map<inodeno_t, vector<Anchor*>*>  pending_lookup_trace;
+
+  void handle_anchor_reply(class MAnchorReply *m);  
+
+
+public:
+  AnchorClient(Messenger *ms, MDSMap *mm) : messenger(ms), mdsmap(mm) {}
+  
+  // async user interface
+  void lookup(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
+  void create(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
+  void update(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
+  void destroy(inodeno_t ino, Context *onfinish);
+
+  void dispatch(Message *m);
+};
+
+#endif
index 21ab8ea368b3b565bb3c30ecf59c5646ee0d955c..b8c8017ff3ba40580af210a9e800ea2e7d1f5713 100644 (file)
@@ -11,8 +11,6 @@
  * 
  */
 
-
-
 #include "AnchorTable.h"
 #include "MDS.h"
 
 #include "messages/MAnchorRequest.h"
 #include "messages/MAnchorReply.h"
 
+#include "common/Clock.h"
+
 #include "config.h"
 #undef dout
-#define dout(x)  if (x <= g_conf.debug) cout << "anchortable: "
+#define dout(x)  if (x <= g_conf.debug_mds) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchortable "
+#define derr(x)  if (x <= g_conf.debug_mds) cerr << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchortable "
 
 AnchorTable::AnchorTable(MDS *mds)
 {
@@ -43,8 +44,8 @@ AnchorTable::AnchorTable(MDS *mds)
 
 bool AnchorTable::add(inodeno_t ino, inodeno_t dirino, string& ref_dn) 
 {
-  dout(7) << "add " << ino << " dirino " << dirino << " ref_dn " << ref_dn << endl;
-
+  dout(7) << "add " << std::hex << ino << " dirino " << dirino << std::dec << " ref_dn " << ref_dn << endl;
+  
   // parent should be there
   assert(dirino < 1000 ||             // system dirino
          anchor_map.count(dirino));   // have
@@ -52,17 +53,17 @@ bool AnchorTable::add(inodeno_t ino, inodeno_t dirino, string& ref_dn)
   if (anchor_map.count(ino) == 0) {
     // new item
     anchor_map[ ino ] = new Anchor(ino, dirino, ref_dn);
-    dout(10) << "  add: added " << ino << endl;
+    dout(10) << "  add: added " << std::hex << ino << std::dec << endl;
     return true;
   } else {
-    dout(10) << "  add: had " << ino << endl;
+    dout(10) << "  add: had " << std::hex << ino << std::dec << endl;
     return false;
   }
 }
 
 void AnchorTable::inc(inodeno_t ino)
 {
-  dout(7) << "inc " << ino << endl;
+  dout(7) << "inc " << std::hex << ino << std::dec << endl;
 
   assert(anchor_map.count(ino) != 0);
   Anchor *anchor = anchor_map[ino];
@@ -71,7 +72,7 @@ void AnchorTable::inc(inodeno_t ino)
   while (1) {
     anchor->nref++;
       
-    dout(10) << "  inc: record " << ino << " now " << anchor->nref << endl;
+    dout(10) << "  inc: record " << std::hex << ino << std::dec << " now " << anchor->nref << endl;
     ino = anchor->dirino;
     
     if (ino == 0) break;
@@ -83,7 +84,7 @@ void AnchorTable::inc(inodeno_t ino)
 
 void AnchorTable::dec(inodeno_t ino) 
 {
-  dout(7) << "dec " << ino << endl;
+  dout(7) << "dec " << std::hex << ino << std::dec << endl;
 
   assert(anchor_map.count(ino) != 0);
   Anchor *anchor = anchor_map[ino];
@@ -93,13 +94,13 @@ void AnchorTable::dec(inodeno_t ino)
     anchor->nref--;
       
     if (anchor->nref == 0) {
-      dout(10) << "  dec: record " << ino << " now 0, removing" << endl;
+      dout(10) << "  dec: record " << std::hex << ino << std::dec << " now 0, removing" << endl;
       inodeno_t dirino = anchor->dirino;
       anchor_map.erase(ino);
       delete anchor;
       ino = dirino;
     } else {
-      dout(10) << "  dec: record " << ino << " now " << anchor->nref << endl;
+      dout(10) << "  dec: record " << std::hex << ino << std::dec << " now " << anchor->nref << endl;
       ino = anchor->dirino;
     }
     
@@ -117,14 +118,14 @@ void AnchorTable::dec(inodeno_t ino)
 
 void AnchorTable::lookup(inodeno_t ino, vector<Anchor*>& trace)
 {
-  dout(7) << "lookup " << ino << endl;
+  dout(7) << "lookup " << std::hex << ino << std::dec << endl;
 
   assert(anchor_map.count(ino) == 1);
   Anchor *anchor = anchor_map[ino];
   assert(anchor);
 
   while (true) {
-    dout(10) << "  record " << anchor->ino << " dirino " << anchor->dirino << " ref_dn " << anchor->ref_dn << endl;
+    dout(10) << "  record " << std::hex << anchor->ino << " dirino " << anchor->dirino << std::dec << " ref_dn " << anchor->ref_dn << endl;
     trace.insert(trace.begin(), anchor);  // lame FIXME
 
     if (anchor->dirino < MDS_INO_BASE) break;
@@ -137,7 +138,7 @@ void AnchorTable::lookup(inodeno_t ino, vector<Anchor*>& trace)
 
 void AnchorTable::create(inodeno_t ino, vector<Anchor*>& trace)
 {
-  dout(7) << "create " << ino << endl;
+  dout(7) << "create " << std::hex << ino << std::dec << endl;
   
   // make sure trace is in table
   for (unsigned i=0; i<trace.size(); i++) 
@@ -163,10 +164,6 @@ void AnchorTable::proc_message(Message *m)
   case MSG_MDS_ANCHORREQUEST:
     handle_anchor_request((MAnchorRequest*)m);
     break;
-        
-  case MSG_MDS_ANCHORREPLY:
-    handle_anchor_reply((MAnchorReply*)m);
-    break;
     
   default:
     assert(0);
@@ -221,123 +218,6 @@ void AnchorTable::handle_anchor_request(class MAnchorRequest *m)
   delete m;
 }
 
-void AnchorTable::handle_anchor_reply(class MAnchorReply *m)
-{
-  switch (m->get_op()) {
-
-  case ANCHOR_OP_LOOKUP:
-    {
-      assert(pending_lookup_trace.count(m->get_ino()) == 1);
-
-      *(pending_lookup_trace[ m->get_ino() ]) = m->get_trace();
-      Context *onfinish = pending_lookup_context[ m->get_ino() ];
-
-      pending_lookup_trace.erase(m->get_ino());
-      pending_lookup_context.erase(m->get_ino());
-
-      if (onfinish) {
-        onfinish->finish(0);
-        delete onfinish;
-      }
-    }
-    break;
-
-  case ANCHOR_OP_UPDATE:
-  case ANCHOR_OP_CREATE:
-  case ANCHOR_OP_DESTROY:
-    {
-      assert(pending_op.count(m->get_ino()) == 1);
-
-      Context *onfinish = pending_op[m->get_ino()];
-      pending_op.erase(m->get_ino());
-
-      if (onfinish) {
-        onfinish->finish(0);
-        delete onfinish;
-      }
-    }
-    break;
-
-  default:
-    assert(0);
-  }
-
-}
-
-
-/*
- * public async interface
- */
-
-void AnchorTable::lookup(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
-{
-  // me?
-  if (false && mds->get_nodeid() == 0) {
-    lookup(ino, trace);
-    onfinish->finish(0);
-    delete onfinish;
-    return;
-  }
-
-  // send message
-  MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_LOOKUP, ino);
-
-  pending_lookup_trace[ino] = &trace;
-  pending_lookup_context[ino] = onfinish;
-
-  mds->messenger->send_message(req, MSG_ADDR_MDS(0), MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORMGR);
-}
-
-void AnchorTable::create(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
-{
-  // me?
-  if (false && mds->get_nodeid() == 0) {
-    create(ino, trace);
-    onfinish->finish(0);
-    delete onfinish;
-    return;
-  }
-
-  // send message
-  MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_CREATE, ino);
-  req->set_trace(trace);
-
-  pending_op[ino] = onfinish;
-
-  mds->messenger->send_message(req, MSG_ADDR_MDS(0), MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORMGR);
-}
-
-void AnchorTable::update(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
-{
-  // send message
-  MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_UPDATE, ino);
-  req->set_trace(trace);
-
-  pending_op[ino] = onfinish;
-
-  mds->messenger->send_message(req, MSG_ADDR_MDS(0), MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORMGR);
-}
-
-void AnchorTable::destroy(inodeno_t ino, Context *onfinish)
-{
-  // me?
-  if (false && mds->get_nodeid() == 0) {
-    destroy(ino);
-    onfinish->finish(0);
-    delete onfinish;
-    return;
-  }
-
-  // send message
-  MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_DESTROY, ino);
-
-  pending_op[ino] = onfinish;
-
-  mds->messenger->send_message(req, MSG_ADDR_MDS(0), MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORMGR);
-}
-
-
-
 
 
 
@@ -351,28 +231,28 @@ void AnchorTable::save(Context *onfinish)
   if (!opened) return;
   
   // build up write
-  crope tab;
+  bufferlist tabbl;
 
   int num = anchor_map.size();
-  tab.append((char*)&num, sizeof(int));
+  tabbl.append((char*)&num, sizeof(int));
 
   for (hash_map<inodeno_t, Anchor*>::iterator it = anchor_map.begin();
        it != anchor_map.end();
        it++) {
-    dout(14) << "adding anchor for " << it->first << endl;
+    dout(14) << " saving anchor for " << std::hex << it->first << std::dec << endl;
     Anchor *a = it->second;
     assert(a);
-    a->_rope(tab);
+    a->_encode(tabbl);
   }
 
-  size_t size = tab.length();
-  tab.insert(0, (char*)&size, sizeof(size));
+  bufferlist bl;
+  size_t size = tabbl.length();
+  bl.append((char*)&size, sizeof(size));
+  bl.claim_append(tabbl);
 
   dout(7) << " " << num << " anchors, " << size << " bytes" << endl;
   
   // write!
-  bufferlist bl;
-  bl.append(tab.c_str(), tab.length());
   mds->filer->write(table_inode,
                     0, bl.length(),
                     bl, 0, 
@@ -383,39 +263,35 @@ void AnchorTable::save(Context *onfinish)
 
 class C_AT_Load : public Context {
   AnchorTable *at;
-  Context *onfinish;
 public:
   size_t size;
   bufferlist bl;
-  C_AT_Load(size_t size, AnchorTable *at, Context *onfinish) {
+  C_AT_Load(size_t size, AnchorTable *at) {
     this->size = size;
     this->at = at;
-    this->onfinish = onfinish;
   }
   void finish(int result) {
     assert(result > 0);
 
-    at->load_2(size, bl, onfinish);
+    at->load_2(size, bl);
   }
 };
 
 class C_AT_LoadSize : public Context {
   AnchorTable *at;
   MDS *mds;
-  Context *onfinish;
 public:
   bufferlist bl;
-  C_AT_LoadSize(AnchorTable *at, MDS *mds, Context *onfinish) {
+  C_AT_LoadSize(AnchorTable *at, MDS *mds) {
     this->at = at;
     this->mds = mds;
-    this->onfinish = onfinish;
   }
   void finish(int r) {
     size_t size = 0;
     bl.copy(0, sizeof(size), (char*)&size);
     cout << "r is " << r << " size is " << size << endl;
     if (r > 0 && size > 0) {
-      C_AT_Load *c = new C_AT_Load(size, at, onfinish);
+      C_AT_Load *c = new C_AT_Load(size, at);
       mds->filer->read(at->table_inode,
                        sizeof(size), size,
                        &c->bl,
@@ -423,7 +299,7 @@ public:
     } else {
       // fail
       bufferlist empty;
-      at->load_2(0, empty, onfinish);
+      at->load_2(0, empty);
     }
   }
 };
@@ -433,31 +309,29 @@ void AnchorTable::load(Context *onfinish)
   dout(7) << "load" << endl;
 
   assert(!opened);
+
+  waiting_for_open.push_back(onfinish);
   
-  C_AT_LoadSize *c = new C_AT_LoadSize(this, mds, onfinish);
+  C_AT_LoadSize *c = new C_AT_LoadSize(this, mds);
   mds->filer->read(table_inode,
                    0, sizeof(size_t),
                    &c->bl,
                    c);
 }
 
-void AnchorTable::load_2(size_t size, bufferlist& bl, Context *onfinish)
+void AnchorTable::load_2(size_t size, bufferlist& bl)
 {
-  // make a rope to be easy.. FIXME someday
-  crope r;
-  bl._rope(r);
-
   // num
   int off = 0;
   int num;
-  r.copy(0, sizeof(num), (char*)&num);
+  bl.copy(0, sizeof(num), (char*)&num);
   off += sizeof(num);
   
   // parse anchors
   for (int i=0; i<num; i++) {
     Anchor *a = new Anchor;
-    a->_unrope(r, off);
-    dout(10) << "  load_2 unroped " << a->ino << " dirino " << a->dirino << " ref_dn " << a->ref_dn << endl;
+    a->_decode(bl, off);
+    dout(10) << "load_2 decoded " << std::hex << a->ino << " dirino " << a->dirino << std::dec << " ref_dn " << a->ref_dn << endl;
     anchor_map[a->ino] = a;
   }
 
@@ -467,10 +341,6 @@ void AnchorTable::load_2(size_t size, bufferlist& bl, Context *onfinish)
   opening = false;
 
   // finish
-  if (onfinish) {
-    onfinish->finish(0);
-    delete onfinish;
-  }
   finish_contexts(waiting_for_open);
 }
 
index 6cfc929e63d4115c01fced0165f1d6461fd6cc4a..f7ce1a1eab3a119c8b903dc46f4eac9bcbbbbd68 100644 (file)
 #ifndef __ANCHORTABLE_H
 #define __ANCHORTABLE_H
 
-#include "include/types.h"
+#include "Anchor.h"
 #include "include/Context.h"
-#include "include/bufferlist.h"
 
 #include <ext/hash_map>
 using namespace __gnu_cxx;
 
 class MDS;
 
-class Anchor {
-public:
-  inodeno_t ino;      // my ino
-  inodeno_t dirino;   // containing dir
-  string    ref_dn;   // referring dentry
-  int       nref;     // reference count
-
-  Anchor() {}
-  Anchor(inodeno_t ino, inodeno_t dirino, string& ref_dn, int nref=0) {
-    this->ino = ino;
-    this->dirino = dirino;
-    this->ref_dn = ref_dn;
-    this->nref = nref;
-  }  
-
-  void _rope(crope& r) {
-    r.append((char*)&ino, sizeof(ino));
-    r.append((char*)&dirino, sizeof(dirino));
-    r.append((char*)&nref, sizeof(nref));
-    ::_rope(ref_dn, r);
-  }
-  void _unrope(crope& r, int& off) {
-    r.copy(off, sizeof(ino), (char*)&ino);
-    off += sizeof(ino);
-    r.copy(off, sizeof(dirino), (char*)&dirino);
-    off += sizeof(dirino);
-    r.copy(off, sizeof(nref), (char*)&nref);
-    off += sizeof(nref);
-    ::_unrope(ref_dn, r, off);
-  }
-} ;
-
 
 class AnchorTable {
   MDS *mds;
@@ -64,11 +31,6 @@ class AnchorTable {
   bool opening, opened;
   list<Context*> waiting_for_open;
 
-  // remote state
-  hash_map<inodeno_t, Context*>  pending_op;
-  hash_map<inodeno_t, Context*>  pending_lookup_context;
-  hash_map<inodeno_t, vector<Anchor*>*>  pending_lookup_trace;
-
  public:
   inode_t table_inode;
 
@@ -101,17 +63,9 @@ class AnchorTable {
   void proc_message(class Message *m);
  protected:
   void handle_anchor_request(class MAnchorRequest *m);  
-  void handle_anchor_reply(class MAnchorReply *m);  
 
 
  public:
-  // user interface
-  void lookup(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
-  void create(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
-  void update(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
-  void destroy(inodeno_t ino, Context *onfinish);
-
-
 
   // load/save entire table for now!
   void reset() {
@@ -120,7 +74,7 @@ class AnchorTable {
   }
   void save(Context *onfinish);
   void load(Context *onfinish);
-  void load_2(size_t size, bufferlist& bl, Context *onfinish);
+  void load_2(size_t size, bufferlist& bl);
 
 
 };
index 65f486d0c22357c90905c232c9a1abdd6964fca7..73de70ec2d84b2f6a7fc25b026054a0aeeea6f68 100644 (file)
 #include "MDSMap.h"
 
 #include "include/Context.h"
+#include "common/Clock.h"
 
 #include <cassert>
 
 #include "config.h"
 #undef dout
-#define dout(x)  if (x <= g_conf.debug || x <= g_conf.debug_mds) cout << "mds" << mds->get_nodeid() << "        cdir: "
+#define dout(x)  if (x <= g_conf.debug || x <= g_conf.debug_mds) cout << g_clock.now() << " mds" << mds->get_nodeid() << "        cdir: "
 
 
 // PINS
index e5f095de97fe8991372e01354b7992bcfa58d055..146a6b53f9615d1f8b1b05a633ef634d1644a45d 100644 (file)
 #include "MDS.h"
 #include "AnchorTable.h"
 
+#include "common/Clock.h"
+
 #include <string>
 
 #include "config.h"
 #undef dout
-#define dout(x)  if (x <= g_conf.debug || x <= g_conf.debug_mds) cout << "cinode: "
+#define dout(x)  if (x <= g_conf.debug || x <= g_conf.debug_mds) cout << g_clock.now() << "     cinode: "
 
 
 int cinode_pins[CINODE_NUM_PINS];  // counts
diff --git a/ceph/mds/LogEvent.cc b/ceph/mds/LogEvent.cc
new file mode 100644 (file)
index 0000000..832c6c6
--- /dev/null
@@ -0,0 +1,71 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "LogEvent.h"
+
+#include "MDS.h"
+
+// events i know of
+#include "events/EString.h"
+#include "events/EInodeUpdate.h"
+#include "events/EDirUpdate.h"
+#include "events/EUnlink.h"
+#include "events/EAlloc.h"
+
+LogEvent *LogEvent::decode(bufferlist& bl)
+{
+  // parse type, length
+  int off = 0;
+  __uint32_t type;
+  bl.copy(off, sizeof(type), (char*)&type);
+  off += sizeof(type);
+
+  int length = bl.length() - off;
+  dout(15) << "decode_log_event type " << type << ", size " << length << endl;
+  
+  assert(type > 0);
+  
+  // create event
+  LogEvent *le;
+  switch (type) {
+  case EVENT_STRING:  // string
+    le = new EString();
+    break;
+    
+  case EVENT_INODEUPDATE:
+    le = new EInodeUpdate();
+    break;
+    
+  case EVENT_DIRUPDATE:
+    le = new EDirUpdate();
+    break;
+    
+  case EVENT_UNLINK:
+    le = new EUnlink();
+    break;
+    
+  case EVENT_ALLOC:
+    le = new EAlloc();
+    break;
+
+  default:
+    dout(1) << "uh oh, unknown event type " << type << endl;
+    assert(0);
+  }
+
+  // decode
+  le->decode_payload(bl, off);
+  
+  return le;
+}
+
index f42906909749ef143bf20b8e52198e1244f41291..76730284a15a46c758639df432f90d66b65c2587 100644 (file)
 #ifndef __LOGEVENT_H
 #define __LOGEVENT_H
 
-#include <stdlib.h>
-#include <string>
-#include <ext/rope>
-using namespace std;
-
 #define EVENT_STRING       1
 #define EVENT_INODEUPDATE  2
 #define EVENT_DIRUPDATE    3
 #define EVENT_UNLINK       4
 #define EVENT_ALLOC        5
 
-#include "config.h"
 
+#include <string>
+using namespace std;
+
+#include "include/bufferlist.h"
+#include "include/Context.h"
+
+class MDS;
 
 // generic log event
 class LogEvent {
@@ -49,15 +50,10 @@ class LogEvent {
     assert(_type > 0);
     bl.append((char*)&_type, sizeof(_type));
 
-    // len placeholder
-    int len = 0;   // we don't know just yet...
-    int off = bl.length();
-    bl.append((char*)&len, sizeof(len)); 
-
     // payload
     encode_payload(bl);
 
-    // HACK: pad payload to match md log layout?
+    /*// HACK: pad payload to match md log layout?
     int elen = bl.length() - off + sizeof(_type);
     if (elen % g_conf.mds_log_pad_entry > 0) {
       int add = g_conf.mds_log_pad_entry - (elen % g_conf.mds_log_pad_entry);
@@ -69,10 +65,13 @@ class LogEvent {
     } 
 
     len = bl.length() - off - sizeof(len);
-
     bl.copy_in(off, sizeof(len), (char*)&len);
+    */
   }
+
+  static LogEvent *decode(bufferlist &bl);
   
+  // ...
   virtual bool obsolete(MDS *m) {
     return true;
   }
@@ -81,6 +80,7 @@ class LogEvent {
     c->finish(0);
     delete c;
   }
+
 };
 
 #endif
diff --git a/ceph/mds/LogStream.cc b/ceph/mds/LogStream.cc
deleted file mode 100644 (file)
index c388e61..0000000
+++ /dev/null
@@ -1,323 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-
-#include "LogStream.h"
-#include "MDS.h"
-#include "LogEvent.h"
-
-#include "osdc/Filer.h"
-
-#include "common/Logger.h"
-
-#include "events/EString.h"
-#include "events/EInodeUpdate.h"
-#include "events/EDirUpdate.h"
-#include "events/EUnlink.h"
-#include "events/EAlloc.h"
-
-#include <iostream>
-using namespace std;
-
-#include "config.h"
-#undef dout
-#define  dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_mds_log) cout << "mds" << mds->get_nodeid() << ".logstream "
-
-
-
-
-LogStream::LogStream(MDS *mds, Filer *filer, inodeno_t log_ino, Logger *l) 
-{
-  this->mds = mds;
-  this->filer = filer;
-  this->logger = l;
-  
-  // inode
-  memset(&log_inode, 0, sizeof(log_inode));
-  log_inode.ino = log_ino;
-  log_inode.layout = g_OSD_MDLogLayout;
-  
-  if (g_conf.mds_local_osd) {
-    log_inode.layout.object_layout = OBJECT_LAYOUT_STARTOSD;
-    log_inode.layout.osd = mds->get_nodeid() + 10000;   // hack
-  }
-  
-  // wr
-  sync_pos = flush_pos = append_pos = 0;
-  autoflush = true;
-  
-  // rd
-  read_pos = 0;
-  reading = false;
-}
-
-
-
-// ----------------------------
-// writing
-
-class C_LS_Append : public Context {
-  LogStream *ls;
-  off_t off;
-public:
-  C_LS_Append(LogStream *ls, off_t off) {
-    this->ls = ls;
-    this->off = off;
-  }
-  void finish(int r) {
-    ls->_append_2(off);
-  }
-};
-
-
-off_t LogStream::append(LogEvent *e)
-{
-  // serialize
-  bufferlist bl;
-  e->encode(bl);
-
-  size_t elen = bl.length();
-  
-  // append
-  dout(15) << "append event type " << e->get_type() << " size " << elen << " at log offset " << append_pos << endl;
-  
-  off_t off = append_pos;
-  append_pos += elen;
-  
-  //dout(15) << "write buf was " << write_buf.length() << " bl " << write_buf << endl;
-  write_buf.claim_append(bl);
-  //dout(15) << "write buf now " << write_buf.length() << " bl " << write_buf << endl;
-
-  return off;
-}
-
-void LogStream::_append_2(off_t off)
-{
-  dout(15) << g_clock.now() << " sync_pos now " << off << " skew " << off % g_conf.mds_log_pad_entry << endl;
-  sync_pos = off;
-
-  // discard written bufferlist
-  assert(writing_buffers.count(off) == 1);
-  delete writing_buffers[off];
-  writing_buffers.erase(off);
-  
-  utime_t now = g_clock.now();
-  now -= writing_latency[off];
-  writing_latency.erase(off);
-  logger->finc("lsum", (double)now);
-  logger->inc("lnum", 1);
-
-  // wake up waiters
-  map< off_t, list<Context*> >::iterator it = waiting_for_sync.begin();
-  while (it != waiting_for_sync.end()) {
-    if (it->first > sync_pos) break;
-    
-    // wake them up!
-    dout(15) << it->second.size() << " waiters at offset " << it->first << " <= " << sync_pos << endl;
-    for (list<Context*>::iterator cit = it->second.begin();
-         cit != it->second.end();
-         cit++) 
-      mds->finished_queue.push_back(*cit);
-    
-    // continue
-    waiting_for_sync.erase(it++);
-  }
-}
-
-
-void LogStream::wait_for_sync(Context *c, off_t off) 
-{
-  if (off == 0) off = append_pos;
-  assert(off > sync_pos);
-  
-  dout(15) << "sync " << c << " waiting for " << off << "  (sync_pos currently " << sync_pos << ")" << endl;
-  waiting_for_sync[off].push_back(c);
-  
-  // initiate flush now?  (since we have a waiter...)
-  if (autoflush) flush();
-}
-
-void LogStream::flush()
-{
-  // write to disk
-  if (write_buf.length()) {
-    dout(15) << "flush flush_pos " << flush_pos << " < append_pos " << append_pos << ", writing " << write_buf.length() << " bytes" << endl;
-    
-    assert(write_buf.length() == append_pos - flush_pos);
-    
-    // tuck writing buffer away until write finishes
-    writing_buffers[append_pos] = new bufferlist;
-    writing_buffers[append_pos]->claim(write_buf);
-
-    writing_latency[append_pos] = g_clock.now();
-
-    // write it
-    mds->filer->write(log_inode, 
-                      flush_pos, writing_buffers[append_pos]->length(), 
-                      *writing_buffers[append_pos],
-                      0,
-                      new C_LS_Append(this, append_pos), // on ack
-                      NULL);                             // on safe
-
-    flush_pos = append_pos;
-  } else {
-    dout(15) << "flush flush_pos " << flush_pos << " == append_pos " << append_pos << ", nothing to do" << endl;
-  }
-
-}
-
-
-
-
-
-
-// -------------------------------------------------
-// reading
-
-
-LogEvent *LogStream::get_next_event()
-{
-  if (read_buf.length() < 2*sizeof(__uint32_t)) 
-    return 0;
-  
-  // parse type, length
-  int off = 0;
-  __uint32_t type, length;
-  read_buf.copy(off, sizeof(__uint32_t), (char*)&type);
-  off += sizeof(__uint32_t);
-  read_buf.copy(off, sizeof(__uint32_t), (char*)&length);
-  off += sizeof(__uint32_t);
-
-  dout(15) << "getting next event from " << read_pos << ", type " << type << ", size " << length << endl;
-
-  assert(type > 0);
-
-  if (read_buf.length() < off + length)
-    return 0;
-  
-  // create event
-  LogEvent *le;
-  switch (type) {
-  case EVENT_STRING:  // string
-    le = new EString();
-    break;
-    
-  case EVENT_INODEUPDATE:
-    le = new EInodeUpdate();
-    break;
-    
-  case EVENT_DIRUPDATE:
-    le = new EDirUpdate();
-    break;
-    
-  case EVENT_UNLINK:
-    le = new EUnlink();
-    break;
-    
-  case EVENT_ALLOC:
-    le = new EAlloc();
-    break;
-
-  default:
-    dout(1) << "uh oh, unknown event type " << type << endl;
-    assert(0);
-  }
-
-  // decode
-  le->decode_payload(read_buf, off);
-  off = sizeof(type) + sizeof(length) + length;  // advance past any padding that wasn't decoded..
-
-  // discard front of read_buf
-  read_pos += off;
-  read_buf.splice(0, off);  
-
-  dout(15) << "get_next_event got event, read_pos now " << read_pos << " (append_pos is " << append_pos << ")" << endl;
-
-  // ok!
-  return le;
-}
-
-
-
-class C_LS_ReadAfterSync : public Context {
-public:
-  LogStream *ls;
-  Context *waitfornext;
-  C_LS_ReadAfterSync(LogStream *l, Context *c) : ls(l), waitfornext(c) {}
-  void finish(int) {
-    ls->wait_for_next_event(waitfornext);
-  }
-};
-
-class C_LS_ReadChunk : public Context {
-public:
-  bufferlist bl;
-  LogStream *ls;
-
-  C_LS_ReadChunk(LogStream *ls) {
-    this->ls = ls;
-  }
-  void finish(int result) {
-    ls->_did_read(bl);
-  }
-};
-
-
-void LogStream::wait_for_next_event(Context *c)
-{
-  if (read_pos == sync_pos) {
-    dout(-15) << "waiting for sync before initiating read at " << read_pos << endl;
-    wait_for_sync(new C_LS_ReadAfterSync(this, c));
-    return;                 
-  }
-
-  // add waiter
-  if (c) waiting_for_read.push_back(c);
-
-  // issue read
-  off_t tail = read_pos + read_buf.length();
-  size_t size = g_conf.mds_log_read_inc;
-  if (tail + (off_t)size > sync_pos) {
-    size = sync_pos - tail;
-    dout(15) << "wait_for_next_event ugh.. read_pos is " << read_pos << ", tail is " << tail << ", sync_pos only " << sync_pos << ", flush_pos " << flush_pos << ", append_pos " << append_pos << endl;
-    
-    if (size == 0) {
-      //    assert(size > 0);   // bleh, wait for sync, etc.
-      // just do it.  communication is ordered, right?   FIXME SOMEDAY this is totally gross blech
-      //size = flush_pos - tail;
-      // read tiny bit, kill some time
-      assert(flush_pos > sync_pos);
-      size = 1;
-    }
-  }
-
-  dout(15) << "wait_for_next_event reading from pos " << tail << " len " << size << endl;
-  C_LS_ReadChunk *readc = new C_LS_ReadChunk(this);
-  mds->filer->read(log_inode,  
-                   tail, g_conf.mds_log_read_inc, 
-                   &readc->bl,
-                   readc);
-}
-
-
-void LogStream::_did_read(bufferlist& blist)
-{
-  dout(15) << "_did_read got " << blist.length() << " bytes at offset " << (read_pos + read_buf.length()) << endl;
-  read_buf.claim_append(blist);
-
-  list<Context*> finished;
-  finished.splice(finished.begin(), waiting_for_read);
-  finish_contexts(finished, 0);
-}
-
diff --git a/ceph/mds/LogStream.h b/ceph/mds/LogStream.h
deleted file mode 100644 (file)
index 73d79a1..0000000
+++ /dev/null
@@ -1,92 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __LOGSTREAM_H
-#define __LOGSTREAM_H
-
-#include "include/types.h"
-#include "include/Context.h"
-
-#include "include/buffer.h"
-#include "include/bufferlist.h"
-
-#include "common/Clock.h"
-
-#include <map>
-#include <list>
-
-class LogEvent;
-class Filer;
-class MDS;
-class Logger;
-
-class LogStream {
- protected:
-  MDS *mds;
-  Filer *filer;
-  Logger *logger;
-
-  inode_t log_inode;
-
-  // writing
-  off_t sync_pos;        // first non-written byte
-  off_t flush_pos;       // first non-writing byte, beginning of write_buf
-  off_t append_pos;      // where next event will be written
-  bufferlist write_buf;  // unwritten (between flush_pos and append_pos)
-
-  std::map< off_t, bufferlist* >   writing_buffers;
-  std::map< off_t, utime_t >       writing_latency;
-
-  // reading
-  off_t read_pos;        // abs position in file
-  //off_t read_buf_start;  // where read buf begins
-  bufferlist read_buf;
-  bool reading;
-
-  std::map< off_t, std::list<Context*> > waiting_for_sync;
-  std::list<Context*>                    waiting_for_read;
-
-
-  bool autoflush;
-  
- public:
-  LogStream(MDS *mds, Filer *filer, inodeno_t log_ino, Logger *l);
-
-  off_t get_read_pos() { return read_pos; }
-  off_t get_append_pos() { return append_pos; }
-
-  // write (append to end)
-  off_t append(LogEvent *e);          // returns offset it will be written to
-  void  _append_2(off_t off);     
-  void  wait_for_sync(Context *c, off_t off=0);  // wait for flush
-  void  flush();                                 // initiate flush
-  
-  // read (from front)
-  //bool      has_next_event();
-  LogEvent *get_next_event();
-  void      wait_for_next_event(Context *c);
-  void      _did_read(bufferlist& blist);
-
-
-  // old interface
-  /*
-  // WARNING: non-reentrant; single reader only
-  int read_next(LogEvent **le, Context *c, int step=1);
-  void did_read_bit(crope& next_bit, LogEvent **le, Context *c) ;
-
-  int append(LogEvent *e, Context *c);  // append at cur_pos, mind you!
-  */
-};
-
-#endif
index a3e1c4de39ecc9c1f699a11c53c8cd3eb14f0b47..4aff95c51ce5e1840d1cea70ff4ff5e343feaaf3 100644 (file)
@@ -21,7 +21,7 @@
 #include "MDSMap.h"
 #include "MDLog.h"
 #include "MDBalancer.h"
-#include "AnchorTable.h"
+#include "AnchorClient.h"
 
 #include "include/filepath.h"
 
@@ -101,7 +101,7 @@ using namespace std;
 
 #include "config.h"
 #undef dout
-#define  dout(l)    if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << "mds" << mds->get_nodeid() << ".cache "
+#define  dout(l)    if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".cache "
 
 
 
@@ -1530,7 +1530,7 @@ void MDCache::open_remote_ino(inodeno_t ino,
   dout(7) << "open_remote_ino on " << ino << endl;
   
   C_MDC_OpenRemoteInoLookup *c = new C_MDC_OpenRemoteInoLookup(this, ino, req, onfinish);
-  mds->anchormgr->lookup(ino, c->anchortrace, c);
+  mds->anchorclient->lookup(ino, c->anchortrace, c);
 }
 
 void MDCache::open_remote_ino_2(inodeno_t ino,
@@ -1856,7 +1856,7 @@ void MDCache::anchor_inode(CInode *in, Context *onfinish)
     in->make_anchor_trace(trace);
     
     // do it
-    mds->anchormgr->create(in->ino(), trace, 
+    mds->anchorclient->create(in->ino(), trace, 
                            new C_MDC_AnchorInode( in ));
   }
 }
@@ -2658,7 +2658,7 @@ void MDCache::dentry_unlink(CDentry *dn, Context *c)
       vector<Anchor*> atrace;
       in->make_anchor_trace(atrace);
       assert(atrace.size() == 1);   // it's dangling
-      mds->anchormgr->update(in->ino(), atrace, 
+      mds->anchorclient->update(in->ino(), atrace, 
                              new C_MDC_DentryUnlink(this, dn, dir, c));
       return;
     }
@@ -2676,7 +2676,7 @@ void MDCache::dentry_unlink(CDentry *dn, Context *c)
         dout(7) << "nlink=1+primary or 0+dangling, removing anchor" << endl;
 
         // remove anchor (async)
-        mds->anchormgr->destroy(dn->inode->ino(), NULL);
+        mds->anchorclient->destroy(dn->inode->ino(), NULL);
       }
     } else {
       int auth = dn->inode->authority();
@@ -2795,7 +2795,7 @@ void MDCache::handle_inode_unlink(MInodeUnlink *m)
       in->mark_clean();       // mark it clean.
       
       // remove anchor (async)
-      mds->anchormgr->destroy(in->ino(), NULL);
+      mds->anchorclient->destroy(in->ino(), NULL);
     }
     else {
       in->mark_dirty();
@@ -2809,7 +2809,7 @@ void MDCache::handle_inode_unlink(MInodeUnlink *m)
       dout(7) << "nlink=1, removing anchor" << endl;
       
       // remove anchor (async)
-      mds->anchormgr->destroy(in->ino(), NULL);
+      mds->anchorclient->destroy(in->ino(), NULL);
     }
   }
 
index 914c16b842b070fe3fe1e398caf47e351b72d26e..e15a0399aa728ed63cfe36e13d436130cf06c0f7 100644 (file)
  * 
  */
 
-
-
-#include "include/types.h"
 #include "MDLog.h"
 #include "MDS.h"
-#include "LogStream.h"
 #include "LogEvent.h"
 
+#include "osdc/LogStreamer.h"
+
 #include "common/LogType.h"
 #include "common/Logger.h"
-#include "msg/Message.h"
-
-LogType mdlog_logtype;
 
 #include "config.h"
 #undef dout
-#define  dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_mds_log) cout << "mds" << mds->get_nodeid() << ".log "
+#define  dout(l)    if (l<=g_conf.debug || l <= g_conf.debug_mds_log) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".log "
+#define  derr(l)    if (l<=g_conf.debug || l <= g_conf.debug_mds_log) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".log "
 
 // cons/des
 
+LogType mdlog_logtype;
+
 MDLog::MDLog(MDS *m) 
 {
   mds = m;
   num_events = 0;
-  max_events = 0;
-
   waiting_for_read = false;
 
+  max_events = g_conf.mds_log_max_len;
+
+  // logger
   char name[80];
   sprintf(name, "mds%d.log", mds->get_nodeid());
-  logger = new Logger(name, (LogType*)&mdlog_logtype);
-
-  logstream = new LogStream(mds, mds->filer, MDS_INO_LOG_OFFSET + mds->get_nodeid(), logger);
+  logger = new Logger(name, &mdlog_logtype);
 
   static bool didit = false;
   if (!didit) {
@@ -57,35 +54,53 @@ MDLog::MDLog(MDS *m)
     mdlog_logtype.add_inc("lsum");
     mdlog_logtype.add_inc("lnum");
   }
+
+  // inode
+  memset(&log_inode, 0, sizeof(log_inode));
+  log_inode.ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
+  log_inode.layout = g_OSD_MDLogLayout;
+  
+  if (g_conf.mds_local_osd) {
+    log_inode.layout.object_layout = OBJECT_LAYOUT_STARTOSD;
+    log_inode.layout.osd = mds->get_nodeid() + 10000;   // hack
+  }
+
+  // log streamer
+  logstreamer = new LogStreamer(log_inode, mds->objecter, logger);
+
 }
 
 
 MDLog::~MDLog()
 {
-  if (logstream) { delete logstream; logstream = 0; }
+  if (logstreamer) { delete logstreamer; logstreamer = 0; }
   if (logger) { delete logger; logger = 0; }
 }
 
 
 
 void MDLog::submit_entry( LogEvent *e,
-                         Context *c ) 
+                         Context *c ) 
 {
   dout(5) << "submit_entry at " << num_events << endl;
 
   if (g_conf.mds_log) {
-    off_t off = logstream->append(e);
+    // encode and append
+    bufferlist bl;
+    e->encode(bl);
+    logstreamer->append_entry(bl);
+
     delete e;
     num_events++;
 
     logger->inc("add");
     logger->set("size", num_events);
-    logger->set("append", logstream->get_append_pos());
+    logger->set("append", logstreamer->get_write_pos());
 
     if (c) 
-      logstream->wait_for_sync(c, off);
+      logstreamer->flush(c);
   } else {
-    // hack: log is disabled..
+    // hack: log is disabled.
     if (c) {
       c->finish(0);
       delete c;
@@ -97,9 +112,9 @@ void MDLog::wait_for_sync( Context *c )
 {
   if (g_conf.mds_log) {
     // wait
-    logstream->wait_for_sync(c);
+    logstreamer->flush(c);
   } else {
-    // hack: bypass
+    // hack: bypass.
     c->finish(0);
     delete c;
   }
@@ -107,7 +122,7 @@ void MDLog::wait_for_sync( Context *c )
 
 void MDLog::flush()
 {
-  logstream->flush();
+  logstreamer->flush();
 
   // trim
   trim(NULL);
@@ -151,75 +166,86 @@ void MDLog::_did_read()
   trim(0);
 }
 
+void MDLog::_trimmed(LogEvent *le) 
+{
+  dout(7) << "  trimmed " << le << endl;
+  trimming.erase(le);
+  delete le;
+  logger->set("trim", trimming.size());
+  logger->set("read", logstreamer->get_read_pos());
+  trim(0);
+}
+
+
+
 void MDLog::trim(Context *c)
 {
   // add waiter
-  if (c) trim_waiters.push_back(c);
+  if (c) 
+    trim_waiters.push_back(c);
 
   // trim!
   while (num_events > max_events) {
     
+    off_t gap = logstreamer->get_write_pos() - logstreamer->get_read_pos();
+    dout(5) << "trim num_events " << num_events << " > max " << max_events
+           << ", trimming " << trimming.size()
+           << ", byte gap " << gap
+           << endl;
+
     if ((int)trimming.size() >= g_conf.mds_log_max_trimming) {
-      dout(7) << "  already trimming max, waiting" << endl;
+      dout(7) << "trim  already trimming max, waiting" << endl;
       return;
     }
-
-    off_t gap = logstream->get_append_pos() - logstream->get_read_pos();
-    dout(5) << "trim: num_events " << num_events << " - trimming " << trimming.size() << " > max " << max_events << " .. gap " << gap << endl;
     
-    LogEvent *le = logstream->get_next_event();
-
-    if (le) {
+    bufferlist bl;
+    if (logstreamer->try_read_entry(bl)) {
+      // decode logevent
+      LogEvent *le = LogEvent::decode(bl);
       num_events--;
 
       // we just read an event.
       if (le->obsolete(mds) == true) {
         // obsolete
-        dout(7) << "  obsolete " << le << endl;
+        dout(7) << "trim  obsolete " << le << endl;
         delete le;
         logger->inc("obs");
       } else {
         assert ((int)trimming.size() < g_conf.mds_log_max_trimming);
 
         // trim!
-        dout(7) << "  trimming " << le << endl;
+        dout(7) << "trim  trimming " << le << endl;
         trimming.insert(le);
         le->retire(mds, new C_MDL_Trimmed(this, le));
         logger->inc("retire");
         logger->set("trim", trimming.size());
       }
-      logger->set("read", logstream->get_read_pos());
+      logger->set("read", logstreamer->get_read_pos());
       logger->set("size", num_events);
     } else {
       // need to read!
       if (!waiting_for_read) {
         waiting_for_read = true;
-        dout(7) << "  waiting for read" << endl;
-        logstream->wait_for_next_event(new C_MDL_Reading(this));
+        dout(7) << "trim  waiting for read" << endl;
+        logstreamer->wait_for_readable(new C_MDL_Reading(this));
       } else {
-        dout(7) << "  already waiting for read" << endl;
+        dout(7) << "trim  already waiting for read" << endl;
       }
       return;
     }
   }
+
+  dout(5) << "trim num_events " << num_events << " <= max " << max_events
+         << ", trimming " << trimming.size()
+         << ", done for now."
+         << endl;
   
   // trimmed!
-  list<Context*> finished;
-  finished.splice(finished.begin(), trim_waiters);
-  
+  std::list<Context*> finished;
+  finished.swap(trim_waiters);
   finish_contexts(finished, 0);
 }
 
-void MDLog::_trimmed(LogEvent *le) 
-{
-  dout(7) << "  trimmed " << le << endl;
-  trimming.erase(le);
-  delete le;
-  logger->set("trim", trimming.size());
-  logger->set("read", logstream->get_read_pos());
-  trim(0);
-}
-
 
index 1a02bf376561e7216ec1c55920bf112b75d84dd7..63aa226084fb958a12bc40077b382ef20a468351 100644 (file)
 #ifndef __MDLOG_H
 #define __MDLOG_H
 
-/*
-
-hmm, some things that go in the MDS log:
-
-
-prepare + commit versions of many of these?
-
-- inode update
- ???  entry will include mdloc_t of dir it resides in... 
-
-- directory operation
-  unlink,
-  rename= atomic link+unlink (across multiple dirs, possibly...)
-
-- import
-- export
-
-
-*/
-
-#include "../include/Context.h"
+#include "include/types.h"
+#include "include/Context.h"
 
 #include <list>
 
-using namespace std;
-
 #include <ext/hash_set>
 using namespace __gnu_cxx;
 
-class LogStream;
+class LogStreamer;
 class LogEvent;
 class MDS;
 
@@ -66,10 +45,11 @@ class MDLog {
   size_t num_events; // in events
   size_t max_events;
 
-  LogStream *logstream;
+  inode_t log_inode;
+  LogStreamer *logstreamer;
   
   hash_set<LogEvent*>  trimming;     // events currently being trimmed
-  list<Context*>  trim_waiters;   // contexts waiting for trim
+  std::list<Context*>  trim_waiters;   // contexts waiting for trim
   bool            trim_reading;
 
   bool waiting_for_read;
index 4e4379e52f1feda1783a10051d3b0a625e8e5272..81a45d2ad7dd72f11e9c7561e64b15ff05de7043 100644 (file)
@@ -30,7 +30,9 @@
 #include "MDLog.h"
 #include "MDBalancer.h"
 #include "IdAllocator.h"
+
 #include "AnchorTable.h"
+#include "AnchorClient.h"
 
 #include "include/filepath.h"
 
@@ -90,23 +92,30 @@ MDS::MDS(int whoami, Messenger *m, MonMap *mm) {
   monmap = mm;
   messenger = m;
 
-  mdsmap = 0;
+  mdsmap = new MDSMap;
+
+  objecter = new Objecter(messenger, monmap, osdmap);
+  filer = new Filer(objecter);
 
   mdcache = new MDCache(this);
   mdstore = new MDStore(this);
   mdlog = new MDLog(this);
   balancer = new MDBalancer(this);
-  anchormgr = new AnchorTable(this);
+
+  anchorclient = new AnchorClient(messenger, mdsmap);
+
+  // hack
+  if (whoami == 0) 
+    anchormgr = new AnchorTable(this);
+  else
+    anchormgr = 0;
 
   req_rate = 0;
 
 
   osdmap = 0;
 
-  objecter = new Objecter(messenger, monmap, osdmap);
-  filer = new Filer(objecter);
 
-  mdlog->set_max_events(g_conf.mds_log_max_len);
 
   shutting_down = false;
   shut_down = false;
@@ -180,6 +189,7 @@ MDS::~MDS() {
   if (balancer) { delete balancer; balancer = NULL; }
   if (idalloc) { delete idalloc; idalloc = NULL; }
   if (anchormgr) { delete anchormgr; anchormgr = NULL; }
+  if (anchorclient) { delete anchorclient; anchorclient = NULL; }
   if (osdmap) { delete osdmap; osdmap = 0; }
 
   if (filer) { delete filer; filer = 0; }
@@ -472,6 +482,9 @@ void MDS::my_dispatch(Message *m)
   case MDS_PORT_ANCHORMGR:
     anchormgr->proc_message(m);
     break;
+  case MDS_PORT_ANCHORCLIENT:
+    anchorclient->dispatch(m);
+    break;
     
   case MDS_PORT_CACHE:
     mdcache->proc_message(m);
@@ -648,9 +661,6 @@ void MDS::my_dispatch(Message *m)
 
 void MDS::handle_mds_map(MMDSMap *m)
 {
-  if (!mdsmap) 
-    mdsmap = new MDSMap;
-    
   map<epoch_t, bufferlist>::reverse_iterator p = m->maps.rbegin();
 
   dout(1) << "handle_mds_map epoch " << p->first << endl;
@@ -715,6 +725,10 @@ void MDS::handle_client_mount(MClientMount *m)
       // fake out idalloc (reset, pretend loaded)
       idalloc->reset();
 
+      // fake out anchortable
+      if (mdsmap->get_anchortable() == whoami)
+       anchormgr->reset();
+
       // init osds too
       //mkfs(new C_MDS_Unpause(this));
       //waiting_for_unpause.push_back(new C_MDS_RetryMessage(this, m));
@@ -850,7 +864,7 @@ void MDS::commit_request(MClientRequest *req,
   if (event) mdlog->submit_entry(event);
   if (event2) mdlog->submit_entry(event2);
   
-  if (g_conf.mds_log_before_reply && g_conf.mds_log) {
+  if (g_conf.mds_log_before_reply && g_conf.mds_log && event) {
     // SAFE mode!
 
     // pin inode so it doesn't go away!
index d44344d3f4353fe479878739a11926ab23f4b324..96ac1746e157129fb1e380aa06d8da6b961d5c9d 100644 (file)
@@ -45,7 +45,8 @@ using namespace __gnu_cxx;
 #define MDS_PORT_STORE    102
 #define MDS_PORT_BALANCER 103
 
-#define MDS_PORT_ANCHORMGR 200
+#define MDS_PORT_ANCHORCLIENT 200
+#define MDS_PORT_ANCHORMGR    201
 
 #define MDS_PORT_OSDMON    300
 #define MDS_PORT_PGMGR     301
@@ -74,6 +75,7 @@ class Objecter;
 class Filer;
 
 class AnchorTable;
+class AnchorClient;
 class CInode;
 class CDir;
 class CDentry;
@@ -115,6 +117,7 @@ class MDS : public Dispatcher {
   Objecter     *objecter;
   Filer        *filer;       // for reading/writing to/from osds
   AnchorTable  *anchormgr;
+  AnchorClient *anchorclient;
   //  PGManager    *pgmanager;
 
   ClientMap    clientmap;
index f9ee5c71cb4acf820636844a80aea3c1707efc7d..6117e6943d3c798fd675848a1a538c2d101deb97 100644 (file)
@@ -30,6 +30,8 @@ class MDSMap {
   epoch_t epoch;
   utime_t ctime;
 
+  int anchortable;
+
   set<int> all_mds;
   set<int> down_mds;
   map<int,entity_inst_t> mds_inst;
@@ -37,13 +39,15 @@ class MDSMap {
   friend class MDSMonitor;
 
  public:
-  MDSMap() : epoch(0) {}
+  MDSMap() : epoch(0), anchortable(0) {}
 
   epoch_t get_epoch() const { return epoch; }
   void inc_epoch() { epoch++; }
 
   const utime_t& get_ctime() const { return ctime; }
 
+  int get_anchortable() const { return anchortable; }
+
   int get_num_mds() const { return all_mds.size(); }
   int get_num_up_mds() const { return all_mds.size() - down_mds.size(); }
 
@@ -69,6 +73,7 @@ class MDSMap {
   void encode(bufferlist& blist) {
     blist.append((char*)&epoch, sizeof(epoch));
     blist.append((char*)&ctime, sizeof(ctime));
+    blist.append((char*)&anchortable, sizeof(anchortable));
     
     _encode(all_mds, blist);
     _encode(down_mds, blist);
@@ -81,6 +86,8 @@ class MDSMap {
     off += sizeof(epoch);
     blist.copy(off, sizeof(ctime), (char*)&ctime);
     off += sizeof(ctime);
+    blist.copy(off, sizeof(anchortable), (char*)&anchortable);
+    off += sizeof(anchortable);
     
     _decode(all_mds, blist, off);
     _decode(down_mds, blist, off);
index 07d5cb752671fcd19422833d9fff1f4ed2af74ac..0186118f53260eef885585c032aa12b1db8d3d6d 100644 (file)
@@ -45,28 +45,29 @@ class MAnchorReply : public Message {
   inodeno_t get_ino() { return ino; }
   vector<Anchor*>& get_trace() { return trace; }
 
-  virtual void decode_payload(crope& s, int& off) {
-    s.copy(off, sizeof(op), (char*)&op);
+  virtual void decode_payload() {
+    int off = 0;
+    payload.copy(off, sizeof(op), (char*)&op);
     off += sizeof(op);
-    s.copy(off, sizeof(ino), (char*)&ino);
+    payload.copy(off, sizeof(ino), (char*)&ino);
     off += sizeof(ino);
     int n;
-    s.copy(off, sizeof(int), (char*)&n);
+    payload.copy(off, sizeof(int), (char*)&n);
     off += sizeof(int);
     for (int i=0; i<n; i++) {
       Anchor *a = new Anchor;
-      a->_unrope(s, off);
+      a->_decode(payload, off);
       trace.push_back(a);
     }
   }
 
-  virtual void encode_payload(crope& r) {
-    r.append((char*)&op, sizeof(op));
-    r.append((char*)&ino, sizeof(ino));
+  virtual void encode_payload() {
+    payload.append((char*)&op, sizeof(op));
+    payload.append((char*)&ino, sizeof(ino));
     int n = trace.size();
-    r.append((char*)&n, sizeof(int));
+    payload.append((char*)&n, sizeof(int));
     for (int i=0; i<n; i++) 
-      trace[i]->_rope(r);
+      trace[i]->_encode(payload);
   }
 };
 
index 48a6ea335eceafab3a0ed182c80f451581b3a4ec..2a2d0088978b4e7b42a293d5e96d1f7d2a1ea79e 100644 (file)
@@ -47,28 +47,29 @@ class MAnchorRequest : public Message {
   inodeno_t get_ino() { return ino; }
   vector<Anchor*>& get_trace() { return trace; }
 
-  virtual void decode_payload(crope& s, int& off) {
-    s.copy(off, sizeof(op), (char*)&op);
+  virtual void decode_payload() {
+    int off = 0;
+    payload.copy(off, sizeof(op), (char*)&op);
     off += sizeof(op);
-    s.copy(off, sizeof(ino), (char*)&ino);
+    payload.copy(off, sizeof(ino), (char*)&ino);
     off += sizeof(ino);
     int n;
-    s.copy(off, sizeof(int), (char*)&n);
+    payload.copy(off, sizeof(int), (char*)&n);
     off += sizeof(int);
     for (int i=0; i<n; i++) {
       Anchor *a = new Anchor;
-      a->_unrope(s, off);
+      a->_decode(payload, off);
       trace.push_back(a);
     }
   }
 
-  virtual void encode_payload(crope& r) {
-    r.append((char*)&op, sizeof(op));
-    r.append((char*)&ino, sizeof(ino));
+  virtual void encode_payload() {
+    payload.append((char*)&op, sizeof(op));
+    payload.append((char*)&ino, sizeof(ino));
     int n = trace.size();
-    r.append((char*)&n, sizeof(int));
+    payload.append((char*)&n, sizeof(int));
     for (int i=0; i<n; i++) 
-      trace[i]->_rope(r);
+      trace[i]->_encode(payload);
   }
 };
 
index e60915457f8621c39251a3171239aac1c40e8760..7c475a2b774a0d8e0a1a9ddb6460279bd7b668d3 100644 (file)
@@ -94,6 +94,11 @@ void LogStreamer::flush(Context *onsync)
   if (write_pos == flush_pos) {
     assert(write_buf.length() == 0);
     dout(10) << "flush nothing to flush, write pointers at " << write_pos << "/" << flush_pos << "/" << ack_pos << endl;
+
+    if (onsync) {
+      onsync->finish(0);
+      delete onsync;
+    }
     return;
   }