]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: refactor Anchor{Table,Client} into MDSTable{Server,Client}
authorSage Weil <sage@newdream.net>
Wed, 16 Jul 2008 20:46:09 +0000 (13:46 -0700)
committerSage Weil <sage@newdream.net>
Wed, 16 Jul 2008 20:46:09 +0000 (13:46 -0700)
36 files changed:
src/Makefile.am
src/mds/Anchor.h
src/mds/AnchorClient.cc
src/mds/AnchorClient.h
src/mds/AnchorServer.cc [new file with mode: 0644]
src/mds/AnchorServer.h [new file with mode: 0644]
src/mds/AnchorTable.cc [deleted file]
src/mds/AnchorTable.h [deleted file]
src/mds/CInode.cc
src/mds/LogEvent.cc
src/mds/LogEvent.h
src/mds/LogSegment.h
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/MDS.cc
src/mds/MDS.h
src/mds/MDSTable.h
src/mds/MDSTableClient.cc [new file with mode: 0644]
src/mds/MDSTableClient.h [new file with mode: 0644]
src/mds/MDSTableServer.cc [new file with mode: 0644]
src/mds/MDSTableServer.h [new file with mode: 0644]
src/mds/Server.cc
src/mds/SnapClient.h [new file with mode: 0644]
src/mds/events/EAnchor.h [deleted file]
src/mds/events/EAnchorClient.h [deleted file]
src/mds/events/EMetaBlob.h
src/mds/events/ESnap.h [deleted file]
src/mds/events/ETableClient.h [new file with mode: 0644]
src/mds/events/ETableServer.h [new file with mode: 0644]
src/mds/journal.cc
src/mds/mds_table_types.h [new file with mode: 0644]
src/mds/mdstypes.h
src/messages/MAnchor.h [deleted file]
src/messages/MMDSTableRequest.h [new file with mode: 0644]
src/msg/Message.cc
src/msg/Message.h

index eaad2d0f19dfd07341d1a04c30ca6a578043fc13..5583879187f4fa6413a40ac631c0222cfd72155b 100644 (file)
@@ -201,11 +201,13 @@ libmds_a_SOURCES = \
        mds/CDentry.cc \
        mds/CDir.cc \
        mds/CInode.cc \
-       mds/AnchorTable.cc \
-       mds/AnchorClient.cc \
        mds/LogEvent.cc \
        mds/MDSTable.cc \
        mds/IdAllocator.cc \
+       mds/MDSTableClient.cc \
+       mds/MDSTableServer.cc \
+       mds/AnchorServer.cc \
+       mds/AnchorClient.cc \
        mds/SnapTable.cc \
        mds/snap.cc \
        mds/SessionMap.cc \
@@ -336,7 +338,6 @@ noinst_HEADERS = \
        kernel/super.h\
        mds/Anchor.h\
        mds/AnchorClient.h\
-       mds/AnchorTable.h\
        mds/CDentry.h\
        mds/CDir.h\
        mds/IdAllocator.h\
@@ -347,8 +348,6 @@ noinst_HEADERS = \
        mds/Migrator.h\
        mds/ScatterLock.h\
        mds/SimpleLock.h\
-       mds/events/EAnchor.h\
-       mds/events/EAnchorClient.h\
        mds/events/ESessions.h\
        mds/events/EUpdate.h\
        mds/events/EExport.h\
@@ -443,8 +442,8 @@ noinst_HEADERS = \
        messages/MOSDBoot.h\
        messages/MOSDPGCreate.h\
        messages/MStatfsReply.h\
-       messages/MAnchor.h\
        messages/MCacheExpire.h\
+       messages/MMDSTableRequest.h\
        messages/MDiscoverReply.h\
        messages/MOSDFailure.h\
        messages/MExportDir.h\
index af4165d81828df28f53aecb9427ba2bc5870662c..214ac3bf94ad880529a199a9d19c4866bc1d917e 100644 (file)
@@ -22,40 +22,17 @@ using std::string;
 #include "mdstypes.h"
 #include "include/buffer.h"
 
-// anchor ops
-#define ANCHOR_OP_LOOKUP          1
-#define ANCHOR_OP_LOOKUP_REPLY    -2
-
-#define ANCHOR_OP_CREATE_PREPARE  11
-#define ANCHOR_OP_CREATE_AGREE    -12
-
-#define ANCHOR_OP_DESTROY_PREPARE 21
-#define ANCHOR_OP_DESTROY_AGREE   -22
-
-#define ANCHOR_OP_UPDATE_PREPARE  31
-#define ANCHOR_OP_UPDATE_AGREE    -32
-
-#define ANCHOR_OP_COMMIT   41
-#define ANCHOR_OP_ACK      -42
-#define ANCHOR_OP_ROLLBACK 43
-
-
+enum {
+  ANCHOR_OP_CREATE,
+  ANCHOR_OP_DESTROY,
+  ANCHOR_OP_UPDATE,
+};
 
 inline const char* get_anchor_opname(int o) {
   switch (o) {
-  case ANCHOR_OP_LOOKUP: return "lookup";
-  case ANCHOR_OP_LOOKUP_REPLY: return "lookup_reply";
-
-  case ANCHOR_OP_CREATE_PREPARE: return "create_prepare";
-  case ANCHOR_OP_CREATE_AGREE: return "create_agree";
-  case ANCHOR_OP_DESTROY_PREPARE: return "destroy_prepare";
-  case ANCHOR_OP_DESTROY_AGREE: return "destroy_agree";
-  case ANCHOR_OP_UPDATE_PREPARE: return "update_prepare";
-  case ANCHOR_OP_UPDATE_AGREE: return "update_agree";
-
-  case ANCHOR_OP_COMMIT: return "commit";
-  case ANCHOR_OP_ACK: return "ack";
-  case ANCHOR_OP_ROLLBACK: return "rollback";
+  case ANCHOR_OP_CREATE: return "create";
+  case ANCHOR_OP_DESTROY: return "destroy";
+  case ANCHOR_OP_UPDATE: return "update";
   default: assert(0); return 0;
   }
 }
index 1cc18dc7d8fa4d3205c8c9a8d895ee84db34b22e..830074d095feefaf4ed430de915e25a1c56d5bc3 100644 (file)
 using std::cout;
 using std::cerr;
 
-#include "Anchor.h"
 #include "AnchorClient.h"
 #include "MDSMap.h"
-
-#include "include/Context.h"
-#include "msg/Messenger.h"
-
-#include "MDS.h"
-#include "MDLog.h"
 #include "LogSegment.h"
+#include "MDS.h"
+#include "msg/Messenger.h"
 
-#include "events/EAnchorClient.h"
-#include "messages/MAnchor.h"
+#include "messages/MMDSTableRequest.h"
 
 #include "config.h"
 
@@ -36,235 +30,83 @@ using std::cerr;
 #define derr(x)  if (x <= g_conf.debug_mds) *_derr << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".anchorclient "
 
 
-void AnchorClient::dispatch(Message *m)
-{
-  switch (m->get_type()) {
-  case MSG_MDS_ANCHOR:
-    handle_anchor_reply((MAnchor*)m);
-    break;
 
-  default:
-    assert(0);
-  }
-}
+// LOOKUPS
 
-void AnchorClient::handle_anchor_reply(class MAnchor *m)
+void AnchorClient::handle_query_result(class MMDSTableRequest *m)
 {
-  inodeno_t ino = m->get_ino();
-  version_t atid = m->get_atid();
-
   dout(10) << "handle_anchor_reply " << *m << dendl;
 
-  switch (m->get_op()) {
-
-    // lookup
-  case ANCHOR_OP_LOOKUP_REPLY:
-    assert(pending_lookup.count(ino));
-    {
-      *pending_lookup[ino].trace = m->get_trace();
-      Context *onfinish = pending_lookup[ino].onfinish;
-      pending_lookup.erase(ino);
-      
-      if (onfinish) {
-        onfinish->finish(0);
-        delete onfinish;
-      }
-    }
-    break;
+  inodeno_t ino;
+  vector<Anchor> trace;
 
-    // prepare -> agree
-  case ANCHOR_OP_CREATE_AGREE:
-    if (pending_create_prepare.count(ino)) {
-      dout(10) << "got create_agree on " << ino << " atid " << atid << dendl;
-      Context *onfinish = pending_create_prepare[ino].onfinish;
-      *pending_create_prepare[ino].patid = atid;
-      pending_create_prepare.erase(ino);
+  bufferlist::iterator p = m->bl.begin();
+  ::decode(ino, p);
 
-      if (onfinish) {
-        onfinish->finish(0);
-        delete onfinish;
-      }
-    } 
-    else if (pending_commit.count(atid)) {
-      dout(10) << "stray create_agree on " << ino
-              << " atid " << atid
-              << ", already committing, resending COMMIT"
-              << dendl;      
-      MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
-      mds->messenger->send_message(req, 
-                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
-    }
-    else {
-      dout(10) << "stray create_agree on " << ino
-              << " atid " << atid
-              << ", sending ROLLBACK"
-              << dendl;      
-      MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
-      mds->messenger->send_message(req, 
-                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
-    }
-    break;
-
-  case ANCHOR_OP_DESTROY_AGREE:
-    if (pending_destroy_prepare.count(ino)) {
-      dout(10) << "got destroy_agree on " << ino << " atid " << atid << dendl;
-      Context *onfinish = pending_destroy_prepare[ino].onfinish;
-      *pending_destroy_prepare[ino].patid = atid;
-      pending_destroy_prepare.erase(ino);
-
-      if (onfinish) {
-        onfinish->finish(0);
-        delete onfinish;
-      }
-    } 
-    else if (pending_commit.count(atid)) {
-      dout(10) << "stray destroy_agree on " << ino
-              << " atid " << atid
-              << ", already committing, resending COMMIT"
-              << dendl;      
-      MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
-      mds->messenger->send_message(req, 
-                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
-    }
-    else {
-      dout(10) << "stray destroy_agree on " << ino
-              << " atid " << atid
-              << ", sending ROLLBACK"
-              << dendl;      
-      MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
-      mds->messenger->send_message(req, 
-                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
-    }
-    break;
-
-  case ANCHOR_OP_UPDATE_AGREE:
-    if (pending_update_prepare.count(ino)) {
-      dout(10) << "got update_agree on " << ino << " atid " << atid << dendl;
-      Context *onfinish = pending_update_prepare[ino].onfinish;
-      *pending_update_prepare[ino].patid = atid;
-      pending_update_prepare.erase(ino);
-
-      if (onfinish) {
-        onfinish->finish(0);
-        delete onfinish;
-      }
-    }
-    else if (pending_commit.count(atid)) {
-      dout(10) << "stray update_agree on " << ino
-              << " atid " << atid
-              << ", already committing, resending COMMIT"
-              << dendl;      
-      MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
-      mds->messenger->send_message(req, 
-                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
-    }
-    else {
-      dout(10) << "stray update_agree on " << ino
-              << " atid " << atid
-              << ", sending ROLLBACK"
-              << dendl;      
-      MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
-      mds->messenger->send_message(req, 
-                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
-    }
-    break;
-
-    // commit -> ack
-  case ANCHOR_OP_ACK:
-    {
-      dout(10) << "got ack on atid " << atid << ", logging" << dendl;
-
-      // remove from committing list
-      assert(pending_commit.count(atid));
-      assert(pending_commit[atid]->pending_commit_atids.count(atid));
-      
-      // log ACK.
-      mds->mdlog->submit_entry(new EAnchorClient(ANCHOR_OP_ACK, atid),
-                              new C_LoggedAck(this, atid));
-    }
-    break;
-
-  default:
-    assert(0);
+  assert(pending_lookup.count(ino));
+  ::decode(*pending_lookup[ino].trace, p);
+  Context *onfinish = pending_lookup[ino].onfinish;
+  pending_lookup.erase(ino);
+  
+  if (onfinish) {
+    onfinish->finish(0);
+    delete onfinish;
   }
 
   delete m;
 }
 
-
-void AnchorClient::_logged_ack(version_t atid)
+void AnchorClient::resend_queries()
 {
-  dout(10) << "_logged_ack" << dendl;
-
-  assert(pending_commit.count(atid));
-  assert(pending_commit[atid]->pending_commit_atids.count(atid));
-  
-  pending_commit[atid]->pending_commit_atids.erase(atid);
-  pending_commit.erase(atid);
-  
-  // kick any waiters (LogSegment trim)
-  if (ack_waiters.count(atid)) {
-    dout(15) << "kicking ack waiters on atid " << atid << dendl;
-    mds->queue_waiters(ack_waiters[atid]);
-    ack_waiters.erase(atid);
+  // resend any pending lookups.
+  for (hash_map<inodeno_t, _pending_lookup>::iterator p = pending_lookup.begin();
+       p != pending_lookup.end();
+       p++) {
+    dout(10) << "resending lookup on " << p->first << dendl;
+    _lookup(p->first);
   }
 }
 
-
-/*
- * public async interface
- */
-
-
-/*
- * FIXME: we need to be able to resubmit messages if the anchortable mds fails.
- */
-
-
 void AnchorClient::lookup(inodeno_t ino, vector<Anchor>& trace, Context *onfinish)
 {
-  // send message
-  MAnchor *req = new MAnchor(ANCHOR_OP_LOOKUP, ino);
-
   assert(pending_lookup.count(ino) == 0);
-  pending_lookup[ino].onfinish = onfinish;
-  pending_lookup[ino].trace = &trace;
+  _pending_lookup& l = pending_lookup[ino];
+  l.onfinish = onfinish;
+  l.trace = &trace;
+  _lookup(ino);
+}
 
-  mds->send_message_mds(req, 
-                       mds->mdsmap->get_anchortable());
+void AnchorClient::_lookup(inodeno_t ino)
+{
+  MMDSTableRequest *req = new MMDSTableRequest(table, TABLE_OP_QUERY, 0, 0);
+  ::encode(ino, req->bl);
+  mds->send_message_mds(req, mds->mdsmap->get_anchortable());
 }
 
 
-// PREPARE
+// FRIENDLY PREPARE
 
 void AnchorClient::prepare_create(inodeno_t ino, vector<Anchor>& trace, 
                                  version_t *patid, Context *onfinish)
 {
   dout(10) << "prepare_create " << ino << " " << trace << dendl;
-
-  // send message
-  MAnchor *req = new MAnchor(ANCHOR_OP_CREATE_PREPARE, ino);
-  req->set_trace(trace);
-
-  pending_create_prepare[ino].trace = trace;
-  pending_create_prepare[ino].patid = patid;
-  pending_create_prepare[ino].onfinish = onfinish;
-
-  mds->send_message_mds(req, 
-                       mds->mdsmap->get_anchortable());
+  bufferlist bl;
+  __u32 op = ANCHOR_OP_CREATE;
+  ::encode(op, bl);
+  ::encode(ino, bl);
+  ::encode(trace, bl);
+  _prepare(bl, patid, onfinish);
 }
 
 void AnchorClient::prepare_destroy(inodeno_t ino, 
-                                 version_t *patid, Context *onfinish)
+                                  version_t *patid, Context *onfinish)
 {
   dout(10) << "prepare_destroy " << ino << dendl;
-
-  // send message
-  MAnchor *req = new MAnchor(ANCHOR_OP_DESTROY_PREPARE, ino);
-  pending_destroy_prepare[ino].onfinish = onfinish;
-  pending_destroy_prepare[ino].patid = patid;
-  mds->messenger->send_message(req, 
-                              mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
+  bufferlist bl;
+  __u32 op = ANCHOR_OP_DESTROY;
+  ::encode(op, bl);
+  ::encode(ino, bl);
+  _prepare(bl, patid, onfinish);
 }
 
 
@@ -272,94 +114,9 @@ void AnchorClient::prepare_update(inodeno_t ino, vector<Anchor>& trace,
                                  version_t *patid, Context *onfinish)
 {
   dout(10) << "prepare_update " << ino << " " << trace << dendl;
-
-  // send message
-  MAnchor *req = new MAnchor(ANCHOR_OP_UPDATE_PREPARE, ino);
-  req->set_trace(trace);
-  
-  pending_update_prepare[ino].trace = trace;
-  pending_update_prepare[ino].patid = patid;
-  pending_update_prepare[ino].onfinish = onfinish;
-  
-  mds->messenger->send_message(req, 
-                              mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
-}
-
-
-// COMMIT
-
-void AnchorClient::commit(version_t atid, LogSegment *ls)
-{
-  dout(10) << "commit " << atid << dendl;
-
-  assert(pending_commit.count(atid) == 0);
-  pending_commit[atid] = ls;
-  ls->pending_commit_atids.insert(atid);
-
-  // send message
-  MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
-  mds->messenger->send_message(req, 
-                              mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
-}
-
-
-
-// RECOVERY
-
-void AnchorClient::finish_recovery()
-{
-  dout(7) << "finish_recovery" << dendl;
-
-  resend_commits();
-}
-
-void AnchorClient::resend_commits()
-{
-  for (map<version_t,LogSegment*>::iterator p = pending_commit.begin();
-       p != pending_commit.end();
-       ++p) {
-    dout(10) << "resending commit on " << p->first << dendl;
-    MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, p->first);
-    mds->send_message_mds(req, 
-                         mds->mdsmap->get_anchortable());
-  }
-}
-
-void AnchorClient::resend_prepares(hash_map<inodeno_t, _pending_prepare>& prepares, int op)
-{
-  for (hash_map<inodeno_t, _pending_prepare>::iterator p = prepares.begin();
-       p != prepares.end();
-       p++) {
-    dout(10) << "resending " << get_anchor_opname(op) << " on " << p->first << dendl;
-    MAnchor *req = new MAnchor(op, p->first);
-    req->set_trace(p->second.trace);
-    mds->send_message_mds(req, 
-                         mds->mdsmap->get_anchortable());
-  } 
-}
-
-
-void AnchorClient::handle_mds_recovery(int who)
-{
-  dout(7) << "handle_mds_recovery mds" << who << dendl;
-
-  if (who != mds->mdsmap->get_anchortable()) 
-    return; // do nothing.
-
-  // resend any pending lookups.
-  for (hash_map<inodeno_t, _pending_lookup>::iterator p = pending_lookup.begin();
-       p != pending_lookup.end();
-       p++) {
-    dout(10) << "resending lookup on " << p->first << dendl;
-    mds->send_message_mds(new MAnchor(ANCHOR_OP_LOOKUP, p->first),
-                         mds->mdsmap->get_anchortable());
-  }
-  
-  // resend any pending prepares.
-  resend_prepares(pending_create_prepare, ANCHOR_OP_CREATE_PREPARE);
-  resend_prepares(pending_update_prepare, ANCHOR_OP_UPDATE_PREPARE);
-  resend_prepares(pending_destroy_prepare, ANCHOR_OP_DESTROY_PREPARE);
-
-  // resend any pending commits.
-  resend_commits();
+  bufferlist bl;
+  __u32 op = ANCHOR_OP_UPDATE;
+  ::encode(op, bl);
+  ::encode(ino, bl);
+  _prepare(bl, patid, onfinish);
 }
index fd790f39c399d783c5230e2f8f2fea627d738ccd..c437cb41d5f2e7188b3ae5924c5f026e527604c4 100644 (file)
 #ifndef __ANCHORCLIENT_H
 #define __ANCHORCLIENT_H
 
-#include <vector>
-using std::vector;
 #include <ext/hash_map>
 using __gnu_cxx::hash_map;
 
-#include "include/types.h"
-#include "msg/Dispatcher.h"
-
+#include "MDSTableClient.h"
 #include "Anchor.h"
 
 class Context;
 class MDS;
 class LogSegment;
 
-class AnchorClient : public Dispatcher {
-  MDS *mds;
-
+class AnchorClient : public MDSTableClient {
   // lookups
-  struct _pending_lookup { 
+  struct _pending_lookup {
     vector<Anchor> *trace;
     Context *onfinish;
   };
   hash_map<inodeno_t, _pending_lookup> pending_lookup;
 
-  // prepares
-  struct _pending_prepare { 
-    vector<Anchor> trace;
-    Context *onfinish;
-    version_t *patid;  // ptr to atid
-  };
-  hash_map<inodeno_t, _pending_prepare> pending_create_prepare;
-  hash_map<inodeno_t, _pending_prepare> pending_destroy_prepare;
-  hash_map<inodeno_t, _pending_prepare> pending_update_prepare;
-
-  // pending commits
-  map<version_t, LogSegment*> pending_commit;
-  map<version_t, list<Context*> > ack_waiters;
-
-  void handle_anchor_reply(class MAnchor *m);  
-
-  class C_LoggedAck : public Context {
-    AnchorClient *ac;
-    version_t atid;
-  public:
-    C_LoggedAck(AnchorClient *a, version_t t) : ac(a), atid(t) {}
-    void finish(int r) {
-      ac->_logged_ack(atid);
-    }
-  };
-  void _logged_ack(version_t atid);
-
 public:
-  AnchorClient(MDS *m) : mds(m) {}
+  AnchorClient(MDS *m) : MDSTableClient(m, TABLE_ANCHOR) {}
   
-  void dispatch(Message *m);
-
-  // async user interface
+  void handle_query_result(MMDSTableRequest *m);
   void lookup(inodeno_t ino, vector<Anchor>& trace, Context *onfinish);
+  void _lookup(inodeno_t ino);
+  void resend_queries();
 
   void prepare_create(inodeno_t ino, vector<Anchor>& trace, version_t *atid, Context *onfinish);
   void prepare_destroy(inodeno_t ino, version_t *atid, Context *onfinish);
   void prepare_update(inodeno_t ino, vector<Anchor>& trace, version_t *atid, Context *onfinish);
-
-  void commit(version_t atid, LogSegment *ls);
-
-  // for recovery (by other nodes)
-  void handle_mds_recovery(int mds); // called when someone else recovers
-
-  void resend_commits();
-  void resend_prepares(hash_map<inodeno_t, _pending_prepare>& prepares, int op);
-
-  // for recovery (by me)
-  void got_journaled_agree(version_t atid, LogSegment *ls) {
-    pending_commit[atid] = ls;
-  }
-  void got_journaled_ack(version_t atid) {
-    pending_commit.erase(atid);
-  }
-  bool has_committed(version_t atid) {
-    return pending_commit.count(atid) == 0;
-  }
-  void wait_for_ack(version_t atid, Context *c) {
-    ack_waiters[atid].push_back(c);
-  }
-  void finish_recovery();                // called when i recover and go active
-
-
 };
 
 #endif
diff --git a/src/mds/AnchorServer.cc b/src/mds/AnchorServer.cc
new file mode 100644 (file)
index 0000000..8e37fe2
--- /dev/null
@@ -0,0 +1,271 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "AnchorServer.h"
+#include "MDS.h"
+#include "msg/Messenger.h"
+#include "messages/MMDSTableRequest.h"
+
+#define dout(x)  if (x <= g_conf.debug_mds) *_dout << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".anchorserver "
+#define derr(x)  if (x <= g_conf.debug_mds) *_derr << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".anchorserver "
+
+// table
+
+void AnchorServer::init_inode()
+{
+  ino = MDS_INO_ANCHORTABLE;
+  layout = g_default_file_layout;
+}
+
+void AnchorServer::reset_state()
+{
+  anchor_map.clear();
+  pending_create.clear();
+  pending_destroy.clear();
+  pending_update.clear();
+  pending_for_mds.clear();
+}
+
+void AnchorServer::dump()
+{
+  dout(7) << "dump v " << version << dendl;
+  for (hash_map<inodeno_t, Anchor>::iterator it = anchor_map.begin();
+       it != anchor_map.end();
+       it++) 
+    dout(15) << "dump " << it->second << dendl;
+}
+
+
+
+/*
+ * basic updates
+ */
+
+bool AnchorServer::add(inodeno_t ino, inodeno_t dirino, __u32 dn_hash) 
+{
+  //dout(17) << "add " << ino << " dirfrag " << dirfrag << dendl;
+  
+  // parent should be there
+  assert(dirino < MDS_INO_BASE ||     // base case,
+         anchor_map.count(dirino));   // or have it
+  
+  if (anchor_map.count(ino) == 0) {
+    // new item
+    anchor_map[ino] = Anchor(ino, dirino, dn_hash);
+    dout(7) << "add added " << anchor_map[ino] << dendl;
+    return true;
+  } else {
+    dout(7) << "add had " << anchor_map[ino] << dendl;
+    return false;
+  }
+}
+
+void AnchorServer::inc(inodeno_t ino)
+{
+  dout(7) << "inc " << ino << dendl;
+
+  assert(anchor_map.count(ino));
+
+  while (1) {
+    Anchor &anchor = anchor_map[ino];
+    anchor.nref++;
+      
+    dout(10) << "inc now " << anchor << dendl;
+    ino = anchor.dirino;
+    
+    if (ino == 0) break;
+    if (anchor_map.count(ino) == 0) break;
+  }
+}
+
+void AnchorServer::dec(inodeno_t ino) 
+{
+  dout(7) << "dec " << ino << dendl;
+  assert(anchor_map.count(ino));
+
+  while (true) {
+    Anchor &anchor = anchor_map[ino];
+    anchor.nref--;
+      
+    if (anchor.nref == 0) {
+      dout(10) << "dec removing " << anchor << dendl;
+      inodeno_t dirino = anchor.dirino;
+      anchor_map.erase(ino);
+      ino = dirino;
+    } else {
+      dout(10) << "dec now " << anchor << dendl;
+      ino = anchor.dirino;
+    }
+    
+    if (ino == 0) break;
+    if (anchor_map.count(ino) == 0) break;
+  }
+}
+
+
+// server
+
+void AnchorServer::_prepare(bufferlist &bl, __u64 reqid, int bymds)
+{
+  bufferlist::iterator p = bl.begin();
+  __u32 what;
+  inodeno_t ino;
+  vector<Anchor> trace;
+  ::decode(what, p);
+  ::decode(ino, p);
+  ::decode(trace, p);
+
+  switch (what) {
+  case ANCHOR_OP_CREATE:
+    version++;
+
+    // make sure trace is in table
+    for (unsigned i=0; i<trace.size(); i++) 
+      add(trace[i].ino, trace[i].dirino, trace[i].dn_hash);
+    inc(ino);
+    pending_create[version] = ino;  // so we can undo
+    break;
+
+
+  case ANCHOR_OP_DESTROY:
+    version++;
+    pending_destroy[version] = ino;
+    break;
+    
+  case ANCHOR_OP_UPDATE:
+    version++;
+    pending_update[version].first = ino;
+    pending_update[version].second = trace;
+    break;
+
+  default:
+    assert(0);
+  }
+  //dump();
+}
+
+bool AnchorServer::_is_prepared(version_t tid)
+{
+  return 
+    pending_create.count(tid) ||
+    pending_destroy.count(tid) ||
+    pending_update.count(tid);
+}
+
+void AnchorServer::_commit(version_t tid)
+{
+  if (pending_create.count(tid)) {
+    dout(7) << "commit " << tid << " create " << pending_create[tid] << dendl;
+    pending_create.erase(tid);
+  } 
+
+  else if (pending_destroy.count(tid)) {
+    inodeno_t ino = pending_destroy[tid];
+    dout(7) << "commit " << tid << " destroy " << ino << dendl;
+    
+    dec(ino);  // destroy
+    
+    pending_destroy.erase(tid);
+  }
+
+  else if (pending_update.count(tid)) {
+    inodeno_t ino = pending_update[tid].first;
+    vector<Anchor> &trace = pending_update[tid].second;
+    
+    if (anchor_map.count(ino)) {
+      dout(7) << "commit " << tid << " update " << ino << dendl;
+
+      // remove old
+      dec(ino);
+      
+      // add new
+      for (unsigned i=0; i<trace.size(); i++) 
+       add(trace[i].ino, trace[i].dirino, trace[i].dn_hash);
+      inc(ino);
+    } else {
+      dout(7) << "commit " << tid << " update " << ino << " -- DNE" << dendl;
+    }
+    
+    pending_update.erase(tid);
+  }
+  else
+    assert(0);
+
+  pending_for_mds.erase(tid);
+
+  // bump version.
+  version++;
+  //dump();
+}
+
+void AnchorServer::_rollback(version_t tid) 
+{
+  if (pending_create.count(tid)) {
+    inodeno_t ino = pending_create[tid];
+    dout(7) << "rollback " << tid << " create " << ino << dendl;
+    dec(ino);
+    pending_create.erase(tid);
+  } 
+
+  else if (pending_destroy.count(tid)) {
+    inodeno_t ino = pending_destroy[tid];
+    dout(7) << "rollback " << tid << " destroy " << ino << dendl;
+    pending_destroy.erase(tid);
+  }
+
+  else if (pending_update.count(tid)) {
+    inodeno_t ino = pending_update[tid].first;
+    dout(7) << "rollback " << tid << " update " << ino << dendl;
+    pending_update.erase(tid);
+  }
+  else
+    assert(0);
+
+  pending_for_mds.erase(tid);
+
+  // bump version.
+  version++;
+  //dump();
+}
+
+
+void AnchorServer::handle_query(MMDSTableRequest *req)
+{
+  bufferlist::iterator p = req->bl.begin();
+  inodeno_t curino;
+  ::decode(curino, p);
+  dout(7) << "handle_lookup " << *req << " ino " << curino << dendl;
+
+  vector<Anchor> trace;
+  while (true) {
+    assert(anchor_map.count(curino) == 1);
+    Anchor &anchor = anchor_map[curino];
+    
+    dout(10) << "handle_lookup  adding " << anchor << dendl;
+    trace.insert(trace.begin(), anchor);  // lame FIXME
+    
+    if (anchor.dirino < MDS_INO_BASE) break;
+    curino = anchor.dirino;
+  }
+
+  // reply
+  MMDSTableRequest *reply = new MMDSTableRequest(table, TABLE_OP_QUERY_REPLY, req->reqid, version);
+  ::encode(curino, req->bl);
+  ::encode(trace, req->bl);
+  mds->send_message_mds(reply, req->get_source().num());
+
+  delete req;
+}
+
+
diff --git a/src/mds/AnchorServer.h b/src/mds/AnchorServer.h
new file mode 100644 (file)
index 0000000..8bc65fd
--- /dev/null
@@ -0,0 +1,66 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __ANCHORSERVER_H
+#define __ANCHORSERVER_H
+
+#include "MDSTableServer.h"
+#include "Anchor.h"
+
+class AnchorServer : public MDSTableServer {
+ public:
+  AnchorServer(MDS *mds) :
+    MDSTableServer(mds, TABLE_ANCHOR) {}
+
+  // table bits
+  hash_map<inodeno_t, Anchor>  anchor_map;
+
+  // uncommitted operations
+  map<version_t, inodeno_t> pending_create;
+  map<version_t, inodeno_t> pending_destroy;
+  map<version_t, pair<inodeno_t, vector<Anchor> > > pending_update;
+
+  void init_inode();
+  void reset_state();
+  void encode_state(bufferlist& bl) {
+    ::encode(anchor_map, bl);
+    ::encode(pending_create, bl);
+    ::encode(pending_destroy, bl);
+    ::encode(pending_update, bl);
+    ::encode(pending_for_mds, bl);
+  }
+  void decode_state(bufferlist::iterator& p) {
+    ::decode(anchor_map, p);
+    ::decode(pending_create, p);
+    ::decode(pending_destroy, p);
+    ::decode(pending_update, p);
+    ::decode(pending_for_mds, p);
+  }
+
+  bool add(inodeno_t ino, inodeno_t dirino, __u32 dn_hash);
+  void inc(inodeno_t ino);
+  void dec(inodeno_t ino);
+
+  void dump();
+
+  // server bits
+  void _prepare(bufferlist &bl, __u64 reqid, int bymds);
+  bool _is_prepared(version_t tid);
+  void _commit(version_t tid);
+  void _rollback(version_t tid);
+  void handle_query(MMDSTableRequest *m);
+};
+
+
+#endif
diff --git a/src/mds/AnchorTable.cc b/src/mds/AnchorTable.cc
deleted file mode 100644 (file)
index 1490349..0000000
+++ /dev/null
@@ -1,678 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-#include "AnchorTable.h"
-#include "MDS.h"
-
-#include "osdc/Filer.h"
-
-#include "msg/Messenger.h"
-#include "messages/MAnchor.h"
-
-#include "common/Clock.h"
-
-#include "MDLog.h"
-#include "events/EAnchor.h"
-
-#include "config.h"
-
-#define dout(x)  if (x <= g_conf.debug_mds) *_dout << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".anchortable "
-#define derr(x)  if (x <= g_conf.debug_mds) *_derr << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".anchortable "
-
-
-void AnchorTable::dump()
-{
-  dout(7) << "dump v " << version << dendl;
-  for (hash_map<inodeno_t, Anchor>::iterator it = anchor_map.begin();
-       it != anchor_map.end();
-       it++) 
-    dout(15) << "dump " << it->second << dendl;
-}
-
-
-/*
- * basic updates
- */
-
-bool AnchorTable::add(inodeno_t ino, inodeno_t dirino, __u32 dn_hash) 
-{
-  //dout(17) << "add " << ino << " dirfrag " << dirfrag << dendl;
-  
-  // parent should be there
-  assert(dirino < MDS_INO_BASE ||     // base case,
-         anchor_map.count(dirino));   // or have it
-  
-  if (anchor_map.count(ino) == 0) {
-    // new item
-    anchor_map[ino] = Anchor(ino, dirino, dn_hash);
-    dout(7) << "add added " << anchor_map[ino] << dendl;
-    return true;
-  } else {
-    dout(7) << "add had " << anchor_map[ino] << dendl;
-    return false;
-  }
-}
-
-void AnchorTable::inc(inodeno_t ino)
-{
-  dout(7) << "inc " << ino << dendl;
-
-  assert(anchor_map.count(ino));
-
-  while (1) {
-    Anchor &anchor = anchor_map[ino];
-    anchor.nref++;
-      
-    dout(10) << "inc now " << anchor << dendl;
-    ino = anchor.dirino;
-    
-    if (ino == 0) break;
-    if (anchor_map.count(ino) == 0) break;
-  }
-}
-
-void AnchorTable::dec(inodeno_t ino) 
-{
-  dout(7) << "dec " << ino << dendl;
-  assert(anchor_map.count(ino));
-
-  while (true) {
-    Anchor &anchor = anchor_map[ino];
-    anchor.nref--;
-      
-    if (anchor.nref == 0) {
-      dout(10) << "dec removing " << anchor << dendl;
-      inodeno_t dirino = anchor.dirino;
-      anchor_map.erase(ino);
-      ino = dirino;
-    } else {
-      dout(10) << "dec now " << anchor << dendl;
-      ino = anchor.dirino;
-    }
-    
-    if (ino == 0) break;
-    if (anchor_map.count(ino) == 0) break;
-  }
-}
-
-
-/* 
- * high level 
- */
-
-
-// LOOKUP
-
-void AnchorTable::handle_lookup(MAnchor *req)
-{
-  inodeno_t curino = req->get_ino();
-  dout(7) << "handle_lookup " << curino << dendl;
-
-  vector<Anchor> trace;
-  while (true) {
-    assert(anchor_map.count(curino) == 1);
-    Anchor &anchor = anchor_map[curino];
-
-    dout(10) << "handle_lookup  adding " << anchor << dendl;
-    trace.insert(trace.begin(), anchor);  // lame FIXME
-
-    if (anchor.dirino < MDS_INO_BASE) break;
-    curino = anchor.dirino;
-  }
-
-  // reply
-  MAnchor *reply = new MAnchor(ANCHOR_OP_LOOKUP_REPLY, req->get_ino());
-  reply->set_trace(trace);
-  mds->messenger->send_message(reply, req->get_source_inst());
-
-  delete req;
-}
-
-
-// MIDLEVEL
-
-void AnchorTable::create_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds)
-{
-  // make sure trace is in table
-  for (unsigned i=0; i<trace.size(); i++) 
-    add(trace[i].ino, trace[i].dirino, trace[i].dn_hash);
-  inc(ino);
-
-  version++;
-  pending_create[version] = ino;  // so we can undo
-  pending_reqmds[version] = reqmds;
-  //dump();
-}
-
-void AnchorTable::destroy_prepare(inodeno_t ino, int reqmds)
-{
-  version++;
-  pending_destroy[version] = ino;
-  pending_reqmds[version] = reqmds;
-  //dump();
-}
-
-void AnchorTable::update_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds)
-{
-  version++;
-  pending_update[version].first = ino;
-  pending_update[version].second = trace;
-  pending_reqmds[version] = reqmds;
-  //dump();
-}
-
-void AnchorTable::commit(version_t atid) 
-{
-  if (pending_create.count(atid)) {
-    dout(7) << "commit " << atid << " create " << pending_create[atid] << dendl;
-    pending_create.erase(atid);
-  } 
-
-  else if (pending_destroy.count(atid)) {
-    inodeno_t ino = pending_destroy[atid];
-    dout(7) << "commit " << atid << " destroy " << ino << dendl;
-    
-    dec(ino);  // destroy
-    
-    pending_destroy.erase(atid);
-  }
-
-  else if (pending_update.count(atid)) {
-    inodeno_t ino = pending_update[atid].first;
-    vector<Anchor> &trace = pending_update[atid].second;
-    
-    if (anchor_map.count(ino)) {
-      dout(7) << "commit " << atid << " update " << ino << dendl;
-
-      // remove old
-      dec(ino);
-      
-      // add new
-      for (unsigned i=0; i<trace.size(); i++) 
-       add(trace[i].ino, trace[i].dirino, trace[i].dn_hash);
-      inc(ino);
-    } else {
-      dout(7) << "commit " << atid << " update " << ino << " -- DNE" << dendl;
-    }
-    
-    pending_update.erase(atid);
-  }
-  else
-    assert(0);
-
-  pending_reqmds.erase(atid);
-
-  // bump version.
-  version++;
-  //dump();
-}
-
-void AnchorTable::rollback(version_t atid) 
-{
-  if (pending_create.count(atid)) {
-    inodeno_t ino = pending_create[atid];
-    dout(7) << "rollback " << atid << " create " << ino << dendl;
-    dec(ino);
-    pending_create.erase(atid);
-  } 
-
-  else if (pending_destroy.count(atid)) {
-    inodeno_t ino = pending_destroy[atid];
-    dout(7) << "rollback " << atid << " destroy " << ino << dendl;
-    pending_destroy.erase(atid);
-  }
-
-  else if (pending_update.count(atid)) {
-    inodeno_t ino = pending_update[atid].first;
-    dout(7) << "rollback " << atid << " update " << ino << dendl;
-    pending_update.erase(atid);
-  }
-  else
-    assert(0);
-
-  pending_reqmds.erase(atid);
-
-  // bump version.
-  version++;
-  //dump();
-}
-
-
-
-
-// CREATE
-
-class C_AT_CreatePrepare : public Context {
-  AnchorTable *at;
-  MAnchor *req;
-  version_t atid;
-public:
-  C_AT_CreatePrepare(AnchorTable *a, MAnchor *r, version_t t) :
-    at(a), req(r), atid(t) { }
-  void finish(int r) {
-    at->_create_prepare_logged(req, atid);
-  }
-};
-
-void AnchorTable::handle_create_prepare(MAnchor *req)
-{
-  inodeno_t ino = req->get_ino();
-  vector<Anchor>& trace = req->get_trace();
-
-  dout(7) << "handle_create_prepare " << ino << dendl;
-  
-  create_prepare(ino, trace, req->get_source().num());
-
-  // log it
-  EAnchor *le = new EAnchor(ANCHOR_OP_CREATE_PREPARE, ino, version, req->get_source().num());
-  le->set_trace(trace);
-  mds->mdlog->submit_entry(le, 
-                          new C_AT_CreatePrepare(this, req, version));
-}
-  
-void AnchorTable::_create_prepare_logged(MAnchor *req, version_t atid)
-{
-  inodeno_t ino = req->get_ino();
-  dout(7) << "_create_prepare_logged " << ino << " atid " << atid << dendl;
-
-  // reply
-  MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, ino, atid);
-  mds->messenger->send_message(reply, req->get_source_inst());
-
-  delete req;
-}
-
-
-
-
-// DESTROY
-
-class C_AT_DestroyPrepare : public Context {
-  AnchorTable *at;
-  MAnchor *req;
-  version_t atid;
-public:
-  C_AT_DestroyPrepare(AnchorTable *a, MAnchor *r, version_t t) :
-    at(a), req(r), atid(t) { }
-  void finish(int r) {
-    at->_destroy_prepare_logged(req, atid);
-  }
-};
-
-void AnchorTable::handle_destroy_prepare(MAnchor *req)
-{
-  inodeno_t ino = req->get_ino();
-  dout(7) << "handle_destroy_prepare " << ino << dendl;
-
-  destroy_prepare(ino, req->get_source().num());
-
-  mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_DESTROY_PREPARE, ino, version, req->get_source().num()),
-                          new C_AT_DestroyPrepare(this, req, version));
-}
-
-void AnchorTable::_destroy_prepare_logged(MAnchor *req, version_t atid)
-{
-  inodeno_t ino = req->get_ino();
-  dout(7) << "_destroy_prepare_logged " << ino << " atid " << atid << dendl;
-
-  // reply
-  MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, ino, atid);
-  mds->messenger->send_message(reply, req->get_source_inst());
-  delete req;
-}
-
-
-
-// UPDATE
-
-class C_AT_UpdatePrepare : public Context {
-  AnchorTable *at;
-  MAnchor *req;
-  version_t atid;
-public:
-  C_AT_UpdatePrepare(AnchorTable *a, MAnchor *r, version_t t) :
-    at(a), req(r), atid(t) { }
-  void finish(int r) {
-    at->_update_prepare_logged(req, atid);
-  }
-};
-
-void AnchorTable::handle_update_prepare(MAnchor *req)
-{
-  inodeno_t ino = req->get_ino();
-  vector<Anchor>& trace = req->get_trace();
-
-  dout(7) << "handle_update_prepare " << ino << dendl;
-  
-  update_prepare(ino, trace, req->get_source().num());
-
-  // log it
-  EAnchor *le = new EAnchor(ANCHOR_OP_UPDATE_PREPARE, ino, version, req->get_source().num());
-  le->set_trace(trace);
-  mds->mdlog->submit_entry(le, 
-                          new C_AT_UpdatePrepare(this, req, version));
-}
-  
-void AnchorTable::_update_prepare_logged(MAnchor *req, version_t atid)
-{
-  inodeno_t ino = req->get_ino();
-  dout(7) << "_update_prepare_logged " << ino << " atid " << atid << dendl;
-
-  // reply
-  MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, ino, atid);
-  mds->messenger->send_message(reply, req->get_source_inst());
-  delete req;
-}
-
-
-
-// COMMIT
-
-class C_AT_Commit : public Context {
-  AnchorTable *at;
-  MAnchor *req;
-public:
-  C_AT_Commit(AnchorTable *a, MAnchor *r) :
-    at(a), req(r) { }
-  void finish(int r) {
-    at->_commit_logged(req);
-  }
-};
-
-void AnchorTable::handle_commit(MAnchor *req)
-{
-  version_t atid = req->get_atid();
-  dout(7) << "handle_commit " << atid << dendl;
-  
-  if (pending_create.count(atid) ||
-      pending_destroy.count(atid) ||
-      pending_update.count(atid)) {
-    commit(atid);
-    mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_COMMIT, atid, version));
-  }
-  else if (atid <= version) {
-    dout(0) << "got commit for atid " << atid << " <= " << version 
-           << ", already committed, sending ack." 
-           << dendl;
-    MAnchor *reply = new MAnchor(ANCHOR_OP_ACK, 0, atid);
-    mds->messenger->send_message(reply, req->get_source_inst());
-    delete req;
-    return;
-  } 
-  else {
-    // wtf.
-    dout(0) << "got commit for atid " << atid << " > " << version << dendl;
-    assert(atid <= version);
-  }
-  
-  // wait for it to journal
-  mds->mdlog->wait_for_sync(new C_AT_Commit(this, req));
-}
-
-
-void AnchorTable::_commit_logged(MAnchor *req)
-{
-  dout(7) << "_commit_logged, sending ACK" << dendl;
-  MAnchor *reply = new MAnchor(ANCHOR_OP_ACK, req->get_ino(), req->get_atid());
-  mds->messenger->send_message(reply, req->get_source_inst());
-  delete req;
-}
-
-
-
-// ROLLBACK
-
-void AnchorTable::handle_rollback(MAnchor *req)
-{
-  version_t atid = req->get_atid();
-  dout(7) << "handle_rollback " << atid << dendl;
-  rollback(atid);
-  delete req;
-}
-
-
-
-/*
- * messages 
- */
-
-void AnchorTable::dispatch(Message *m)
-{
-  switch (m->get_type()) {
-  case MSG_MDS_ANCHOR:
-    handle_anchor_request((MAnchor*)m);
-    break;
-    
-  default:
-    assert(0);
-  }
-}
-
-
-void AnchorTable::handle_anchor_request(class MAnchor *req)
-{
-  // make sure i'm open!
-  if (!opened) {
-    dout(7) << "not open yet" << dendl;
-    
-    waiting_for_open.push_back(new C_MDS_RetryMessage(mds, req));
-    
-    if (!opening) {
-      opening = true;
-      load(0);
-    }
-    return;
-  }
-
-  dout(10) << "handle_anchor_request " << *req << dendl;
-
-  // go
-  switch (req->get_op()) {
-
-  case ANCHOR_OP_LOOKUP:
-    handle_lookup(req);
-    break;
-
-  case ANCHOR_OP_CREATE_PREPARE:
-    handle_create_prepare(req);
-    break;
-  case ANCHOR_OP_DESTROY_PREPARE:
-    handle_destroy_prepare(req);
-    break;
-  case ANCHOR_OP_UPDATE_PREPARE:
-    handle_update_prepare(req);
-    break;
-
-  case ANCHOR_OP_COMMIT:
-    handle_commit(req);
-    break;
-
-  case ANCHOR_OP_ROLLBACK:
-    handle_rollback(req);
-    break;
-
-  default:
-    assert(0);
-  }
-
-}
-
-
-
-
-// primitive load/save for now!
-
-// load/save entire table for now!
-
-class C_AT_Saved : public Context {
-  AnchorTable *at;
-  version_t version;
-public:
-  C_AT_Saved(AnchorTable *a, version_t v) : at(a), version(v) {}
-  void finish(int r) {
-    at->_saved(version);
-  }
-};
-
-void AnchorTable::save(Context *onfinish)
-{
-  dout(7) << "save v " << version << dendl;
-  if (!opened) {
-    assert(!onfinish);
-    return;
-  }
-  
-  if (onfinish)
-    waiting_for_save[version].push_back(onfinish);
-
-  if (committing_version == version) {
-    dout(7) << "save already committing v " << version << dendl;
-    return;
-  }
-  committing_version = version;
-
-  // build up write
-  bufferlist bl;
-
-  // version
-  ::encode(version, bl);
-  ::encode(anchor_map, bl);
-
-  // pending
-  ::encode(pending_reqmds, bl);
-  ::encode(pending_create, bl);
-  ::encode(pending_destroy, bl);
-  ::encode(pending_update, bl);
-
-  // write!
-  object_t oid = object_t(MDS_INO_ANCHORTABLE+mds->get_nodeid(), 0);
-  vector<snapid_t> snaps;
-  mds->objecter->write(oid,
-                      0, bl.length(),
-                      mds->objecter->osdmap->file_to_object_layout(oid, g_default_mds_anchortable_layout),
-                      snaps,
-                      bl, 0,
-                      NULL, new C_AT_Saved(this, version));
-}
-
-void AnchorTable::_saved(version_t v)
-{
-  dout(7) << "_saved v " << v << dendl;
-
-  assert(v <= committing_version);
-  assert(committed_version < v);
-  committed_version = v;
-  
-  finish_contexts(waiting_for_save[v], 0);
-  waiting_for_save.erase(v);  
-}
-
-
-
-class C_AT_Load : public Context {
-  AnchorTable *at;
-public:
-  bufferlist bl;
-  C_AT_Load(AnchorTable *a) : at(a) {}
-  void finish(int result) {
-    assert(result > 0);
-    at->_loaded(bl);
-  }
-};
-
-void AnchorTable::load(Context *onfinish)
-{
-  dout(7) << "load" << dendl;
-  assert(!opened);
-
-  waiting_for_open.push_back(onfinish);
-
-  C_AT_Load *fin = new C_AT_Load(this);
-  object_t oid = object_t(MDS_INO_ANCHORTABLE+mds->get_nodeid(), 0);
-  vector<snapid_t> snaps;
-  mds->objecter->read(oid,
-                     0, 0,
-                     mds->objecter->osdmap->file_to_object_layout(oid, g_default_mds_anchortable_layout),
-                     snaps,
-                     &fin->bl, 0,
-                     fin);
-}
-
-void AnchorTable::_loaded(bufferlist& bl)
-{
-  dout(10) << "_loaded got " << bl.length() << " bytes" << dendl;
-
-  bufferlist::iterator p = bl.begin();
-
-  ::decode(version, p);
-  ::decode(anchor_map, p);
-
-  ::decode(pending_reqmds, p);
-  ::decode(pending_create, p);
-  ::decode(pending_destroy, p);
-  ::decode(pending_update, p);
-
-  assert(p.end());
-
-  // done.
-  opened = true;
-  opening = false;
-  
-  finish_contexts(waiting_for_open);
-}
-
-
-//////
-
-void AnchorTable::finish_recovery()
-{
-  dout(7) << "finish_recovery" << dendl;
-  
-  // resend agrees for everyone.
-  for (map<version_t,int>::iterator p = pending_reqmds.begin();
-       p != pending_reqmds.end();
-       p++) 
-    resend_agree(p->first, p->second);
-}
-
-
-void AnchorTable::resend_agree(version_t v, int who)
-{
-  if (pending_create.count(v)) {
-    MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, pending_create[v], v);
-    mds->send_message_mds(reply, who);
-  }
-  else if (pending_destroy.count(v)) {
-    MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, pending_destroy[v], v);
-    mds->send_message_mds(reply, who);
-  }
-  else {
-    assert(pending_update.count(v));
-    MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, pending_update[v].first, v);
-    mds->send_message_mds(reply, who);
-  }
-}
-
-void AnchorTable::handle_mds_recovery(int who)
-{
-  dout(7) << "handle_mds_recovery mds" << who << dendl;
-  
-  // resend agrees for recovered mds
-  for (map<version_t,int>::iterator p = pending_reqmds.begin();
-       p != pending_reqmds.end();
-       p++) {
-    if (p->second != who) continue;
-    resend_agree(p->first, p->second);
-  }
-}
diff --git a/src/mds/AnchorTable.h b/src/mds/AnchorTable.h
deleted file mode 100644 (file)
index 520597e..0000000
+++ /dev/null
@@ -1,127 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __ANCHORTABLE_H
-#define __ANCHORTABLE_H
-
-#include "Anchor.h"
-#include "include/Context.h"
-
-#include <ext/hash_map>
-using namespace __gnu_cxx;
-
-class MDS;
-class MAnchor;
-
-class AnchorTable {
-  MDS *mds;
-
-  // keep the entire table in memory.
-  hash_map<inodeno_t, Anchor>  anchor_map;
-
-  // uncommitted operations
-  map<version_t, int> pending_reqmds;
-  map<version_t, inodeno_t> pending_create;
-  map<version_t, inodeno_t> pending_destroy;
-  map<version_t, pair<inodeno_t, vector<Anchor> > > pending_update;
-
-  version_t version;  // this includes anchor_map AND pending_* state.
-  version_t committing_version;
-  version_t committed_version;
-
-  // load/save state
-  bool opening, opened;
-
-  // waiters
-  list<Context*> waiting_for_open;
-  map<version_t, list<Context*> > waiting_for_save;
-
-protected:
-
-  // basic updates
-  bool add(inodeno_t ino, inodeno_t dirino, __u32 dn_hash);
-  void inc(inodeno_t ino);
-  void dec(inodeno_t ino);
-
-  // mid-level
-  void create_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds);
-  void destroy_prepare(inodeno_t ino, int reqmds);
-  void update_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds);
-  void commit(version_t atid);
-  void rollback(version_t atid);
-  friend class EAnchor;  // used for journal replay.
-
-  // high level interface
-  void handle_lookup(MAnchor *req);
-
-  void handle_create_prepare(MAnchor *req);
-  void _create_prepare_logged(MAnchor *req, version_t atid);
-  friend class C_AT_CreatePrepare;
-
-  void handle_destroy_prepare(MAnchor *req);
-  void _destroy_prepare_logged(MAnchor *req, version_t atid);
-  friend class C_AT_DestroyPrepare;
-
-  void handle_update_prepare(MAnchor *req);
-  void _update_prepare_logged(MAnchor *req, version_t atid);
-  friend class C_AT_UpdatePrepare;
-
-  void handle_commit(MAnchor *req);
-  void _commit_logged(MAnchor *req);
-  friend class C_AT_Commit;
-
-  void handle_rollback(MAnchor *req);
-
-  // messages
-  void handle_anchor_request(MAnchor *m);  
-
-  void dump();
-
-public:
-  AnchorTable(MDS *m) :
-    mds(m),
-    version(0), committing_version(0), committed_version(0), 
-    opening(false), opened(false) { }
-
-  void dispatch(class Message *m);
-
-  version_t get_version() { return version; }
-  version_t get_committed_version() { return committed_version; }
-
-  void create_fresh() {   
-    // reset (i.e. on mkfs) to empty, but unsaved table.
-    version = 1;
-    opened = true;
-    opening = false;
-    anchor_map.clear();
-    pending_create.clear();
-    pending_destroy.clear();
-    pending_update.clear();
-  }
-
-  // load/save entire table for now!
-  void save(Context *onfinish);
-  void _saved(version_t v);
-  void load(Context *onfinish);
-  void _loaded(bufferlist& bl);
-
-  // recovery
-  void handle_mds_recovery(int who);
-  void finish_recovery();
-  void resend_agree(version_t v, int who);
-
-};
-
-#endif
index 46ea94888cc2365501f8213ec503445c125028a1..d51e939b8783ce9f26d9d1472350ec45f75db5c2 100644 (file)
@@ -20,7 +20,6 @@
 
 #include "MDS.h"
 #include "MDCache.h"
-#include "AnchorTable.h"
 
 #include "snap.h"
 
index de9c3ede1becc2c15d9feb92c569609401690ff3..09e4659b6d3d69270928a4e452d3fe8f44d03460 100644 (file)
 
 #include "events/EPurgeFinish.h"
 
-#include "events/EAnchor.h"
-#include "events/EAnchorClient.h"
-
-#include "events/ESnap.h"
+#include "events/ETableClient.h"
+#include "events/ETableServer.h"
 
 
 LogEvent *LogEvent::decode(bufferlist& bl)
@@ -74,10 +72,8 @@ LogEvent *LogEvent::decode(bufferlist& bl)
 
   case EVENT_PURGEFINISH: le = new EPurgeFinish; break;
 
-  case EVENT_ANCHOR: le = new EAnchor; break;
-  case EVENT_ANCHORCLIENT: le = new EAnchorClient; break;
-
-  case EVENT_SNAP: le = new ESnap; break;
+  case EVENT_TABLECLIENT: le = new ETableClient; break;
+  case EVENT_TABLESERVER: le = new ETableServer; break;
 
   default:
     generic_dout(1) << "uh oh, unknown log event type " << type << dendl;
index 6f27f3cc5d5cb9b47eacef9f4dab8c2e922093e9..151ae5df90b4c488fe5a5d2802442cd9d8aed98e 100644 (file)
 
 #define EVENT_PURGEFINISH  30
 
-#define EVENT_ANCHOR       40
-#define EVENT_ANCHORCLIENT 41
-
-#define EVENT_SNAP         50
+#define EVENT_TABLECLIENT  42
+#define EVENT_TABLESERVER  43
 
 
 
index 4fcfdd48b5a241da9631819f67d299880d776282..033432a7cb143e3d69e228a9377f75c9b57755f4 100644 (file)
@@ -49,8 +49,7 @@ class LogSegment {
   //xlist<CInode*>  purging_inodes;
   map<CInode*, map<loff_t,loff_t> > purging_inodes;
 
-  // committed anchor transactions
-  hash_set<version_t> pending_commit_atids;
+  map<int, hash_set<version_t> > pending_commit_tids;  // mdstable
   set<metareqid_t> uncommitted_masters;
 
   // client request ids
@@ -59,15 +58,14 @@ class LogSegment {
   // table version
   version_t allocv;
   version_t sessionmapv;
-  version_t anchortablev;
-  version_t snaptablev;
+  map<int,version_t> tablev;
 
   // try to expire
   C_Gather *try_to_expire(MDS *mds);
 
   // cons
   LogSegment(loff_t off) : offset(off), end(off), num_events(0), trimmable_at(0),
-                          allocv(0), sessionmapv(0), anchortablev(0), snaptablev(0)
+                          allocv(0), sessionmapv(0)
   { }
 };
 
index 5d90e4c001d3b1081d85c4a4d0a9c1cad47de1a0..55d54f258a26bbfe744061239f0b9e7885305e7e 100644 (file)
@@ -5216,7 +5216,7 @@ void MDCache::_anchor_prepared(CInode *in, version_t atid, bool add)
   EUpdate *le = new EUpdate(mds->mdlog, add ? "anchor_create":"anchor_destroy");
   mds->locker->predirty_nested(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
   le->metablob.add_primary_dentry(in->parent, true, 0, pi);
-  le->metablob.add_anchor_transaction(atid);
+  le->metablob.add_table_transaction(TABLE_ANCHOR, atid);
   mds->mdlog->submit_entry(le, new C_MDC_AnchorLogged(this, in, atid, mut));
 }
 
index a6fbe843328b4e5f51a5f3989ff83235a33727fc..defe2a8c309949c63576195d72c6851d984d429f 100644 (file)
@@ -257,6 +257,9 @@ struct MDRequest : public Mutation {
     map<__u32,entity_inst_t> imported_client_map;
     map<CInode*, map<__u32,Capability::Export> > cap_imports;
     
+    // for snaps
+    version_t stid;
+
     // called when slave commits or aborts
     Context *slave_commit;
     bufferlist rollback_bl;
@@ -271,6 +274,7 @@ struct MDRequest : public Mutation {
     More() : 
       src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0),
       destdn_was_remote_inode(0), was_link_merge(false),
+      stid(0),
       slave_commit(0),
       fragment_in(0), fragment_bits(0) { }
   } *_more;
index bc1adc31faa1b828b59ae18510bbd1900f68ce89..fa4cedb1d22d5f2b39a5182f47ec28717bebc986 100644 (file)
@@ -33,7 +33,7 @@
 #include "MDBalancer.h"
 #include "Migrator.h"
 
-#include "AnchorTable.h"
+#include "AnchorServer.h"
 #include "AnchorClient.h"
 
 #include "IdAllocator.h"
@@ -57,7 +57,7 @@
 #include "messages/MClientRequest.h"
 #include "messages/MClientRequestForward.h"
 
-#include "messages/MAnchor.h"
+#include "messages/MMDSTableRequest.h"
 
 #include "config.h"
 
@@ -90,12 +90,12 @@ MDS::MDS(int whoami, Messenger *m, MonMap *mm) :
   mdlog = new MDLog(this);
   balancer = new MDBalancer(this);
 
-  anchorclient = new AnchorClient(this);
   idalloc = new IdAllocator(this);
-
-  anchortable = new AnchorTable(this);
   snaptable = new SnapTable(this);
 
+  anchorserver = new AnchorServer(this);
+  anchorclient = new AnchorClient(this);
+
   server = new Server(this);
   locker = new Locker(this, mdcache);
 
@@ -127,7 +127,7 @@ MDS::~MDS() {
   if (mdlog) { delete mdlog; mdlog = NULL; }
   if (balancer) { delete balancer; balancer = NULL; }
   if (idalloc) { delete idalloc; idalloc = NULL; }
-  if (anchortable) { delete anchortable; anchortable = NULL; }
+  if (anchorserver) { delete anchorserver; anchorserver = NULL; }
   if (snaptable) { delete snaptable; snaptable = NULL; }
   if (anchorclient) { delete anchorclient; anchorclient = NULL; }
   if (osdmap) { delete osdmap; osdmap = 0; }
@@ -237,6 +237,36 @@ void MDS::reopen_logger(utime_t start)
   server->reopen_logger(start, append);
 }
 
+
+
+
+MDSTableClient *MDS::get_table_client(int t)
+{
+  switch (t) {
+  case TABLE_ANCHOR: return anchorclient;
+    //case TABLE_SNAP: return snapserver;
+  default: assert(0);
+  }
+}
+
+MDSTableServer *MDS::get_table_server(int t)
+{
+  switch (t) {
+  case TABLE_ANCHOR: return anchorserver;
+    //case TABLE_SNAP: return snapserver;
+  default: assert(0);
+  }
+}
+
+
+
+
+
+
+
+
+
+
 void MDS::send_message_mds(Message *m, int mds)
 {
   // send mdsmap first?
@@ -742,8 +772,8 @@ void MDS::boot_create()
   // fixme: fake out anchortable
   if (mdsmap->get_anchortable() == whoami) {
     dout(10) << "boot_create creating fresh anchortable" << dendl;
-    anchortable->create_fresh();
-    anchortable->save(fin->new_sub());
+    anchorserver->reset();
+    anchorserver->save(fin->new_sub());
   }
 
   if (whoami == 0) {
@@ -791,7 +821,7 @@ void MDS::boot_start(int step, int r)
 
       if (mdsmap->get_anchortable() == whoami) {
        dout(2) << "boot_start " << step << ": opening anchor table" << dendl;
-       anchortable->load(gather->new_sub());
+       anchorserver->load(gather->new_sub());
       }
       if (whoami == 0) {
        dout(2) << "boot_start " << step << ": opening snap table" << dendl;    
@@ -949,7 +979,7 @@ void MDS::recovery_done()
   
   // kick anchortable (resent AGREEs)
   if (mdsmap->get_anchortable() == whoami) 
-    anchortable->finish_recovery();
+    anchorserver->finish_recovery();
   
   // kick anchorclient (resent COMMITs)
   anchorclient->finish_recovery();
@@ -969,8 +999,8 @@ void MDS::handle_mds_recovery(int who)
   
   mdcache->handle_mds_recovery(who);
 
-  if (anchortable)
-    anchortable->handle_mds_recovery(who);
+  if (anchorserver)
+    anchorserver->handle_mds_recovery(who);
   anchorclient->handle_mds_recovery(who);
   
   queue_waiters(waiting_for_active_peer[who]);
@@ -1094,12 +1124,17 @@ void MDS::_dispatch(Message *m)
       balancer->proc_message(m);
       break;
 
-      // anchor
-    case MSG_MDS_ANCHOR:
-      if (((MAnchor*)m)->get_op() < 0)
-       anchorclient->dispatch(m);
-      else
-       anchortable->dispatch(m);
+    case MSG_MDS_TABLE_REQUEST:
+      {
+       MMDSTableRequest *req = (MMDSTableRequest*)m;
+       if (req->op < 0) {
+         MDSTableClient *client = get_table_client(req->table);
+         client->handle_request(req);
+       } else {
+         MDSTableServer *server = get_table_server(req->table);
+         server->handle_request(req);
+       }
+      }
       break;
 
       // OSD
index b41bf73138cd45ce2ad2980e6e16e3f45eae1487..453b65f1c9dcd5b5a1ea0e284088a21b63364e3f 100644 (file)
@@ -54,13 +54,9 @@ class Filer;
 
 class Server;
 class Locker;
-class AnchorTable;
-class AnchorClient;
 class MDCache;
 class MDLog;
 class MDBalancer;
-class IdAllocator;
-class SnapTable;
 
 class CInode;
 class CDir;
@@ -76,6 +72,13 @@ class MHashReaddirReply;
 
 class MMDSBeacon;
 
+class IdAllocator;
+class SnapTable;
+
+class MDSTableClient;
+class MDSTableServer;
+class AnchorServer;
+class AnchorClient;
 
 class MDS : public Dispatcher {
  public:
@@ -103,11 +106,14 @@ class MDS : public Dispatcher {
 
   IdAllocator  *idalloc;
 
-  AnchorTable  *anchortable;
+  AnchorServer *anchorserver;
   AnchorClient *anchorclient;
 
   SnapTable    *snaptable;
 
+  MDSTableClient *get_table_client(int t);
+  MDSTableServer *get_table_server(int t);
+
   Logger       *logger, *logger2;
 
 
index 647d54c2d72eb195cce126605e97988cf25802aa..8b692b45e20e104d7e614a90e97e145c68720388 100644 (file)
@@ -16,6 +16,7 @@
 #define __MDSTABLE_H
 
 #include "mdstypes.h"
+#include "mds_table_types.h"
 #include "include/buffer.h"
 #include "include/Context.h"
 
diff --git a/src/mds/MDSTableClient.cc b/src/mds/MDSTableClient.cc
new file mode 100644 (file)
index 0000000..2eeb34c
--- /dev/null
@@ -0,0 +1,190 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include <iostream>
+using std::cout;
+using std::cerr;
+
+#include "MDSMap.h"
+
+#include "include/Context.h"
+#include "msg/Messenger.h"
+
+#include "MDS.h"
+#include "MDLog.h"
+#include "LogSegment.h"
+
+#include "MDSTableClient.h"
+#include "events/ETableClient.h"
+
+#include "messages/MMDSTableRequest.h"
+
+#include "config.h"
+
+#define dout(x)  if (x <= g_conf.debug_mds) *_dout << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".tableclient "
+#define derr(x)  if (x <= g_conf.debug_mds) *_derr << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".tableclient "
+
+
+void MDSTableClient::handle_request(class MMDSTableRequest *m)
+{
+  dout(10) << "handle_request " << *m << dendl;
+  assert(m->table == table);
+
+  version_t tid = m->tid;
+  __u64 reqid = m->reqid;
+
+  switch (m->op) {
+  case TABLE_OP_QUERY_REPLY:
+    handle_query_result(m);
+    break;
+    
+  case TABLE_OP_AGREE:
+    if (pending_prepare.count(reqid)) {
+      dout(10) << "got agree on " << reqid << " atid " << tid << dendl;
+      Context *onfinish = pending_prepare[reqid].onfinish;
+      *pending_prepare[reqid].ptid = tid;
+      pending_prepare.erase(reqid);
+      if (onfinish) {
+        onfinish->finish(0);
+        delete onfinish;
+      }
+    } 
+    else if (pending_commit.count(tid)) {
+      dout(10) << "stray agree on " << reqid
+              << " tid " << tid
+              << ", already committing, resending COMMIT"
+              << dendl;      
+      MMDSTableRequest *req = new MMDSTableRequest(table, TABLE_OP_COMMIT, 0, tid);
+      mds->send_message_mds(req, mds->mdsmap->get_anchortable());
+    }
+    else {
+      dout(10) << "stray agree on " << reqid
+              << " tid " << tid
+              << ", sending ROLLBACK"
+              << dendl;      
+      MMDSTableRequest *req = new MMDSTableRequest(table, TABLE_OP_ROLLBACK, 0, tid);
+      mds->send_message_mds(req, mds->mdsmap->get_anchortable());
+    }
+    break;
+
+  case TABLE_OP_ACK:
+    dout(10) << "got ack on tid " << tid << ", logging" << dendl;
+
+    // remove from committing list
+    assert(pending_commit.count(tid));
+    assert(pending_commit[tid]->pending_commit_tids[table].count(tid));
+    
+    // log ACK.
+    mds->mdlog->submit_entry(new ETableClient(table, TABLE_OP_ACK, tid),
+                            new C_LoggedAck(this, tid));
+    break;
+
+  default:
+    assert(0);
+  }
+
+  delete m;
+}
+
+
+void MDSTableClient::_logged_ack(version_t tid)
+{
+  dout(10) << "_logged_ack" << dendl;
+
+  assert(pending_commit.count(tid));
+  assert(pending_commit[tid]->pending_commit_tids[table].count(tid));
+  
+  pending_commit[tid]->pending_commit_tids[table].erase(tid);
+  pending_commit.erase(tid);
+  
+  // kick any waiters (LogSegment trim)
+  if (ack_waiters.count(tid)) {
+    dout(15) << "kicking ack waiters on tid " << tid << dendl;
+    mds->queue_waiters(ack_waiters[tid]);
+    ack_waiters.erase(tid);
+  }
+}
+
+void MDSTableClient::_prepare(bufferlist& mutation, version_t *ptid, Context *onfinish)
+{
+  __u64 reqid = ++last_reqid;
+  dout(10) << "_prepare " << reqid << dendl;
+
+  // send message
+  MMDSTableRequest *req = new MMDSTableRequest(table, TABLE_OP_PREPARE, reqid);
+  req->bl = mutation;
+
+  pending_prepare[reqid].mutation = mutation;
+  pending_prepare[reqid].ptid = ptid;
+  pending_prepare[reqid].onfinish = onfinish;
+
+  mds->send_message_mds(req, mds->mdsmap->get_anchortable());
+}
+
+void MDSTableClient::commit(version_t tid, LogSegment *ls)
+{
+  dout(10) << "commit " << tid << dendl;
+
+  assert(pending_commit.count(tid) == 0);
+  pending_commit[tid] = ls;
+  ls->pending_commit_tids[table].insert(tid);
+
+  // send message
+  MMDSTableRequest *req = new MMDSTableRequest(table, TABLE_OP_COMMIT, 0, tid);
+  mds->send_message_mds(req, mds->mdsmap->get_anchortable());
+}
+
+
+
+// recovery
+
+
+void MDSTableClient::finish_recovery()
+{
+  dout(7) << "finish_recovery" << dendl;
+  resend_commits();
+}
+
+void MDSTableClient::resend_commits()
+{
+  for (map<version_t,LogSegment*>::iterator p = pending_commit.begin();
+       p != pending_commit.end();
+       ++p) {
+    dout(10) << "resending commit on " << p->first << dendl;
+    MMDSTableRequest *req = new MMDSTableRequest(table, TABLE_OP_COMMIT, 0, p->first);
+    mds->send_message_mds(req, mds->mdsmap->get_anchortable());
+  }
+}
+
+void MDSTableClient::handle_mds_recovery(int who)
+{
+  dout(7) << "handle_mds_recovery mds" << who << dendl;
+
+  if (who != mds->mdsmap->get_anchortable()) 
+    return; // do nothing.
+
+  resend_queries();
+  
+  // prepares.
+  for (hash_map<__u64, _pending_prepare>::iterator p = pending_prepare.begin();
+       p != pending_prepare.end();
+       p++) {
+    dout(10) << "resending " << p->first << dendl;
+    MMDSTableRequest *req = new MMDSTableRequest(table, TABLE_OP_PREPARE, p->first);
+    req->bl = p->second.mutation;
+    mds->send_message_mds(req, mds->mdsmap->get_anchortable());
+  } 
+
+  resend_commits();
+}
diff --git a/src/mds/MDSTableClient.h b/src/mds/MDSTableClient.h
new file mode 100644 (file)
index 0000000..18fd464
--- /dev/null
@@ -0,0 +1,101 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __SNAPCLIENT_H
+#define __SNAPCLIENT_H
+
+#include <vector>
+using std::vector;
+#include <ext/hash_map>
+using __gnu_cxx::hash_map;
+
+#include "include/types.h"
+#include "include/Context.h"
+#include "mds_table_types.h"
+
+class MDS;
+class LogSegment;
+class MMDSTableRequest;
+
+class MDSTableClient {
+protected:
+  MDS *mds;
+  int table;
+
+  __u64 last_reqid;
+
+  // prepares
+  struct _pending_prepare {
+    Context *onfinish;
+    version_t *ptid;
+    bufferlist mutation;
+  };
+
+  hash_map<__u64, _pending_prepare> pending_prepare;
+
+  // pending commits
+  map<version_t, LogSegment*> pending_commit;
+  map<version_t, list<Context*> > ack_waiters;
+
+  void handle_reply(class MMDSTableQuery *m);  
+
+  class C_LoggedAck : public Context {
+    MDSTableClient *tc;
+    version_t tid;
+  public:
+    C_LoggedAck(MDSTableClient *a, version_t t) : tc(a), tid(t) {}
+    void finish(int r) {
+      tc->_logged_ack(tid);
+    }
+  };
+  void _logged_ack(version_t tid);
+
+public:
+  MDSTableClient(MDS *m, int tab) : mds(m), table(tab), last_reqid(0) {}
+  virtual ~MDSTableClient() {}
+
+  void handle_request(MMDSTableRequest *m);
+
+  void _prepare(bufferlist& mutation, version_t *ptid, Context *onfinish);
+  void commit(version_t tid, LogSegment *ls);
+
+  // for recovery (by other nodes)
+  void handle_mds_recovery(int mds); // called when someone else recovers
+  void resend_commits();
+
+  // for recovery (by me)
+  void got_journaled_agree(version_t tid, LogSegment *ls) {
+    pending_commit[tid] = ls;
+  }
+  void got_journaled_ack(version_t tid) {
+    pending_commit.erase(tid);
+  }
+  bool has_committed(version_t tid) {
+    return pending_commit.count(tid) == 0;
+  }
+  void wait_for_ack(version_t tid, Context *c) {
+    ack_waiters[tid].push_back(c);
+  }
+  void finish_recovery();                // called when i recover and go active
+
+
+  // child must implement
+  virtual void resend_queries() = 0;
+  virtual void handle_query_result(MMDSTableRequest *m) = 0;
+
+  // and friendly front-end for _prepare.
+
+};
+
+#endif
diff --git a/src/mds/MDSTableServer.cc b/src/mds/MDSTableServer.cc
new file mode 100644 (file)
index 0000000..af0be98
--- /dev/null
@@ -0,0 +1,130 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "MDSTableServer.h"
+#include "MDS.h"
+#include "MDLog.h"
+#include "msg/Messenger.h"
+
+#include "messages/MMDSTableRequest.h"
+#include "events/ETableServer.h"
+
+#define dout(x)  if (x <= g_conf.debug_mds) *_dout << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".tableserver(" << get_mdstable_name(table) << ") "
+#define derr(x)  if (x <= g_conf.debug_mds) *_derr << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".tableserver(" << get_mdstable_name(table) << ") "
+
+
+void MDSTableServer::handle_request(MMDSTableRequest *req)
+{
+  assert(req->op >= 0);
+  switch (req->op) {
+  case TABLE_OP_QUERY: return handle_query(req);
+  case TABLE_OP_PREPARE: return handle_prepare(req);
+  case TABLE_OP_COMMIT: return handle_commit(req);
+  case TABLE_OP_ROLLBACK: return handle_rollback(req);
+  default: assert(0);
+  }
+}
+
+// prepare
+
+void MDSTableServer::handle_prepare(MMDSTableRequest *req)
+{
+  dout(7) << "handle_prepare " << *req << dendl;
+  int from = req->get_source().num();
+
+  _prepare(req->bl, req->reqid, from);
+  pending_for_mds[req->reqid].mds = from;
+  pending_for_mds[req->reqid].reqid = req->reqid;
+  pending_for_mds[req->reqid].tid = version;
+
+  ETableServer *le = new ETableServer(table, TABLE_OP_PREPARE, req->reqid, from, version, version);
+  le->mutation = req->bl;
+  mds->mdlog->submit_entry(le, new C_Prepare(this, req, version));
+}
+
+void MDSTableServer::_prepare_logged(MMDSTableRequest *req, version_t tid)
+{
+  dout(7) << "_create_logged " << *req << " tid " << tid << dendl;
+  MMDSTableRequest *reply = new MMDSTableRequest(table, TABLE_OP_AGREE, req->reqid, tid);
+  mds->send_message_mds(reply, req->get_source().num());
+  delete req;
+}
+
+
+// commit
+
+void MDSTableServer::handle_commit(MMDSTableRequest *req)
+{
+  dout(7) << "handle_commit " << *req << dendl;
+
+  version_t tid = req->tid;
+
+  if (_is_prepared(tid)) {
+    _commit(tid);
+    mds->mdlog->submit_entry(new ETableServer(table, TABLE_OP_COMMIT, 0, -1, tid, version));
+    mds->mdlog->wait_for_sync(new C_Commit(this, req));
+  }
+  else if (tid <= version) {
+    dout(0) << "got commit for tid " << tid << " <= " << version 
+           << ", already committed, sending ack." 
+           << dendl;
+    _commit_logged(req);
+  } 
+  else {
+    // wtf.
+    dout(0) << "got commit for tid " << tid << " > " << version << dendl;
+    assert(tid <= version);
+  }
+}
+
+void MDSTableServer::_commit_logged(MMDSTableRequest *req)
+{
+  dout(7) << "_commit_logged, sending ACK" << dendl;
+  MMDSTableRequest *reply = new MMDSTableRequest(table, TABLE_OP_ACK, req->reqid, req->tid);
+  mds->send_message_mds(reply, req->get_source().num());
+  delete req;
+}
+
+// ROLLBACK
+
+void MDSTableServer::handle_rollback(MMDSTableRequest *req)
+{
+  dout(7) << "handle_rollback " << *req << dendl;
+  _rollback(req->tid);
+  delete req;
+}
+
+
+// recovery
+
+void MDSTableServer::finish_recovery()
+{
+  dout(7) << "finish_recovery" << dendl;
+  handle_mds_recovery(-1);  // resend agrees for everyone.
+}
+
+void MDSTableServer::handle_mds_recovery(int who)
+{
+  if (who >= 0)
+    dout(7) << "handle_mds_recovery mds" << who << dendl;
+  
+  // resend agrees for recovered mds
+  for (map<version_t,_pending>::iterator p = pending_for_mds.begin();
+       p != pending_for_mds.end();
+       p++) {
+    if (who >= 0 && p->second.mds != who) continue;
+    MMDSTableRequest *reply = new MMDSTableRequest(table, TABLE_OP_AGREE, p->second.reqid, p->second.tid);
+    mds->send_message_mds(reply, who);
+  }
+}
diff --git a/src/mds/MDSTableServer.h b/src/mds/MDSTableServer.h
new file mode 100644 (file)
index 0000000..8b93704
--- /dev/null
@@ -0,0 +1,92 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MDSTABLESERVER_H
+#define __MDSTABLESERVER_H
+
+#include "MDSTable.h"
+
+class MMDSTableRequest;
+
+class MDSTableServer : public MDSTable {
+public:
+  int table;
+
+  /* mds's requesting any pending ops.  child needs to encodig the corresponding
+   * pending mutation state in the table.
+   */
+  struct _pending {
+    __u64 reqid;
+    __s32 mds;
+    version_t tid;
+    void encode(bufferlist& bl) const {
+      ::encode(reqid, bl);
+      ::encode(mds, bl);
+      ::encode(tid, bl);
+    }
+    void decode(bufferlist::iterator& bl) {
+      ::decode(reqid, bl);
+      ::decode(mds, bl);
+      ::decode(tid, bl);
+    }
+  };
+  WRITE_CLASS_ENCODER(_pending)
+  map<version_t,_pending> pending_for_mds;  // ** child should encode this! **
+
+
+private:
+  void handle_prepare(MMDSTableRequest *m);
+  void _prepare_logged(MMDSTableRequest *m, version_t tid);
+  struct C_Prepare : public Context {
+    MDSTableServer *server;
+    MMDSTableRequest *req;
+    version_t tid;
+    C_Prepare(MDSTableServer *s, MMDSTableRequest *r, version_t v) : server(s), req(r), tid(v) {}
+    void finish(int r) {
+      server->_prepare_logged(req, tid);
+    }
+  };
+
+  void handle_commit(MMDSTableRequest *m);
+  void _commit_logged(MMDSTableRequest *m);
+  struct C_Commit : public Context {
+    MDSTableServer *server;
+    MMDSTableRequest *req;
+    C_Commit(MDSTableServer *s, MMDSTableRequest *r) : server(s), req(r) {}
+    void finish(int r) {
+      server->_commit_logged(req);
+    }
+  };
+
+  void handle_rollback(MMDSTableRequest *m);
+
+ public:
+  virtual void handle_query(MMDSTableRequest *m) = 0;
+  virtual void _prepare(bufferlist &bl, __u64 reqid, int bymds) = 0;
+  virtual bool _is_prepared(version_t tid) = 0;
+  virtual void _commit(version_t tid) = 0;
+  virtual void _rollback(version_t tid) = 0;
+
+  MDSTableServer(MDS *m, int tab) : MDSTable(m, get_mdstable_name(tab)), table(tab) {}
+  virtual ~MDSTableServer() {}
+
+  void handle_request(MMDSTableRequest *m);
+
+  // recovery
+  void finish_recovery();
+  void handle_mds_recovery(int who);
+};
+WRITE_CLASS_ENCODER(MDSTableServer::_pending)
+
+#endif
index 367ae30fea68ede9ad4573b838518ef2a4b4219e..d81d9ef5b36ef7a57e333950d98242433282e7a8 100644 (file)
@@ -44,7 +44,6 @@
 #include "events/ESession.h"
 #include "events/EOpen.h"
 #include "events/ECommitted.h"
-#include "events/ESnap.h"
 
 #include "include/filepath.h"
 #include "common/Timer.h"
@@ -2422,7 +2421,7 @@ void Server::_link_remote(MDRequest *mdr, bool inc, CDentry *dn, CInode *targeti
   }
 
   if (mdr->more()->dst_reanchor_atid)
-    le->metablob.add_anchor_transaction(mdr->more()->dst_reanchor_atid);
+    le->metablob.add_table_transaction(TABLE_ANCHOR, mdr->more()->dst_reanchor_atid);
 
   // mark committing (needed for proper recovery)
   mdr->committing = true;
@@ -2931,7 +2930,7 @@ void Server::_unlink_local(MDRequest *mdr, CDentry *dn, CDentry *straydn)
   }
 
   if (mdr->more()->dst_reanchor_atid)
-    le->metablob.add_anchor_transaction(mdr->more()->dst_reanchor_atid);
+    le->metablob.add_table_transaction(TABLE_ANCHOR, mdr->more()->dst_reanchor_atid);
 
   // log + wait
   mdlog->submit_entry(le, new C_MDS_unlink_local_finish(mds, mdr, dn, straydn));
@@ -3672,9 +3671,9 @@ void Server::_rename_prepare(MDRequest *mdr,
 
   // anchor updates?
   if (mdr->more()->src_reanchor_atid)
-    metablob->add_anchor_transaction(mdr->more()->src_reanchor_atid);
+    metablob->add_table_transaction(TABLE_ANCHOR, mdr->more()->src_reanchor_atid);
   if (mdr->more()->dst_reanchor_atid)
-    metablob->add_anchor_transaction(mdr->more()->dst_reanchor_atid);
+    metablob->add_table_transaction(TABLE_ANCHOR, mdr->more()->dst_reanchor_atid);
 }
 
 
@@ -4914,7 +4913,7 @@ void Server::handle_client_mksnap(MDRequest *mdr)
   mdr->ls = mdlog->get_current_segment();
   EUpdate *le = new EUpdate(mdlog, "mksnap");
   le->metablob.add_client_req(req->get_reqid());
-  le->metablob.add_snap_transaction(stid);
+  le->metablob.add_table_transaction(TABLE_SNAP, stid);
   mds->locker->predirty_nested(mdr, &le->metablob, diri, 0, PREDIRTY_PRIMARY, false);
   mdcache->journal_dirty_inode(&le->metablob, diri, diri->find_snaprealm()->get_latest_snap());
 
diff --git a/src/mds/SnapClient.h b/src/mds/SnapClient.h
new file mode 100644 (file)
index 0000000..5c7a1b0
--- /dev/null
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __SNAPCLIENT_H
+#define __SNAPCLIENT_H
+
+#include <vector>
+using std::vector;
+#include <ext/hash_map>
+using __gnu_cxx::hash_map;
+
+#include "include/types.h"
+#include "msg/Dispatcher.h"
+
+#include "snap.h"
+
+class Context;
+class MDS;
+class LogSegment;
+
+class MDSTableClient : public Dispatcher {
+  MDS *mds;
+
+  // prepares
+  struct _pending_prepare {
+    Context *onfinish;
+    version_t *patid;
+    bufferlist mutation;
+  };
+
+  hash_map<metareqid_t, _pending_prepare> pending_prepare;
+
+  // pending commits
+  map<version_t, LogSegment*> pending_commit;
+  map<version_t, list<Context*> > ack_waiters;
+
+  void handle_reply(class MAnchor *m);  
+
+
+};
+
+#endif
diff --git a/src/mds/events/EAnchor.h b/src/mds/events/EAnchor.h
deleted file mode 100644 (file)
index e526ed5..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-#ifndef __MDS_EANCHOR_H
-#define __MDS_EANCHOR_H
-
-#include <assert.h>
-#include "config.h"
-#include "include/types.h"
-
-#include "../LogEvent.h"
-#include "../Anchor.h"
-
-class EAnchor : public LogEvent {
-protected:
-  __u32 op;
-  inodeno_t ino;
-  version_t atid; 
-  vector<Anchor> trace;
-  version_t version;    // anchor table version
-  __s32 reqmds;
-
- public:
-  EAnchor() : LogEvent(EVENT_ANCHOR) { }
-  EAnchor(int o, inodeno_t i, version_t v, int rm) :
-    LogEvent(EVENT_ANCHOR),
-    op(o), ino(i), atid(0), version(v), reqmds(rm) { }
-  EAnchor(int o, version_t a, version_t v) :
-    LogEvent(EVENT_ANCHOR),
-    op(o), atid(a), version(v), reqmds(-1) { }
-
-  void set_trace(vector<Anchor>& t) { trace = t; }
-  vector<Anchor>& get_trace() { return trace; }
-  
-  void encode(bufferlist& bl) const {
-    ::encode(op, bl);
-    ::encode(ino, bl);
-    ::encode(atid, bl);
-    ::encode(trace, bl);
-    ::encode(version, bl);
-    ::encode(reqmds, bl);
-  }
-  void decode(bufferlist::iterator &bl) {
-    ::decode(op, bl);
-    ::decode(ino, bl);
-    ::decode(atid, bl);
-    ::decode(trace, bl);
-    ::decode(version, bl);
-    ::decode(reqmds, bl);
-  }
-
-  void print(ostream& out) {
-    out << "EAnchor " << get_anchor_opname(op);
-    if (ino) out << " " << ino;
-    if (atid) out << " atid " << atid;
-    if (version) out << " v " << version;
-    if (reqmds >= 0) out << " by mds" << reqmds;
-  }  
-
-  void update_segment();
-  void replay(MDS *mds);  
-};
-
-#endif
diff --git a/src/mds/events/EAnchorClient.h b/src/mds/events/EAnchorClient.h
deleted file mode 100644 (file)
index b86a355..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-#ifndef __MDS_EANCHORCLIENT_H
-#define __MDS_EANCHORCLIENT_H
-
-#include <assert.h>
-#include "config.h"
-#include "include/types.h"
-
-#include "../LogEvent.h"
-#include "../Anchor.h"
-
-class EAnchorClient : public LogEvent {
-protected:
-  __u32 op;
-  version_t atid; 
-
- public:
-  EAnchorClient() : LogEvent(EVENT_ANCHORCLIENT) { }
-  EAnchorClient(int o, version_t at) :
-    LogEvent(EVENT_ANCHORCLIENT),
-    op(o), atid(at) { }
-
-  void encode(bufferlist &bl) const {
-    ::encode(op, bl);
-    ::encode(atid, bl);
-  }
-  void decode(bufferlist::iterator &bl) {
-    ::decode(op, bl);
-    ::decode(atid, bl);
-  }
-
-  void print(ostream& out) {
-    out << "EAnchorClient " << get_anchor_opname(op);
-    if (atid) out << " atid " << atid;
-  }  
-
-  void replay(MDS *mds);
-  
-};
-
-#endif
index fe53d3d377e726b6b988df3ba920701f84baf17b..f1f34e7390ce3b3de0c8b691e5445659bc05f48d 100644 (file)
@@ -268,8 +268,7 @@ private:
   list<dirfrag_t>         lump_order;
   map<dirfrag_t, dirlump> lump_map;
 
-  list<version_t>         atids;  // anchor table transactions
-  list<version_t>         stids;  // snap table transactions
+  list<pair<__u8,version_t> > table_tids;  // tableclient transactions
 
   // ino's i've allocated
   list<inodeno_t> allocated_inos;
@@ -285,8 +284,7 @@ private:
   void encode(bufferlist& bl) const {
     ::encode(lump_order, bl);
     ::encode(lump_map, bl);
-    ::encode(atids, bl);
-    ::encode(stids, bl);
+    ::encode(table_tids, bl);
     ::encode(allocated_inos, bl);
     if (!allocated_inos.empty())
       ::encode(alloc_tablev, bl);
@@ -296,8 +294,7 @@ private:
   void decode(bufferlist::iterator &bl) {
     ::decode(lump_order, bl);
     ::decode(lump_map, bl);
-    ::decode(atids, bl);
-    ::decode(stids, bl);
+    ::decode(table_tids, bl);
     ::decode(allocated_inos, bl);
     if (!allocated_inos.empty())
       ::decode(alloc_tablev, bl);
@@ -328,11 +325,8 @@ private:
     client_reqs.push_back(r);
   }
 
-  void add_anchor_transaction(version_t atid) {
-    atids.push_back(atid);
-  }  
-  void add_snap_transaction(version_t stid) {
-    stids.push_back(stid);
+  void add_table_transaction(int table, version_t tid) {
+    table_tids.push_back(pair<__u8, version_t>(table, tid));
   }  
 
   void add_allocated_ino(inodeno_t ino, version_t tablev) {
@@ -511,10 +505,8 @@ private:
     out << "[metablob";
     if (!lump_order.empty()) 
       out << " " << lump_order.front() << ", " << lump_map.size() << " dirs";
-    if (!atids.empty())
-      out << " atids=" << atids;
-    if (!stids.empty())
-      out << " stids=" << stids;
+    if (!table_tids.empty())
+      out << " table_tids=" << table_tids;
     if (!allocated_inos.empty())
       out << " inos=" << allocated_inos << " v" << alloc_tablev;
     out << "]";
diff --git a/src/mds/events/ESnap.h b/src/mds/events/ESnap.h
deleted file mode 100644 (file)
index a08f8d0..0000000
+++ /dev/null
@@ -1,58 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-#ifndef __MDS_ESNAP_H
-#define __MDS_ESNAP_H
-
-#include <assert.h>
-#include "config.h"
-#include "include/types.h"
-
-#include "../LogEvent.h"
-#include "../snap.h"
-
-class ESnap : public LogEvent {
-public:
-  bool create;   
-  SnapInfo snap;
-  version_t version;    // table version
-
- public:
-  ESnap() : LogEvent(EVENT_SNAP) { }
-  ESnap(bool c, SnapInfo &sn, version_t v) : 
-    LogEvent(EVENT_SNAP),
-    create(c), snap(sn), version(v) { }
-
-  void encode(bufferlist& bl) const {
-    ::encode(create, bl);
-    ::encode(snap, bl);
-    ::encode(version, bl);
-  }
-  void decode(bufferlist::iterator &bl) {
-    ::decode(create, bl);
-    ::decode(snap, bl);
-    ::decode(version, bl);
-  }
-
-  void print(ostream& out) {
-    out << "ESnap " << (create ? "create":"remove")
-       << " " << snap 
-       << " v " << version;
-  }  
-
-  void update_segment();
-  void replay(MDS *mds);  
-};
-
-#endif
diff --git a/src/mds/events/ETableClient.h b/src/mds/events/ETableClient.h
new file mode 100644 (file)
index 0000000..0dcc309
--- /dev/null
@@ -0,0 +1,55 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MDS_ETABLECLIENT_H
+#define __MDS_ETABLECLIENT_H
+
+#include <assert.h>
+#include "config.h"
+#include "include/types.h"
+
+#include "../mds_table_types.h"
+#include "../LogEvent.h"
+
+struct ETableClient : public LogEvent {
+  __u16 table;
+  __s16 op;
+  version_t tid;
+
+  ETableClient() : LogEvent(EVENT_TABLECLIENT) { }
+  ETableClient(int t, int o, version_t ti) :
+    LogEvent(EVENT_TABLECLIENT),
+    table(t), op(o), tid(ti) { }
+
+  void encode(bufferlist& bl) const {
+    ::encode(table, bl);
+    ::encode(op, bl);
+    ::encode(tid, bl);
+  }
+  void decode(bufferlist::iterator &bl) {
+    ::decode(table, bl);
+    ::decode(op, bl);
+    ::decode(tid, bl);
+  }
+
+  void print(ostream& out) {
+    out << "ETableClient " << get_mdstable_name(table) << " " << get_mdstable_opname(op);
+    if (tid) out << " tid " << tid;
+  }  
+
+  //void update_segment();
+  void replay(MDS *mds);  
+};
+
+#endif
diff --git a/src/mds/events/ETableServer.h b/src/mds/events/ETableServer.h
new file mode 100644 (file)
index 0000000..c0d760f
--- /dev/null
@@ -0,0 +1,70 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MDS_ETABLESERVER_H
+#define __MDS_ETABLESERVER_H
+
+#include <assert.h>
+#include "config.h"
+#include "include/types.h"
+
+#include "../mds_table_types.h"
+#include "../LogEvent.h"
+
+struct ETableServer : public LogEvent {
+  __u16 table;
+  __s16 op;
+  __u64 reqid;
+  __s32 bymds;
+  bufferlist mutation;
+  version_t tid;
+  version_t version;
+
+  ETableServer() : LogEvent(EVENT_TABLESERVER) { }
+  ETableServer(int t, int o, __u64 ri, int m, version_t ti, version_t v) :
+    LogEvent(EVENT_TABLESERVER),
+    table(t), op(o), reqid(ri), bymds(m), tid(ti), version(v) { }
+
+  void encode(bufferlist& bl) const {
+    ::encode(table, bl);
+    ::encode(op, bl);
+    ::encode(reqid, bl);
+    ::encode(bymds, bl);
+    ::encode(mutation, bl);
+    ::encode(tid, bl);
+    ::encode(version, bl);
+  }
+  void decode(bufferlist::iterator &bl) {
+    ::decode(table, bl);
+    ::decode(op, bl);
+    ::decode(reqid, bl);
+    ::decode(bymds, bl);
+    ::decode(mutation, bl);
+    ::decode(tid, bl);
+    ::decode(version, bl);
+  }
+
+  void print(ostream& out) {
+    out << "ETableServer " << get_mdstable_name(table) << " " << get_mdstable_opname(op);
+    if (reqid) out << " reqid " << reqid;
+    if (bymds >= 0) out << " mds" << bymds;
+    if (tid) out << " tid " << tid;
+    if (version) out << " version " << version;
+  }  
+
+  void update_segment();
+  void replay(MDS *mds);  
+};
+
+#endif
index 606751452769ab57c3d20e52b6b9515f1c92f45b..bc6eb1ff6ea315e12d468c9ef9ac1340e235c7fe 100644 (file)
 #include "events/EImportFinish.h"
 #include "events/EFragment.h"
 
-#include "events/EAnchor.h"
-#include "events/EAnchorClient.h"
+#include "events/ETableClient.h"
+#include "events/ETableServer.h"
 
-#include "events/ESnap.h"
 
 #include "LogSegment.h"
 
 #include "MDCache.h"
 #include "Server.h"
 #include "Migrator.h"
-#include "AnchorTable.h"
-#include "AnchorClient.h"
 #include "IdAllocator.h"
 #include "SnapTable.h"
 
+#include "MDSTableClient.h"
+#include "MDSTableServer.h"
+
 #include "Locker.h"
 
 
@@ -190,28 +190,32 @@ C_Gather *LogSegment::try_to_expire(MDS *mds)
   }
 
   // pending commit atids
-  for (hash_set<version_t>::iterator p = pending_commit_atids.begin();
-       p != pending_commit_atids.end();
+  for (map<int, hash_set<version_t> >::iterator p = pending_commit_tids.begin();
+       p != pending_commit_tids.end();
        ++p) {
-    if (!gather) gather = new C_Gather;
-    assert(!mds->anchorclient->has_committed(*p));
-    dout(10) << "try_to_expire anchor transaction " << *p 
-            << " pending commit (not yet acked), waiting" << dendl;
-    mds->anchorclient->wait_for_ack(*p, gather->new_sub());
+    MDSTableClient *client = mds->get_table_client(p->first);
+    for (hash_set<version_t>::iterator q = p->second.begin();
+        q != p->second.end();
+        ++q) {
+      if (!gather) gather = new C_Gather;
+      assert(!client->has_committed(*q));
+      dout(10) << "try_to_expire " << get_mdstable_name(p->first) << " transaction " << *q 
+              << " pending commit (not yet acked), waiting" << dendl;
+      client->wait_for_ack(*q, gather->new_sub());
+    }
   }
   
-  // anchortable
-  if (anchortablev > mds->anchortable->get_committed_version()) {
-    dout(10) << "try_to_expire waiting for anchor table to save, need " << anchortablev << dendl;
-    if (!gather) gather = new C_Gather;
-    mds->anchortable->save(gather->new_sub());
-  }
-
-  // snaptable
-  if (snaptablev > mds->snaptable->get_committed_version()) {
-    dout(10) << "try_to_expire waiting for snap table to save, need " << snaptablev << dendl;
-    if (!gather) gather = new C_Gather;
-    mds->snaptable->save(gather->new_sub());
+  // table servers
+  for (map<int, version_t>::iterator p = tablev.begin();
+       p != tablev.end();
+       p++) {
+    MDSTableServer *server = mds->get_table_server(p->first);
+    if (p->second > server->get_committed_version()) {
+      dout(10) << "try_to_expire waiting for " << get_mdstable_name(p->first) 
+              << " to save, need " << p->second << dendl;
+      if (!gather) gather = new C_Gather;
+      server->save(gather->new_sub());
+    }
   }
 
   // FIXME client requests...?
@@ -470,12 +474,13 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg)
     }
   }
 
-  // anchor transactions
-  for (list<version_t>::iterator p = atids.begin();
-       p != atids.end();
+  // table client transactions
+  for (list<pair<__u8,version_t> >::iterator p = table_tids.begin();
+       p != table_tids.end();
        ++p) {
-    dout(10) << "EMetaBlob.replay noting anchor transaction " << *p << dendl;
-    mds->anchorclient->got_journaled_agree(*p, logseg);
+    dout(10) << "EMetaBlob.replay noting " << get_mdstable_name(p->first) << " transaction " << p->second << dendl;
+    MDSTableClient *client = mds->get_table_client(p->first);
+    client->got_journaled_agree(p->second, logseg);
   }
 
   // allocated_inos
@@ -580,73 +585,58 @@ void ESessions::replay(MDS *mds)
 
 
 
-// -----------------------
-// EAnchor
 
-void EAnchor::update_segment()
+void ETableServer::update_segment()
 {
-  _segment->anchortablev = version;
+  _segment->tablev[table] = version;
 }
 
-void EAnchor::replay(MDS *mds)
+void ETableServer::replay(MDS *mds)
 {
-  if (mds->anchortable->get_version() >= version) {
-    dout(10) << "EAnchor.replay event " << version
-            << " <= table " << mds->anchortable->get_version() << dendl;
+  MDSTableServer *server = mds->get_table_server(table);
+  if (server->get_version() >= version) {
+    dout(10) << "ETableServer.replay " << get_mdstable_name(table) << " event " << version
+            << " <= table " << server->get_version() << dendl;
     return;
   }
   
-  dout(10) << " EAnchor.replay event " << version
-          << " - 1 == table " << mds->anchortable->get_version() << dendl;
-  assert(version-1 == mds->anchortable->get_version());
+  dout(10) << " ETableServer.replay " << get_mdstable_name(table) << " event " << version
+          << " - 1 == table " << server->get_version() << dendl;
+  assert(version-1 == server->get_version());
   
   switch (op) {
-    // anchortable
-  case ANCHOR_OP_CREATE_PREPARE:
-    mds->anchortable->create_prepare(ino, trace, reqmds);
+  case TABLE_OP_PREPARE:
+    server->_prepare(mutation, reqid, bymds);
     break;
-  case ANCHOR_OP_DESTROY_PREPARE:
-    mds->anchortable->destroy_prepare(ino, reqmds);
+  case TABLE_OP_COMMIT:
+    server->_commit(tid);
     break;
-  case ANCHOR_OP_UPDATE_PREPARE:
-    mds->anchortable->update_prepare(ino, trace, reqmds);
-    break;
-  case ANCHOR_OP_COMMIT:
-    mds->anchortable->commit(atid);
-    break;
-    
   default:
     assert(0);
   }
   
-  assert(version == mds->anchortable->get_version());
+  assert(version == server->get_version());
 }
 
 
-// EAnchorClient
-
-void EAnchorClient::replay(MDS *mds)
+void ETableClient::replay(MDS *mds)
 {
-  dout(10) << " EAnchorClient.replay op " << op << " atid " << atid << dendl;
-    
-  switch (op) {
-    // anchorclient
-  case ANCHOR_OP_ACK:
-    mds->anchorclient->got_journaled_ack(atid);
-    break;
+  dout(10) << " ETableClient.replay " << get_mdstable_name(table)
+          << " op " << get_mdstable_opname(op)
+          << " tid " << tid << dendl;
     
-  default:
-    assert(0);
-  }
+  MDSTableClient *client = mds->get_table_client(table);
+  assert(op == TABLE_OP_ACK);
+  client->got_journaled_ack(tid);
 }
 
 
 // -----------------------
 // ESnap
-
+/*
 void ESnap::update_segment()
 {
-  _segment->snaptablev = version;
+  _segment->tablev[TABLE_SNAP] = version;
 }
 
 void ESnap::replay(MDS *mds)
@@ -671,6 +661,7 @@ void ESnap::replay(MDS *mds)
 
   assert(version == mds->snaptable->get_version());
 }
+*/
 
 
 
diff --git a/src/mds/mds_table_types.h b/src/mds/mds_table_types.h
new file mode 100644 (file)
index 0000000..e013831
--- /dev/null
@@ -0,0 +1,57 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MDSTABLETYPES_H
+#define __MDSTABLETYPES_H
+
+// MDS TABLES
+
+enum {
+  TABLE_ANCHOR,
+  TABLE_SNAP,
+};
+
+inline const char *get_mdstable_name(int t) {
+  switch (t) {
+  case TABLE_ANCHOR: return "anchortable";
+  case TABLE_SNAP: return "snaptable";
+  default: assert(0);
+  }
+}
+
+enum {
+  TABLE_OP_QUERY        =  1,
+  TABLE_OP_QUERY_REPLY  = -2,
+  TABLE_OP_PREPARE      =  3,
+  TABLE_OP_AGREE        = -4,
+  TABLE_OP_COMMIT       =  5,
+  TABLE_OP_ACK          = -6,
+  TABLE_OP_ROLLBACK     =  7,
+};
+
+inline const char *get_mdstable_opname(int op) {
+  switch (op) {
+  case TABLE_OP_QUERY: return "query";
+  case TABLE_OP_QUERY_REPLY: return "query_reply";
+  case TABLE_OP_PREPARE: return "prepare";
+  case TABLE_OP_AGREE: return "agree";
+  case TABLE_OP_COMMIT: return "commit";
+  case TABLE_OP_ACK: return "ack";
+  case TABLE_OP_ROLLBACK: return "rollback";
+  default: assert(0); return 0;
+
+  }
+};
+
+#endif
index 43da6c2fc828f07511790eab2f44e17515d5e743..4c5e389ef769408ba89e2cf37a6710933828eddd 100644 (file)
@@ -53,6 +53,8 @@ using namespace std;
 
 
 
+// CAPS
+
 typedef __u32 capseq_t;
 
 inline string cap_string(int cap)
diff --git a/src/messages/MAnchor.h b/src/messages/MAnchor.h
deleted file mode 100644 (file)
index 6d4049f..0000000
+++ /dev/null
@@ -1,71 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __MANCHORREQUEST_H
-#define __MANCHORREQUEST_H
-
-#include <vector>
-
-#include "msg/Message.h"
-#include "mds/Anchor.h"
-
-
-class MAnchor : public Message {
-  __u32 op;
-  inodeno_t ino;
-  vector<Anchor> trace;
-  version_t atid;  // anchor table version.
-
- public:
-  MAnchor() {}
-  MAnchor(int o, inodeno_t i, version_t v=0) : 
-    Message(MSG_MDS_ANCHOR),
-    op(o), ino(i), atid(v) { }
-  
-  virtual const char *get_type_name() { return "anchor"; }
-  void print(ostream& o) {
-    o << "anchor(" << get_anchor_opname(op);
-    if (ino) o << " " << ino;
-    if (atid) o << " atid " << atid;
-    if (!trace.empty()) o << ' ' << trace;
-    o << ")";
-  }
-
-  void set_trace(vector<Anchor>& trace) { 
-    this->trace = trace; 
-  }
-
-  int get_op() { return op; }
-  inodeno_t get_ino() { return ino; }
-  vector<Anchor>& get_trace() { return trace; }
-  version_t get_atid() { return atid; }
-
-  virtual void decode_payload() {
-    bufferlist::iterator p = payload.begin();
-    ::decode(op, p);
-    ::decode(ino, p);
-    ::decode(atid, p);
-    ::decode(trace, p);
-  }
-
-  virtual void encode_payload() {
-    ::encode(op, payload);
-    ::encode(ino, payload);
-    ::encode(atid, payload);
-    ::encode(trace, payload);
-  }
-};
-
-#endif
diff --git a/src/messages/MMDSTableRequest.h b/src/messages/MMDSTableRequest.h
new file mode 100644 (file)
index 0000000..3b18d22
--- /dev/null
@@ -0,0 +1,61 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MMDSTABLEREQUEST_H
+#define __MMDSTABLEREQUEST_H
+
+#include "msg/Message.h"
+#include "mds/mds_table_types.h"
+
+class MMDSTableRequest : public Message {
+ public:
+  __u16 table;
+  __s16 op;
+  __u64 reqid;
+  version_t tid;
+  bufferlist bl;
+
+  MMDSTableRequest() {}
+  MMDSTableRequest(int tab, int o, __u64 r, version_t v=0) : 
+    Message(MSG_MDS_TABLE_REQUEST),
+    table(tab), op(o), reqid(r), tid(v) { }
+  
+  virtual const char *get_type_name() { return "mds_table_request"; }
+  void print(ostream& o) {
+    o << "mds_table_request(" << get_mdstable_opname(op) << " " << reqid;
+    if (tid) o << " tid " << tid;
+    if (bl.length()) o << " " << bl.length() << " bytes";
+    o << ")";
+  }
+
+  virtual void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(table, p);
+    ::decode(op, p);
+    ::decode(reqid, p);
+    ::decode(tid, p);
+    ::decode(bl, p);
+  }
+
+  virtual void encode_payload() {
+    ::encode(table, payload);
+    ::encode(op, payload);
+    ::encode(reqid, payload);
+    ::encode(tid, payload);
+    ::encode(bl, payload);
+  }
+};
+
+#endif
index 026ab44cad35a2df0a5ab79e01495b86ecff77a0..7c91740ec9a2fad5c9542460c60ec6615a3fd36d 100644 (file)
@@ -96,7 +96,7 @@ using namespace std;
 
 #include "messages/MHeartbeat.h"
 
-#include "messages/MAnchor.h"
+#include "messages/MMDSTableRequest.h"
 
 //#include "messages/MInodeUpdate.h"
 #include "messages/MCacheExpire.h"
@@ -365,8 +365,8 @@ decode_message(ceph_msg_header& env, bufferlist& front, bufferlist& data)
     m = new MCacheExpire();
     break;
 
-  case MSG_MDS_ANCHOR:
-    m = new MAnchor();
+  case MSG_MDS_TABLE_REQUEST:
+    m = new MMDSTableRequest;
     break;
 
        /*  case MSG_MDS_INODEUPDATE:
index 7d84e54c8cad0ca18853283b90a4f5f4e9b4328f..333bb4a621712c527b5e85ae61ae81b219a05806 100644 (file)
@@ -86,8 +86,8 @@
 
 #define MSG_MDS_BEACON             90  // to monitor
 #define MSG_MDS_SLAVE_REQUEST      91
+#define MSG_MDS_TABLE_REQUEST      92
 
-#define MSG_MDS_ANCHOR             0x100
 #define MSG_MDS_HEARTBEAT          0x500  // for mds load balancer