mds/CDentry.cc \
mds/CDir.cc \
mds/CInode.cc \
- mds/AnchorTable.cc \
- mds/AnchorClient.cc \
mds/LogEvent.cc \
mds/MDSTable.cc \
mds/IdAllocator.cc \
+ mds/MDSTableClient.cc \
+ mds/MDSTableServer.cc \
+ mds/AnchorServer.cc \
+ mds/AnchorClient.cc \
mds/SnapTable.cc \
mds/snap.cc \
mds/SessionMap.cc \
kernel/super.h\
mds/Anchor.h\
mds/AnchorClient.h\
- mds/AnchorTable.h\
mds/CDentry.h\
mds/CDir.h\
mds/IdAllocator.h\
mds/Migrator.h\
mds/ScatterLock.h\
mds/SimpleLock.h\
- mds/events/EAnchor.h\
- mds/events/EAnchorClient.h\
mds/events/ESessions.h\
mds/events/EUpdate.h\
mds/events/EExport.h\
messages/MOSDBoot.h\
messages/MOSDPGCreate.h\
messages/MStatfsReply.h\
- messages/MAnchor.h\
messages/MCacheExpire.h\
+ messages/MMDSTableRequest.h\
messages/MDiscoverReply.h\
messages/MOSDFailure.h\
messages/MExportDir.h\
#include "mdstypes.h"
#include "include/buffer.h"
-// anchor ops
-#define ANCHOR_OP_LOOKUP 1
-#define ANCHOR_OP_LOOKUP_REPLY -2
-
-#define ANCHOR_OP_CREATE_PREPARE 11
-#define ANCHOR_OP_CREATE_AGREE -12
-
-#define ANCHOR_OP_DESTROY_PREPARE 21
-#define ANCHOR_OP_DESTROY_AGREE -22
-
-#define ANCHOR_OP_UPDATE_PREPARE 31
-#define ANCHOR_OP_UPDATE_AGREE -32
-
-#define ANCHOR_OP_COMMIT 41
-#define ANCHOR_OP_ACK -42
-#define ANCHOR_OP_ROLLBACK 43
-
-
+// Anchor table operation codes.  AnchorClient encodes one of these as the
+// leading __u32 of a prepare payload, and get_anchor_opname() maps them to
+// printable names.
+enum {
+  ANCHOR_OP_CREATE  = 0,
+  ANCHOR_OP_DESTROY = 1,
+  ANCHOR_OP_UPDATE  = 2
+};
inline const char* get_anchor_opname(int o) {
switch (o) {
- case ANCHOR_OP_LOOKUP: return "lookup";
- case ANCHOR_OP_LOOKUP_REPLY: return "lookup_reply";
-
- case ANCHOR_OP_CREATE_PREPARE: return "create_prepare";
- case ANCHOR_OP_CREATE_AGREE: return "create_agree";
- case ANCHOR_OP_DESTROY_PREPARE: return "destroy_prepare";
- case ANCHOR_OP_DESTROY_AGREE: return "destroy_agree";
- case ANCHOR_OP_UPDATE_PREPARE: return "update_prepare";
- case ANCHOR_OP_UPDATE_AGREE: return "update_agree";
-
- case ANCHOR_OP_COMMIT: return "commit";
- case ANCHOR_OP_ACK: return "ack";
- case ANCHOR_OP_ROLLBACK: return "rollback";
+ case ANCHOR_OP_CREATE: return "create";
+ case ANCHOR_OP_DESTROY: return "destroy";
+ case ANCHOR_OP_UPDATE: return "update";
default: assert(0); return 0;
}
}
using std::cout;
using std::cerr;
-#include "Anchor.h"
#include "AnchorClient.h"
#include "MDSMap.h"
-
-#include "include/Context.h"
-#include "msg/Messenger.h"
-
-#include "MDS.h"
-#include "MDLog.h"
#include "LogSegment.h"
+#include "MDS.h"
+#include "msg/Messenger.h"
-#include "events/EAnchorClient.h"
-#include "messages/MAnchor.h"
+#include "messages/MMDSTableRequest.h"
#include "config.h"
#define derr(x) if (x <= g_conf.debug_mds) *_derr << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".anchorclient "
-void AnchorClient::dispatch(Message *m)
-{
- switch (m->get_type()) {
- case MSG_MDS_ANCHOR:
- handle_anchor_reply((MAnchor*)m);
- break;
- default:
- assert(0);
- }
-}
+// LOOKUPS
-void AnchorClient::handle_anchor_reply(class MAnchor *m)
+void AnchorClient::handle_query_result(class MMDSTableRequest *m)
{
- inodeno_t ino = m->get_ino();
- version_t atid = m->get_atid();
-
dout(10) << "handle_anchor_reply " << *m << dendl;
- switch (m->get_op()) {
-
- // lookup
- case ANCHOR_OP_LOOKUP_REPLY:
- assert(pending_lookup.count(ino));
- {
- *pending_lookup[ino].trace = m->get_trace();
- Context *onfinish = pending_lookup[ino].onfinish;
- pending_lookup.erase(ino);
-
- if (onfinish) {
- onfinish->finish(0);
- delete onfinish;
- }
- }
- break;
+ inodeno_t ino;
+ vector<Anchor> trace;
- // prepare -> agree
- case ANCHOR_OP_CREATE_AGREE:
- if (pending_create_prepare.count(ino)) {
- dout(10) << "got create_agree on " << ino << " atid " << atid << dendl;
- Context *onfinish = pending_create_prepare[ino].onfinish;
- *pending_create_prepare[ino].patid = atid;
- pending_create_prepare.erase(ino);
+ bufferlist::iterator p = m->bl.begin();
+ ::decode(ino, p);
- if (onfinish) {
- onfinish->finish(0);
- delete onfinish;
- }
- }
- else if (pending_commit.count(atid)) {
- dout(10) << "stray create_agree on " << ino
- << " atid " << atid
- << ", already committing, resending COMMIT"
- << dendl;
- MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
- mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
- }
- else {
- dout(10) << "stray create_agree on " << ino
- << " atid " << atid
- << ", sending ROLLBACK"
- << dendl;
- MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
- mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
- }
- break;
-
- case ANCHOR_OP_DESTROY_AGREE:
- if (pending_destroy_prepare.count(ino)) {
- dout(10) << "got destroy_agree on " << ino << " atid " << atid << dendl;
- Context *onfinish = pending_destroy_prepare[ino].onfinish;
- *pending_destroy_prepare[ino].patid = atid;
- pending_destroy_prepare.erase(ino);
-
- if (onfinish) {
- onfinish->finish(0);
- delete onfinish;
- }
- }
- else if (pending_commit.count(atid)) {
- dout(10) << "stray destroy_agree on " << ino
- << " atid " << atid
- << ", already committing, resending COMMIT"
- << dendl;
- MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
- mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
- }
- else {
- dout(10) << "stray destroy_agree on " << ino
- << " atid " << atid
- << ", sending ROLLBACK"
- << dendl;
- MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
- mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
- }
- break;
-
- case ANCHOR_OP_UPDATE_AGREE:
- if (pending_update_prepare.count(ino)) {
- dout(10) << "got update_agree on " << ino << " atid " << atid << dendl;
- Context *onfinish = pending_update_prepare[ino].onfinish;
- *pending_update_prepare[ino].patid = atid;
- pending_update_prepare.erase(ino);
-
- if (onfinish) {
- onfinish->finish(0);
- delete onfinish;
- }
- }
- else if (pending_commit.count(atid)) {
- dout(10) << "stray update_agree on " << ino
- << " atid " << atid
- << ", already committing, resending COMMIT"
- << dendl;
- MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
- mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
- }
- else {
- dout(10) << "stray update_agree on " << ino
- << " atid " << atid
- << ", sending ROLLBACK"
- << dendl;
- MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
- mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
- }
- break;
-
- // commit -> ack
- case ANCHOR_OP_ACK:
- {
- dout(10) << "got ack on atid " << atid << ", logging" << dendl;
-
- // remove from committing list
- assert(pending_commit.count(atid));
- assert(pending_commit[atid]->pending_commit_atids.count(atid));
-
- // log ACK.
- mds->mdlog->submit_entry(new EAnchorClient(ANCHOR_OP_ACK, atid),
- new C_LoggedAck(this, atid));
- }
- break;
-
- default:
- assert(0);
+ assert(pending_lookup.count(ino));
+ ::decode(*pending_lookup[ino].trace, p);
+ Context *onfinish = pending_lookup[ino].onfinish;
+ pending_lookup.erase(ino);
+
+ if (onfinish) {
+ onfinish->finish(0);
+ delete onfinish;
}
delete m;
}
-
-void AnchorClient::_logged_ack(version_t atid)
+void AnchorClient::resend_queries()
{
- dout(10) << "_logged_ack" << dendl;
-
- assert(pending_commit.count(atid));
- assert(pending_commit[atid]->pending_commit_atids.count(atid));
-
- pending_commit[atid]->pending_commit_atids.erase(atid);
- pending_commit.erase(atid);
-
- // kick any waiters (LogSegment trim)
- if (ack_waiters.count(atid)) {
- dout(15) << "kicking ack waiters on atid " << atid << dendl;
- mds->queue_waiters(ack_waiters[atid]);
- ack_waiters.erase(atid);
+ // resend any pending lookups.
+ for (hash_map<inodeno_t, _pending_lookup>::iterator p = pending_lookup.begin();
+ p != pending_lookup.end();
+ p++) {
+ dout(10) << "resending lookup on " << p->first << dendl;
+ _lookup(p->first);
}
}
-
-/*
- * public async interface
- */
-
-
-/*
- * FIXME: we need to be able to resubmit messages if the anchortable mds fails.
- */
-
-
void AnchorClient::lookup(inodeno_t ino, vector<Anchor>& trace, Context *onfinish)
{
- // send message
- MAnchor *req = new MAnchor(ANCHOR_OP_LOOKUP, ino);
-
assert(pending_lookup.count(ino) == 0);
- pending_lookup[ino].onfinish = onfinish;
- pending_lookup[ino].trace = &trace;
+ _pending_lookup& l = pending_lookup[ino];
+ l.onfinish = onfinish;
+ l.trace = &trace;
+ _lookup(ino);
+}
- mds->send_message_mds(req,
- mds->mdsmap->get_anchortable());
+void AnchorClient::_lookup(inodeno_t ino)
+{
+ MMDSTableRequest *req = new MMDSTableRequest(table, TABLE_OP_QUERY, 0, 0);
+ ::encode(ino, req->bl);
+ mds->send_message_mds(req, mds->mdsmap->get_anchortable());
}
-// PREPARE
+// FRIENDLY PREPARE
void AnchorClient::prepare_create(inodeno_t ino, vector<Anchor>& trace,
version_t *patid, Context *onfinish)
{
dout(10) << "prepare_create " << ino << " " << trace << dendl;
-
- // send message
- MAnchor *req = new MAnchor(ANCHOR_OP_CREATE_PREPARE, ino);
- req->set_trace(trace);
-
- pending_create_prepare[ino].trace = trace;
- pending_create_prepare[ino].patid = patid;
- pending_create_prepare[ino].onfinish = onfinish;
-
- mds->send_message_mds(req,
- mds->mdsmap->get_anchortable());
+ bufferlist bl;
+ __u32 op = ANCHOR_OP_CREATE;
+ ::encode(op, bl);
+ ::encode(ino, bl);
+ ::encode(trace, bl);
+ _prepare(bl, patid, onfinish);
}
void AnchorClient::prepare_destroy(inodeno_t ino,
- version_t *patid, Context *onfinish)
+ version_t *patid, Context *onfinish)
{
dout(10) << "prepare_destroy " << ino << dendl;
-
- // send message
- MAnchor *req = new MAnchor(ANCHOR_OP_DESTROY_PREPARE, ino);
- pending_destroy_prepare[ino].onfinish = onfinish;
- pending_destroy_prepare[ino].patid = patid;
- mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
+ bufferlist bl;
+ __u32 op = ANCHOR_OP_DESTROY;
+ ::encode(op, bl);
+ ::encode(ino, bl);
+ _prepare(bl, patid, onfinish);
}
version_t *patid, Context *onfinish)
{
dout(10) << "prepare_update " << ino << " " << trace << dendl;
-
- // send message
- MAnchor *req = new MAnchor(ANCHOR_OP_UPDATE_PREPARE, ino);
- req->set_trace(trace);
-
- pending_update_prepare[ino].trace = trace;
- pending_update_prepare[ino].patid = patid;
- pending_update_prepare[ino].onfinish = onfinish;
-
- mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
-}
-
-
-// COMMIT
-
-void AnchorClient::commit(version_t atid, LogSegment *ls)
-{
- dout(10) << "commit " << atid << dendl;
-
- assert(pending_commit.count(atid) == 0);
- pending_commit[atid] = ls;
- ls->pending_commit_atids.insert(atid);
-
- // send message
- MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
- mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
-}
-
-
-
-// RECOVERY
-
-void AnchorClient::finish_recovery()
-{
- dout(7) << "finish_recovery" << dendl;
-
- resend_commits();
-}
-
-void AnchorClient::resend_commits()
-{
- for (map<version_t,LogSegment*>::iterator p = pending_commit.begin();
- p != pending_commit.end();
- ++p) {
- dout(10) << "resending commit on " << p->first << dendl;
- MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, p->first);
- mds->send_message_mds(req,
- mds->mdsmap->get_anchortable());
- }
-}
-
-void AnchorClient::resend_prepares(hash_map<inodeno_t, _pending_prepare>& prepares, int op)
-{
- for (hash_map<inodeno_t, _pending_prepare>::iterator p = prepares.begin();
- p != prepares.end();
- p++) {
- dout(10) << "resending " << get_anchor_opname(op) << " on " << p->first << dendl;
- MAnchor *req = new MAnchor(op, p->first);
- req->set_trace(p->second.trace);
- mds->send_message_mds(req,
- mds->mdsmap->get_anchortable());
- }
-}
-
-
-void AnchorClient::handle_mds_recovery(int who)
-{
- dout(7) << "handle_mds_recovery mds" << who << dendl;
-
- if (who != mds->mdsmap->get_anchortable())
- return; // do nothing.
-
- // resend any pending lookups.
- for (hash_map<inodeno_t, _pending_lookup>::iterator p = pending_lookup.begin();
- p != pending_lookup.end();
- p++) {
- dout(10) << "resending lookup on " << p->first << dendl;
- mds->send_message_mds(new MAnchor(ANCHOR_OP_LOOKUP, p->first),
- mds->mdsmap->get_anchortable());
- }
-
- // resend any pending prepares.
- resend_prepares(pending_create_prepare, ANCHOR_OP_CREATE_PREPARE);
- resend_prepares(pending_update_prepare, ANCHOR_OP_UPDATE_PREPARE);
- resend_prepares(pending_destroy_prepare, ANCHOR_OP_DESTROY_PREPARE);
-
- // resend any pending commits.
- resend_commits();
+  // Payload layout: op code, inode, then the replacement trace.  The server
+  // decodes the trace and keeps it with the pending update, so it has to be
+  // part of the request.
+  bufferlist bl;
+  __u32 op = ANCHOR_OP_UPDATE;
+  ::encode(op, bl);
+  ::encode(ino, bl);
+  ::encode(trace, bl);
+  _prepare(bl, patid, onfinish);
}
#ifndef __ANCHORCLIENT_H
#define __ANCHORCLIENT_H
-#include <vector>
-using std::vector;
#include <ext/hash_map>
using __gnu_cxx::hash_map;
-#include "include/types.h"
-#include "msg/Dispatcher.h"
-
+#include "MDSTableClient.h"
#include "Anchor.h"
class Context;
class MDS;
class LogSegment;
-class AnchorClient : public Dispatcher {
- MDS *mds;
-
+class AnchorClient : public MDSTableClient {
// lookups
- struct _pending_lookup {
+ struct _pending_lookup {
vector<Anchor> *trace;
Context *onfinish;
};
hash_map<inodeno_t, _pending_lookup> pending_lookup;
- // prepares
- struct _pending_prepare {
- vector<Anchor> trace;
- Context *onfinish;
- version_t *patid; // ptr to atid
- };
- hash_map<inodeno_t, _pending_prepare> pending_create_prepare;
- hash_map<inodeno_t, _pending_prepare> pending_destroy_prepare;
- hash_map<inodeno_t, _pending_prepare> pending_update_prepare;
-
- // pending commits
- map<version_t, LogSegment*> pending_commit;
- map<version_t, list<Context*> > ack_waiters;
-
- void handle_anchor_reply(class MAnchor *m);
-
- class C_LoggedAck : public Context {
- AnchorClient *ac;
- version_t atid;
- public:
- C_LoggedAck(AnchorClient *a, version_t t) : ac(a), atid(t) {}
- void finish(int r) {
- ac->_logged_ack(atid);
- }
- };
- void _logged_ack(version_t atid);
-
public:
- AnchorClient(MDS *m) : mds(m) {}
+ AnchorClient(MDS *m) : MDSTableClient(m, TABLE_ANCHOR) {}
- void dispatch(Message *m);
-
- // async user interface
+ void handle_query_result(MMDSTableRequest *m);
void lookup(inodeno_t ino, vector<Anchor>& trace, Context *onfinish);
+ void _lookup(inodeno_t ino);
+ void resend_queries();
void prepare_create(inodeno_t ino, vector<Anchor>& trace, version_t *atid, Context *onfinish);
void prepare_destroy(inodeno_t ino, version_t *atid, Context *onfinish);
void prepare_update(inodeno_t ino, vector<Anchor>& trace, version_t *atid, Context *onfinish);
-
- void commit(version_t atid, LogSegment *ls);
-
- // for recovery (by other nodes)
- void handle_mds_recovery(int mds); // called when someone else recovers
-
- void resend_commits();
- void resend_prepares(hash_map<inodeno_t, _pending_prepare>& prepares, int op);
-
- // for recovery (by me)
- void got_journaled_agree(version_t atid, LogSegment *ls) {
- pending_commit[atid] = ls;
- }
- void got_journaled_ack(version_t atid) {
- pending_commit.erase(atid);
- }
- bool has_committed(version_t atid) {
- return pending_commit.count(atid) == 0;
- }
- void wait_for_ack(version_t atid, Context *c) {
- ack_waiters[atid].push_back(c);
- }
- void finish_recovery(); // called when i recover and go active
-
-
};
#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "AnchorServer.h"
+#include "MDS.h"
+#include "msg/Messenger.h"
+#include "messages/MMDSTableRequest.h"
+
+#define dout(x) if (x <= g_conf.debug_mds) *_dout << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".anchorserver "
+#define derr(x) if (x <= g_conf.debug_mds) *_derr << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".anchorserver "
+
+// table
+
+// Point this table at MDS_INO_ANCHORTABLE and give it the default file layout.
+void AnchorServer::init_inode()
+{
+  layout = g_default_file_layout;
+  ino = MDS_INO_ANCHORTABLE;
+}
+
+// Empty every in-memory container: the records of prepared operations and
+// the anchor map itself.
+void AnchorServer::reset_state()
+{
+  // bookkeeping for prepared-but-unfinished operations
+  pending_for_mds.clear();
+  pending_update.clear();
+  pending_destroy.clear();
+  pending_create.clear();
+
+  // the table contents
+  anchor_map.clear();
+}
+
+// Log the table version (debug level 7) followed by one line per anchor
+// (debug level 15).
+void AnchorServer::dump()
+{
+  dout(7) << "dump v " << version << dendl;
+
+  for (hash_map<inodeno_t, Anchor>::iterator cur = anchor_map.begin();
+       cur != anchor_map.end();
+       ++cur) {
+    dout(15) << "dump " << cur->second << dendl;
+  }
+}
+
+
+
+/*
+ * basic updates
+ */
+
+// Insert an anchor for `ino` (child of `dirino`, dentry hash `dn_hash`) unless
+// the table already has one.
+//
+// Returns true if a new entry was created, false if `ino` was already present
+// (the existing entry is left as it is).
+bool AnchorServer::add(inodeno_t ino, inodeno_t dirino, __u32 dn_hash)
+{
+  // the parent is either a base inode or must already be in the table
+  assert(dirino < MDS_INO_BASE ||
+         anchor_map.count(dirino));
+
+  if (anchor_map.count(ino)) {
+    dout(7) << "add had " << anchor_map[ino] << dendl;
+    return false;
+  }
+
+  // not seen before: record it
+  anchor_map[ino] = Anchor(ino, dirino, dn_hash);
+  dout(7) << "add added " << anchor_map[ino] << dendl;
+  return true;
+}
+
+// Take one reference on `ino` and on each ancestor reachable through dirino.
+// The walk stops at a zero dirino or at the first ancestor that is not in the
+// table.  `ino` itself must be present.
+void AnchorServer::inc(inodeno_t ino)
+{
+  dout(7) << "inc " << ino << dendl;
+
+  assert(anchor_map.count(ino));
+
+  do {
+    Anchor &cur = anchor_map[ino];
+    ++cur.nref;
+
+    dout(10) << "inc now " << cur << dendl;
+
+    // step up to the parent
+    ino = cur.dirino;
+  } while (!(ino == 0) && anchor_map.count(ino) != 0);
+}
+
+// Drop one reference from `ino` and from each ancestor reachable through
+// dirino, erasing any entry whose count reaches zero.  The walk stops at a
+// zero dirino or at the first ancestor that is not in the table.  `ino` itself
+// must be present.
+void AnchorServer::dec(inodeno_t ino)
+{
+  dout(7) << "dec " << ino << dendl;
+  assert(anchor_map.count(ino));
+
+  do {
+    Anchor &cur = anchor_map[ino];
+    // remember the parent now; `cur` is gone once the entry is erased
+    inodeno_t parent = cur.dirino;
+
+    --cur.nref;
+    if (cur.nref == 0) {
+      dout(10) << "dec removing " << cur << dendl;
+      anchor_map.erase(ino);
+    } else {
+      dout(10) << "dec now " << cur << dendl;
+    }
+
+    ino = parent;
+  } while (!(ino == 0) && anchor_map.count(ino) != 0);
+}
+
+
+// server
+
+// Stage an anchor table operation.
+//
+//  bl     - request payload as built by AnchorClient::prepare_*():
+//           __u32 op code, inodeno_t ino, optionally followed by a
+//           vector<Anchor> trace
+//  reqid  - not used by this function
+//  bymds  - not used by this function
+//
+// Every op bumps `version` and files its record under the new value, which is
+// the key _commit()/_rollback() later look the record up by.  An unknown op
+// code asserts.
+void AnchorServer::_prepare(bufferlist &bl, __u64 reqid, int bymds)
+{
+  bufferlist::iterator p = bl.begin();
+  __u32 what;
+  inodeno_t ino;
+  vector<Anchor> trace;
+  ::decode(what, p);
+  ::decode(ino, p);
+
+  // Not every request carries a trace (prepare_destroy encodes only op and
+  // ino), so decode one only when there is payload left instead of reading
+  // past the end of the buffer.
+  if (!p.end())
+    ::decode(trace, p);
+
+  switch (what) {
+  case ANCHOR_OP_CREATE:
+    version++;
+
+    // applied right away: make sure the trace is in the table, take a ref
+    for (unsigned i=0; i<trace.size(); i++)
+      add(trace[i].ino, trace[i].dirino, trace[i].dn_hash);
+    inc(ino);
+    pending_create[version] = ino;  // so we can undo
+    break;
+
+  case ANCHOR_OP_DESTROY:
+    // only recorded here; the reference is dropped on commit
+    version++;
+    pending_destroy[version] = ino;
+    break;
+
+  case ANCHOR_OP_UPDATE:
+    // only recorded here; the new trace is applied on commit
+    version++;
+    pending_update[version].first = ino;
+    pending_update[version].second = trace;
+    break;
+
+  default:
+    assert(0);
+  }
+}
+
+// True if `tid` names a prepared operation of any kind (create, destroy or
+// update) that has not been committed or rolled back yet.
+bool AnchorServer::_is_prepared(version_t tid)
+{
+  if (pending_create.count(tid))
+    return true;
+  if (pending_destroy.count(tid))
+    return true;
+  return pending_update.count(tid) != 0;
+}
+
+// Make the prepared operation `tid` permanent, drop its pending record and
+// bump the table version.  Asserts if `tid` is not a pending create, destroy
+// or update.
+void AnchorServer::_commit(version_t tid)
+{
+  map<version_t, inodeno_t>::iterator created = pending_create.find(tid);
+  map<version_t, inodeno_t>::iterator destroyed = pending_destroy.find(tid);
+  map<version_t, pair<inodeno_t, vector<Anchor> > >::iterator updated =
+    pending_update.find(tid);
+
+  if (created != pending_create.end()) {
+    // the create was applied at prepare time; only the undo record goes away
+    dout(7) << "commit " << tid << " create " << created->second << dendl;
+    pending_create.erase(created);
+  } else if (destroyed != pending_destroy.end()) {
+    inodeno_t target = destroyed->second;
+    dout(7) << "commit " << tid << " destroy " << target << dendl;
+
+    dec(target);  // destroy
+
+    pending_destroy.erase(destroyed);
+  } else if (updated != pending_update.end()) {
+    inodeno_t target = updated->second.first;
+    vector<Anchor> &path = updated->second.second;
+
+    if (anchor_map.count(target)) {
+      dout(7) << "commit " << tid << " update " << target << dendl;
+
+      // release the old chain ...
+      dec(target);
+
+      // ... then install the new one and reference it
+      for (unsigned i = 0; i < path.size(); ++i) {
+        Anchor &a = path[i];
+        add(a.ino, a.dirino, a.dn_hash);
+      }
+      inc(target);
+    } else {
+      dout(7) << "commit " << tid << " update " << target << " -- DNE" << dendl;
+    }
+
+    pending_update.erase(updated);
+  } else {
+    assert(0);
+  }
+
+  pending_for_mds.erase(tid);
+
+  // bump version.
+  version++;
+}
+
+// Abandon the prepared operation `tid`, drop its pending record and bump the
+// table version.  Asserts if `tid` is not a pending create, destroy or update.
+void AnchorServer::_rollback(version_t tid)
+{
+  map<version_t, inodeno_t>::iterator created = pending_create.find(tid);
+  map<version_t, inodeno_t>::iterator destroyed = pending_destroy.find(tid);
+  map<version_t, pair<inodeno_t, vector<Anchor> > >::iterator updated =
+    pending_update.find(tid);
+
+  if (created != pending_create.end()) {
+    // a create took its reference at prepare time; give it back
+    inodeno_t target = created->second;
+    dout(7) << "rollback " << tid << " create " << target << dendl;
+    dec(target);
+    pending_create.erase(created);
+  } else if (destroyed != pending_destroy.end()) {
+    // nothing was applied at prepare time; just forget the record
+    dout(7) << "rollback " << tid << " destroy " << destroyed->second << dendl;
+    pending_destroy.erase(destroyed);
+  } else if (updated != pending_update.end()) {
+    // nothing was applied at prepare time; just forget the record
+    dout(7) << "rollback " << tid << " update " << updated->second.first << dendl;
+    pending_update.erase(updated);
+  } else {
+    assert(0);
+  }
+
+  pending_for_mds.erase(tid);
+
+  // bump version.
+  version++;
+}
+
+
+// Answer an anchor lookup.
+//
+// The request payload holds the inode to look up.  The reply payload is that
+// same inode followed by its trace, ordered from the topmost ancestor down to
+// the inode itself.  AnchorClient::handle_query_result() decodes the inode
+// first and uses it to find the pending lookup, so the reply must echo the
+// requested inode, not the ancestor the walk ends on.
+//
+// Takes ownership of `req` and deletes it.  Asserts if the inode or one of its
+// ancestors is missing from the table.
+void AnchorServer::handle_query(MMDSTableRequest *req)
+{
+  bufferlist::iterator p = req->bl.begin();
+  inodeno_t ino;
+  ::decode(ino, p);
+  dout(7) << "handle_lookup " << *req << " ino " << ino << dendl;
+
+  // walk up from the requested inode, prepending each anchor
+  vector<Anchor> trace;
+  inodeno_t curino = ino;
+  while (true) {
+    assert(anchor_map.count(curino) == 1);
+    Anchor &anchor = anchor_map[curino];
+
+    dout(10) << "handle_lookup adding " << anchor << dendl;
+    trace.insert(trace.begin(), anchor);  // lame FIXME
+
+    if (anchor.dirino < MDS_INO_BASE) break;
+    curino = anchor.dirino;
+  }
+
+  // reply: the payload goes into the reply's buffer, not the request's
+  MMDSTableRequest *reply = new MMDSTableRequest(table, TABLE_OP_QUERY_REPLY, req->reqid, version);
+  ::encode(ino, reply->bl);
+  ::encode(trace, reply->bl);
+  mds->send_message_mds(reply, req->get_source().num());
+
+  delete req;
+}
+
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __ANCHORSERVER_H
+#define __ANCHORSERVER_H
+
+#include "MDSTableServer.h"
+#include "Anchor.h"
+
+// Table server for the anchor table: the anchor map plus the records of
+// operations that have been prepared but not yet committed or rolled back.
+class AnchorServer : public MDSTableServer {
+ public:
+  AnchorServer(MDS *mds) : MDSTableServer(mds, TABLE_ANCHOR) {}
+
+  // ---- state ----
+
+  // the table itself, keyed by inode number
+  hash_map<inodeno_t, Anchor> anchor_map;
+
+  // prepared operations, keyed by the table version assigned in _prepare()
+  map<version_t, inodeno_t> pending_create;
+  map<version_t, inodeno_t> pending_destroy;
+  map<version_t, pair<inodeno_t, vector<Anchor> > > pending_update;
+
+  // ---- table setup and (de)serialisation ----
+
+  void init_inode();
+  void reset_state();
+
+  // Field order here and in decode_state() must stay identical.
+  void encode_state(bufferlist& bl) {
+    ::encode(anchor_map, bl);
+    ::encode(pending_create, bl);
+    ::encode(pending_destroy, bl);
+    ::encode(pending_update, bl);
+    ::encode(pending_for_mds, bl);
+  }
+  void decode_state(bufferlist::iterator& p) {
+    ::decode(anchor_map, p);
+    ::decode(pending_create, p);
+    ::decode(pending_destroy, p);
+    ::decode(pending_update, p);
+    ::decode(pending_for_mds, p);
+  }
+
+  // ---- anchor map primitives ----
+
+  bool add(inodeno_t ino, inodeno_t dirino, __u32 dn_hash);
+  void inc(inodeno_t ino);
+  void dec(inodeno_t ino);
+
+  void dump();
+
+  // ---- request handling ----
+
+  void _prepare(bufferlist &bl, __u64 reqid, int bymds);
+  bool _is_prepared(version_t tid);
+  void _commit(version_t tid);
+  void _rollback(version_t tid);
+  void handle_query(MMDSTableRequest *req);
+};
+
+
+#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "AnchorTable.h"
-#include "MDS.h"
-
-#include "osdc/Filer.h"
-
-#include "msg/Messenger.h"
-#include "messages/MAnchor.h"
-
-#include "common/Clock.h"
-
-#include "MDLog.h"
-#include "events/EAnchor.h"
-
-#include "config.h"
-
-#define dout(x) if (x <= g_conf.debug_mds) *_dout << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".anchortable "
-#define derr(x) if (x <= g_conf.debug_mds) *_derr << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".anchortable "
-
-
-void AnchorTable::dump()
-{
- dout(7) << "dump v " << version << dendl;
- for (hash_map<inodeno_t, Anchor>::iterator it = anchor_map.begin();
- it != anchor_map.end();
- it++)
- dout(15) << "dump " << it->second << dendl;
-}
-
-
-/*
- * basic updates
- */
-
-bool AnchorTable::add(inodeno_t ino, inodeno_t dirino, __u32 dn_hash)
-{
- //dout(17) << "add " << ino << " dirfrag " << dirfrag << dendl;
-
- // parent should be there
- assert(dirino < MDS_INO_BASE || // base case,
- anchor_map.count(dirino)); // or have it
-
- if (anchor_map.count(ino) == 0) {
- // new item
- anchor_map[ino] = Anchor(ino, dirino, dn_hash);
- dout(7) << "add added " << anchor_map[ino] << dendl;
- return true;
- } else {
- dout(7) << "add had " << anchor_map[ino] << dendl;
- return false;
- }
-}
-
-void AnchorTable::inc(inodeno_t ino)
-{
- dout(7) << "inc " << ino << dendl;
-
- assert(anchor_map.count(ino));
-
- while (1) {
- Anchor &anchor = anchor_map[ino];
- anchor.nref++;
-
- dout(10) << "inc now " << anchor << dendl;
- ino = anchor.dirino;
-
- if (ino == 0) break;
- if (anchor_map.count(ino) == 0) break;
- }
-}
-
-void AnchorTable::dec(inodeno_t ino)
-{
- dout(7) << "dec " << ino << dendl;
- assert(anchor_map.count(ino));
-
- while (true) {
- Anchor &anchor = anchor_map[ino];
- anchor.nref--;
-
- if (anchor.nref == 0) {
- dout(10) << "dec removing " << anchor << dendl;
- inodeno_t dirino = anchor.dirino;
- anchor_map.erase(ino);
- ino = dirino;
- } else {
- dout(10) << "dec now " << anchor << dendl;
- ino = anchor.dirino;
- }
-
- if (ino == 0) break;
- if (anchor_map.count(ino) == 0) break;
- }
-}
-
-
-/*
- * high level
- */
-
-
-// LOOKUP
-
-void AnchorTable::handle_lookup(MAnchor *req)
-{
- inodeno_t curino = req->get_ino();
- dout(7) << "handle_lookup " << curino << dendl;
-
- vector<Anchor> trace;
- while (true) {
- assert(anchor_map.count(curino) == 1);
- Anchor &anchor = anchor_map[curino];
-
- dout(10) << "handle_lookup adding " << anchor << dendl;
- trace.insert(trace.begin(), anchor); // lame FIXME
-
- if (anchor.dirino < MDS_INO_BASE) break;
- curino = anchor.dirino;
- }
-
- // reply
- MAnchor *reply = new MAnchor(ANCHOR_OP_LOOKUP_REPLY, req->get_ino());
- reply->set_trace(trace);
- mds->messenger->send_message(reply, req->get_source_inst());
-
- delete req;
-}
-
-
-// MIDLEVEL
-
-void AnchorTable::create_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds)
-{
- // make sure trace is in table
- for (unsigned i=0; i<trace.size(); i++)
- add(trace[i].ino, trace[i].dirino, trace[i].dn_hash);
- inc(ino);
-
- version++;
- pending_create[version] = ino; // so we can undo
- pending_reqmds[version] = reqmds;
- //dump();
-}
-
-void AnchorTable::destroy_prepare(inodeno_t ino, int reqmds)
-{
- version++;
- pending_destroy[version] = ino;
- pending_reqmds[version] = reqmds;
- //dump();
-}
-
-void AnchorTable::update_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds)
-{
- version++;
- pending_update[version].first = ino;
- pending_update[version].second = trace;
- pending_reqmds[version] = reqmds;
- //dump();
-}
-
-void AnchorTable::commit(version_t atid)
-{
- if (pending_create.count(atid)) {
- dout(7) << "commit " << atid << " create " << pending_create[atid] << dendl;
- pending_create.erase(atid);
- }
-
- else if (pending_destroy.count(atid)) {
- inodeno_t ino = pending_destroy[atid];
- dout(7) << "commit " << atid << " destroy " << ino << dendl;
-
- dec(ino); // destroy
-
- pending_destroy.erase(atid);
- }
-
- else if (pending_update.count(atid)) {
- inodeno_t ino = pending_update[atid].first;
- vector<Anchor> &trace = pending_update[atid].second;
-
- if (anchor_map.count(ino)) {
- dout(7) << "commit " << atid << " update " << ino << dendl;
-
- // remove old
- dec(ino);
-
- // add new
- for (unsigned i=0; i<trace.size(); i++)
- add(trace[i].ino, trace[i].dirino, trace[i].dn_hash);
- inc(ino);
- } else {
- dout(7) << "commit " << atid << " update " << ino << " -- DNE" << dendl;
- }
-
- pending_update.erase(atid);
- }
- else
- assert(0);
-
- pending_reqmds.erase(atid);
-
- // bump version.
- version++;
- //dump();
-}
-
-void AnchorTable::rollback(version_t atid)
-{
- if (pending_create.count(atid)) {
- inodeno_t ino = pending_create[atid];
- dout(7) << "rollback " << atid << " create " << ino << dendl;
- dec(ino);
- pending_create.erase(atid);
- }
-
- else if (pending_destroy.count(atid)) {
- inodeno_t ino = pending_destroy[atid];
- dout(7) << "rollback " << atid << " destroy " << ino << dendl;
- pending_destroy.erase(atid);
- }
-
- else if (pending_update.count(atid)) {
- inodeno_t ino = pending_update[atid].first;
- dout(7) << "rollback " << atid << " update " << ino << dendl;
- pending_update.erase(atid);
- }
- else
- assert(0);
-
- pending_reqmds.erase(atid);
-
- // bump version.
- version++;
- //dump();
-}
-
-
-
-
-// CREATE
-
-class C_AT_CreatePrepare : public Context {
- AnchorTable *at;
- MAnchor *req;
- version_t atid;
-public:
- C_AT_CreatePrepare(AnchorTable *a, MAnchor *r, version_t t) :
- at(a), req(r), atid(t) { }
- void finish(int r) {
- at->_create_prepare_logged(req, atid);
- }
-};
-
-void AnchorTable::handle_create_prepare(MAnchor *req)
-{
- inodeno_t ino = req->get_ino();
- vector<Anchor>& trace = req->get_trace();
-
- dout(7) << "handle_create_prepare " << ino << dendl;
-
- create_prepare(ino, trace, req->get_source().num());
-
- // log it
- EAnchor *le = new EAnchor(ANCHOR_OP_CREATE_PREPARE, ino, version, req->get_source().num());
- le->set_trace(trace);
- mds->mdlog->submit_entry(le,
- new C_AT_CreatePrepare(this, req, version));
-}
-
-void AnchorTable::_create_prepare_logged(MAnchor *req, version_t atid)
-{
- inodeno_t ino = req->get_ino();
- dout(7) << "_create_prepare_logged " << ino << " atid " << atid << dendl;
-
- // reply
- MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, ino, atid);
- mds->messenger->send_message(reply, req->get_source_inst());
-
- delete req;
-}
-
-
-
-
-// DESTROY
-
-class C_AT_DestroyPrepare : public Context {
- AnchorTable *at;
- MAnchor *req;
- version_t atid;
-public:
- C_AT_DestroyPrepare(AnchorTable *a, MAnchor *r, version_t t) :
- at(a), req(r), atid(t) { }
- void finish(int r) {
- at->_destroy_prepare_logged(req, atid);
- }
-};
-
-void AnchorTable::handle_destroy_prepare(MAnchor *req)
-{
- inodeno_t ino = req->get_ino();
- dout(7) << "handle_destroy_prepare " << ino << dendl;
-
- destroy_prepare(ino, req->get_source().num());
-
- mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_DESTROY_PREPARE, ino, version, req->get_source().num()),
- new C_AT_DestroyPrepare(this, req, version));
-}
-
-void AnchorTable::_destroy_prepare_logged(MAnchor *req, version_t atid)
-{
- inodeno_t ino = req->get_ino();
- dout(7) << "_destroy_prepare_logged " << ino << " atid " << atid << dendl;
-
- // reply
- MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, ino, atid);
- mds->messenger->send_message(reply, req->get_source_inst());
- delete req;
-}
-
-
-
-// UPDATE
-
-class C_AT_UpdatePrepare : public Context {
- AnchorTable *at;
- MAnchor *req;
- version_t atid;
-public:
- C_AT_UpdatePrepare(AnchorTable *a, MAnchor *r, version_t t) :
- at(a), req(r), atid(t) { }
- void finish(int r) {
- at->_update_prepare_logged(req, atid);
- }
-};
-
-void AnchorTable::handle_update_prepare(MAnchor *req)
-{
- inodeno_t ino = req->get_ino();
- vector<Anchor>& trace = req->get_trace();
-
- dout(7) << "handle_update_prepare " << ino << dendl;
-
- update_prepare(ino, trace, req->get_source().num());
-
- // log it
- EAnchor *le = new EAnchor(ANCHOR_OP_UPDATE_PREPARE, ino, version, req->get_source().num());
- le->set_trace(trace);
- mds->mdlog->submit_entry(le,
- new C_AT_UpdatePrepare(this, req, version));
-}
-
-void AnchorTable::_update_prepare_logged(MAnchor *req, version_t atid)
-{
- inodeno_t ino = req->get_ino();
- dout(7) << "_update_prepare_logged " << ino << " atid " << atid << dendl;
-
- // reply
- MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, ino, atid);
- mds->messenger->send_message(reply, req->get_source_inst());
- delete req;
-}
-
-
-
-// COMMIT
-
-class C_AT_Commit : public Context {
- AnchorTable *at;
- MAnchor *req;
-public:
- C_AT_Commit(AnchorTable *a, MAnchor *r) :
- at(a), req(r) { }
- void finish(int r) {
- at->_commit_logged(req);
- }
-};
-
-void AnchorTable::handle_commit(MAnchor *req)
-{
- version_t atid = req->get_atid();
- dout(7) << "handle_commit " << atid << dendl;
-
- if (pending_create.count(atid) ||
- pending_destroy.count(atid) ||
- pending_update.count(atid)) {
- commit(atid);
- mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_COMMIT, atid, version));
- }
- else if (atid <= version) {
- dout(0) << "got commit for atid " << atid << " <= " << version
- << ", already committed, sending ack."
- << dendl;
- MAnchor *reply = new MAnchor(ANCHOR_OP_ACK, 0, atid);
- mds->messenger->send_message(reply, req->get_source_inst());
- delete req;
- return;
- }
- else {
- // wtf.
- dout(0) << "got commit for atid " << atid << " > " << version << dendl;
- assert(atid <= version);
- }
-
- // wait for it to journal
- mds->mdlog->wait_for_sync(new C_AT_Commit(this, req));
-}
-
-
-void AnchorTable::_commit_logged(MAnchor *req)
-{
- dout(7) << "_commit_logged, sending ACK" << dendl;
- MAnchor *reply = new MAnchor(ANCHOR_OP_ACK, req->get_ino(), req->get_atid());
- mds->messenger->send_message(reply, req->get_source_inst());
- delete req;
-}
-
-
-
-// ROLLBACK
-
-void AnchorTable::handle_rollback(MAnchor *req)
-{
- version_t atid = req->get_atid();
- dout(7) << "handle_rollback " << atid << dendl;
- rollback(atid);
- delete req;
-}
-
-
-
-/*
- * messages
- */
-
-void AnchorTable::dispatch(Message *m)
-{
- switch (m->get_type()) {
- case MSG_MDS_ANCHOR:
- handle_anchor_request((MAnchor*)m);
- break;
-
- default:
- assert(0);
- }
-}
-
-
-void AnchorTable::handle_anchor_request(class MAnchor *req)
-{
- // make sure i'm open!
- if (!opened) {
- dout(7) << "not open yet" << dendl;
-
- waiting_for_open.push_back(new C_MDS_RetryMessage(mds, req));
-
- if (!opening) {
- opening = true;
- load(0);
- }
- return;
- }
-
- dout(10) << "handle_anchor_request " << *req << dendl;
-
- // go
- switch (req->get_op()) {
-
- case ANCHOR_OP_LOOKUP:
- handle_lookup(req);
- break;
-
- case ANCHOR_OP_CREATE_PREPARE:
- handle_create_prepare(req);
- break;
- case ANCHOR_OP_DESTROY_PREPARE:
- handle_destroy_prepare(req);
- break;
- case ANCHOR_OP_UPDATE_PREPARE:
- handle_update_prepare(req);
- break;
-
- case ANCHOR_OP_COMMIT:
- handle_commit(req);
- break;
-
- case ANCHOR_OP_ROLLBACK:
- handle_rollback(req);
- break;
-
- default:
- assert(0);
- }
-
-}
-
-
-
-
-// primitive load/save for now!
-
-// load/save entire table for now!
-
-class C_AT_Saved : public Context {
- AnchorTable *at;
- version_t version;
-public:
- C_AT_Saved(AnchorTable *a, version_t v) : at(a), version(v) {}
- void finish(int r) {
- at->_saved(version);
- }
-};
-
-void AnchorTable::save(Context *onfinish)
-{
- dout(7) << "save v " << version << dendl;
- if (!opened) {
- assert(!onfinish);
- return;
- }
-
- if (onfinish)
- waiting_for_save[version].push_back(onfinish);
-
- if (committing_version == version) {
- dout(7) << "save already committing v " << version << dendl;
- return;
- }
- committing_version = version;
-
- // build up write
- bufferlist bl;
-
- // version
- ::encode(version, bl);
- ::encode(anchor_map, bl);
-
- // pending
- ::encode(pending_reqmds, bl);
- ::encode(pending_create, bl);
- ::encode(pending_destroy, bl);
- ::encode(pending_update, bl);
-
- // write!
- object_t oid = object_t(MDS_INO_ANCHORTABLE+mds->get_nodeid(), 0);
- vector<snapid_t> snaps;
- mds->objecter->write(oid,
- 0, bl.length(),
- mds->objecter->osdmap->file_to_object_layout(oid, g_default_mds_anchortable_layout),
- snaps,
- bl, 0,
- NULL, new C_AT_Saved(this, version));
-}
-
-void AnchorTable::_saved(version_t v)
-{
- dout(7) << "_saved v " << v << dendl;
-
- assert(v <= committing_version);
- assert(committed_version < v);
- committed_version = v;
-
- finish_contexts(waiting_for_save[v], 0);
- waiting_for_save.erase(v);
-}
-
-
-
-class C_AT_Load : public Context {
- AnchorTable *at;
-public:
- bufferlist bl;
- C_AT_Load(AnchorTable *a) : at(a) {}
- void finish(int result) {
- assert(result > 0);
- at->_loaded(bl);
- }
-};
-
-void AnchorTable::load(Context *onfinish)
-{
- dout(7) << "load" << dendl;
- assert(!opened);
-
- waiting_for_open.push_back(onfinish);
-
- C_AT_Load *fin = new C_AT_Load(this);
- object_t oid = object_t(MDS_INO_ANCHORTABLE+mds->get_nodeid(), 0);
- vector<snapid_t> snaps;
- mds->objecter->read(oid,
- 0, 0,
- mds->objecter->osdmap->file_to_object_layout(oid, g_default_mds_anchortable_layout),
- snaps,
- &fin->bl, 0,
- fin);
-}
-
-void AnchorTable::_loaded(bufferlist& bl)
-{
- dout(10) << "_loaded got " << bl.length() << " bytes" << dendl;
-
- bufferlist::iterator p = bl.begin();
-
- ::decode(version, p);
- ::decode(anchor_map, p);
-
- ::decode(pending_reqmds, p);
- ::decode(pending_create, p);
- ::decode(pending_destroy, p);
- ::decode(pending_update, p);
-
- assert(p.end());
-
- // done.
- opened = true;
- opening = false;
-
- finish_contexts(waiting_for_open);
-}
-
-
-//////
-
-void AnchorTable::finish_recovery()
-{
- dout(7) << "finish_recovery" << dendl;
-
- // resend agrees for everyone.
- for (map<version_t,int>::iterator p = pending_reqmds.begin();
- p != pending_reqmds.end();
- p++)
- resend_agree(p->first, p->second);
-}
-
-
-void AnchorTable::resend_agree(version_t v, int who)
-{
- if (pending_create.count(v)) {
- MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, pending_create[v], v);
- mds->send_message_mds(reply, who);
- }
- else if (pending_destroy.count(v)) {
- MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, pending_destroy[v], v);
- mds->send_message_mds(reply, who);
- }
- else {
- assert(pending_update.count(v));
- MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, pending_update[v].first, v);
- mds->send_message_mds(reply, who);
- }
-}
-
-void AnchorTable::handle_mds_recovery(int who)
-{
- dout(7) << "handle_mds_recovery mds" << who << dendl;
-
- // resend agrees for recovered mds
- for (map<version_t,int>::iterator p = pending_reqmds.begin();
- p != pending_reqmds.end();
- p++) {
- if (p->second != who) continue;
- resend_agree(p->first, p->second);
- }
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __ANCHORTABLE_H
-#define __ANCHORTABLE_H
-
-#include "Anchor.h"
-#include "include/Context.h"
-
-#include <ext/hash_map>
-using namespace __gnu_cxx;
-
-class MDS;
-class MAnchor;
-
-class AnchorTable {
- MDS *mds;
-
- // keep the entire table in memory.
- hash_map<inodeno_t, Anchor> anchor_map;
-
- // uncommitted operations
- map<version_t, int> pending_reqmds;
- map<version_t, inodeno_t> pending_create;
- map<version_t, inodeno_t> pending_destroy;
- map<version_t, pair<inodeno_t, vector<Anchor> > > pending_update;
-
- version_t version; // this includes anchor_map AND pending_* state.
- version_t committing_version;
- version_t committed_version;
-
- // load/save state
- bool opening, opened;
-
- // waiters
- list<Context*> waiting_for_open;
- map<version_t, list<Context*> > waiting_for_save;
-
-protected:
-
- // basic updates
- bool add(inodeno_t ino, inodeno_t dirino, __u32 dn_hash);
- void inc(inodeno_t ino);
- void dec(inodeno_t ino);
-
- // mid-level
- void create_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds);
- void destroy_prepare(inodeno_t ino, int reqmds);
- void update_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds);
- void commit(version_t atid);
- void rollback(version_t atid);
- friend class EAnchor; // used for journal replay.
-
- // high level interface
- void handle_lookup(MAnchor *req);
-
- void handle_create_prepare(MAnchor *req);
- void _create_prepare_logged(MAnchor *req, version_t atid);
- friend class C_AT_CreatePrepare;
-
- void handle_destroy_prepare(MAnchor *req);
- void _destroy_prepare_logged(MAnchor *req, version_t atid);
- friend class C_AT_DestroyPrepare;
-
- void handle_update_prepare(MAnchor *req);
- void _update_prepare_logged(MAnchor *req, version_t atid);
- friend class C_AT_UpdatePrepare;
-
- void handle_commit(MAnchor *req);
- void _commit_logged(MAnchor *req);
- friend class C_AT_Commit;
-
- void handle_rollback(MAnchor *req);
-
- // messages
- void handle_anchor_request(MAnchor *m);
-
- void dump();
-
-public:
- AnchorTable(MDS *m) :
- mds(m),
- version(0), committing_version(0), committed_version(0),
- opening(false), opened(false) { }
-
- void dispatch(class Message *m);
-
- version_t get_version() { return version; }
- version_t get_committed_version() { return committed_version; }
-
- void create_fresh() {
- // reset (i.e. on mkfs) to empty, but unsaved table.
- version = 1;
- opened = true;
- opening = false;
- anchor_map.clear();
- pending_create.clear();
- pending_destroy.clear();
- pending_update.clear();
- }
-
- // load/save entire table for now!
- void save(Context *onfinish);
- void _saved(version_t v);
- void load(Context *onfinish);
- void _loaded(bufferlist& bl);
-
- // recovery
- void handle_mds_recovery(int who);
- void finish_recovery();
- void resend_agree(version_t v, int who);
-
-};
-
-#endif
#include "MDS.h"
#include "MDCache.h"
-#include "AnchorTable.h"
#include "snap.h"
#include "events/EPurgeFinish.h"
-#include "events/EAnchor.h"
-#include "events/EAnchorClient.h"
-
-#include "events/ESnap.h"
+#include "events/ETableClient.h"
+#include "events/ETableServer.h"
LogEvent *LogEvent::decode(bufferlist& bl)
case EVENT_PURGEFINISH: le = new EPurgeFinish; break;
- case EVENT_ANCHOR: le = new EAnchor; break;
- case EVENT_ANCHORCLIENT: le = new EAnchorClient; break;
-
- case EVENT_SNAP: le = new ESnap; break;
+ case EVENT_TABLECLIENT: le = new ETableClient; break;
+ case EVENT_TABLESERVER: le = new ETableServer; break;
default:
generic_dout(1) << "uh oh, unknown log event type " << type << dendl;
#define EVENT_PURGEFINISH 30
-#define EVENT_ANCHOR 40
-#define EVENT_ANCHORCLIENT 41
-
-#define EVENT_SNAP 50
+#define EVENT_TABLECLIENT 42
+#define EVENT_TABLESERVER 43
//xlist<CInode*> purging_inodes;
map<CInode*, map<loff_t,loff_t> > purging_inodes;
- // committed anchor transactions
- hash_set<version_t> pending_commit_atids;
+ map<int, hash_set<version_t> > pending_commit_tids; // mdstable
set<metareqid_t> uncommitted_masters;
// client request ids
// table version
version_t allocv;
version_t sessionmapv;
- version_t anchortablev;
- version_t snaptablev;
+ map<int,version_t> tablev;
// try to expire
C_Gather *try_to_expire(MDS *mds);
// cons
LogSegment(loff_t off) : offset(off), end(off), num_events(0), trimmable_at(0),
- allocv(0), sessionmapv(0), anchortablev(0), snaptablev(0)
+ allocv(0), sessionmapv(0)
{ }
};
EUpdate *le = new EUpdate(mds->mdlog, add ? "anchor_create":"anchor_destroy");
mds->locker->predirty_nested(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY);
le->metablob.add_primary_dentry(in->parent, true, 0, pi);
- le->metablob.add_anchor_transaction(atid);
+ le->metablob.add_table_transaction(TABLE_ANCHOR, atid);
mds->mdlog->submit_entry(le, new C_MDC_AnchorLogged(this, in, atid, mut));
}
map<__u32,entity_inst_t> imported_client_map;
map<CInode*, map<__u32,Capability::Export> > cap_imports;
+ // for snaps
+ version_t stid;
+
// called when slave commits or aborts
Context *slave_commit;
bufferlist rollback_bl;
More() :
src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0),
destdn_was_remote_inode(0), was_link_merge(false),
+ stid(0),
slave_commit(0),
fragment_in(0), fragment_bits(0) { }
} *_more;
#include "MDBalancer.h"
#include "Migrator.h"
-#include "AnchorTable.h"
+#include "AnchorServer.h"
#include "AnchorClient.h"
#include "IdAllocator.h"
#include "messages/MClientRequest.h"
#include "messages/MClientRequestForward.h"
-#include "messages/MAnchor.h"
+#include "messages/MMDSTableRequest.h"
#include "config.h"
mdlog = new MDLog(this);
balancer = new MDBalancer(this);
- anchorclient = new AnchorClient(this);
idalloc = new IdAllocator(this);
-
- anchortable = new AnchorTable(this);
snaptable = new SnapTable(this);
+ anchorserver = new AnchorServer(this);
+ anchorclient = new AnchorClient(this);
+
server = new Server(this);
locker = new Locker(this, mdcache);
if (mdlog) { delete mdlog; mdlog = NULL; }
if (balancer) { delete balancer; balancer = NULL; }
if (idalloc) { delete idalloc; idalloc = NULL; }
- if (anchortable) { delete anchortable; anchortable = NULL; }
+ if (anchorserver) { delete anchorserver; anchorserver = NULL; }
if (snaptable) { delete snaptable; snaptable = NULL; }
if (anchorclient) { delete anchorclient; anchorclient = NULL; }
if (osdmap) { delete osdmap; osdmap = 0; }
server->reopen_logger(start, append);
}
+
+
+
+/*
+ * Return the client-side handler for table id t.
+ * Only TABLE_ANCHOR is wired up here; any other id trips the assert.
+ */
+MDSTableClient *MDS::get_table_client(int t)
+{
+  switch (t) {
+  case TABLE_ANCHOR: return anchorclient;
+  //case TABLE_SNAP: return snapserver;
+  default: assert(0);
+  }
+  // Only reached when assert() is compiled out: never fall off the end
+  // of a non-void function.
+  return 0;
+}
+
+/*
+ * Return the server-side handler for table id t.
+ * Only TABLE_ANCHOR is wired up here; any other id trips the assert.
+ */
+MDSTableServer *MDS::get_table_server(int t)
+{
+  switch (t) {
+  case TABLE_ANCHOR: return anchorserver;
+  //case TABLE_SNAP: return snapserver;
+  default: assert(0);
+  }
+  // Only reached when assert() is compiled out: never fall off the end
+  // of a non-void function.
+  return 0;
+}
+
+
+
+
+
+
+
+
+
+
void MDS::send_message_mds(Message *m, int mds)
{
// send mdsmap first?
// fixme: fake out anchortable
if (mdsmap->get_anchortable() == whoami) {
dout(10) << "boot_create creating fresh anchortable" << dendl;
- anchortable->create_fresh();
- anchortable->save(fin->new_sub());
+ anchorserver->reset();
+ anchorserver->save(fin->new_sub());
}
if (whoami == 0) {
if (mdsmap->get_anchortable() == whoami) {
dout(2) << "boot_start " << step << ": opening anchor table" << dendl;
- anchortable->load(gather->new_sub());
+ anchorserver->load(gather->new_sub());
}
if (whoami == 0) {
dout(2) << "boot_start " << step << ": opening snap table" << dendl;
// kick anchortable (resent AGREEs)
if (mdsmap->get_anchortable() == whoami)
- anchortable->finish_recovery();
+ anchorserver->finish_recovery();
// kick anchorclient (resent COMMITs)
anchorclient->finish_recovery();
mdcache->handle_mds_recovery(who);
- if (anchortable)
- anchortable->handle_mds_recovery(who);
+ if (anchorserver)
+ anchorserver->handle_mds_recovery(who);
anchorclient->handle_mds_recovery(who);
queue_waiters(waiting_for_active_peer[who]);
balancer->proc_message(m);
break;
- // anchor
- case MSG_MDS_ANCHOR:
- if (((MAnchor*)m)->get_op() < 0)
- anchorclient->dispatch(m);
- else
- anchortable->dispatch(m);
+ case MSG_MDS_TABLE_REQUEST:
+ {
+ MMDSTableRequest *req = (MMDSTableRequest*)m;
+ if (req->op < 0) {
+ MDSTableClient *client = get_table_client(req->table);
+ client->handle_request(req);
+ } else {
+ MDSTableServer *server = get_table_server(req->table);
+ server->handle_request(req);
+ }
+ }
break;
// OSD
class Server;
class Locker;
-class AnchorTable;
-class AnchorClient;
class MDCache;
class MDLog;
class MDBalancer;
-class IdAllocator;
-class SnapTable;
class CInode;
class CDir;
class MMDSBeacon;
+class IdAllocator;
+class SnapTable;
+
+class MDSTableClient;
+class MDSTableServer;
+class AnchorServer;
+class AnchorClient;
class MDS : public Dispatcher {
public:
IdAllocator *idalloc;
- AnchorTable *anchortable;
+ AnchorServer *anchorserver;
AnchorClient *anchorclient;
SnapTable *snaptable;
+ MDSTableClient *get_table_client(int t);
+ MDSTableServer *get_table_server(int t);
+
Logger *logger, *logger2;
#define __MDSTABLE_H
#include "mdstypes.h"
+#include "mds_table_types.h"
#include "include/buffer.h"
#include "include/Context.h"
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <iostream>
+using std::cout;
+using std::cerr;
+
+#include "MDSMap.h"
+
+#include "include/Context.h"
+#include "msg/Messenger.h"
+
+#include "MDS.h"
+#include "MDLog.h"
+#include "LogSegment.h"
+
+#include "MDSTableClient.h"
+#include "events/ETableClient.h"
+
+#include "messages/MMDSTableRequest.h"
+
+#include "config.h"
+
+#define dout(x) if (x <= g_conf.debug_mds) *_dout << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".tableclient "
+#define derr(x) if (x <= g_conf.debug_mds) *_derr << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".tableclient "
+
+
+/*
+ * Handle a table message addressed to this client.
+ *
+ *  - TABLE_OP_QUERY_REPLY is handed to the subclass hook.
+ *  - TABLE_OP_AGREE completes a pending prepare; a stray AGREE triggers a
+ *    COMMIT resend (tid already committing) or a ROLLBACK (tid unknown).
+ *  - TABLE_OP_ACK is journaled; the bookkeeping is dropped in _logged_ack().
+ *
+ * The message is deleted before returning, whatever the op.
+ */
+void MDSTableClient::handle_request(class MMDSTableRequest *m)
+{
+ dout(10) << "handle_request " << *m << dendl;
+ assert(m->table == table);
+
+ version_t tid = m->tid;
+ __u64 reqid = m->reqid;
+
+ switch (m->op) {
+ case TABLE_OP_QUERY_REPLY:
+ // m is still deleted at the bottom of this function, so the hook
+ // presumably must not free or keep it -- verify against implementations.
+ handle_query_result(m);
+ break;
+
+ case TABLE_OP_AGREE:
+ if (pending_prepare.count(reqid)) {
+ dout(10) << "got agree on " << reqid << " atid " << tid << dendl;
+ // hand the assigned tid back through the caller's pointer, and drop the
+ // pending entry before running the callback.
+ Context *onfinish = pending_prepare[reqid].onfinish;
+ *pending_prepare[reqid].ptid = tid;
+ pending_prepare.erase(reqid);
+ if (onfinish) {
+ onfinish->finish(0);
+ delete onfinish;
+ }
+ }
+ else if (pending_commit.count(tid)) {
+ dout(10) << "stray agree on " << reqid
+ << " tid " << tid
+ << ", already committing, resending COMMIT"
+ << dendl;
+ // NOTE(review): the destination is always get_anchortable(), whatever
+ // `table` is -- confirm this is intended for non-anchor tables.
+ MMDSTableRequest *req = new MMDSTableRequest(table, TABLE_OP_COMMIT, 0, tid);
+ mds->send_message_mds(req, mds->mdsmap->get_anchortable());
+ }
+ else {
+ dout(10) << "stray agree on " << reqid
+ << " tid " << tid
+ << ", sending ROLLBACK"
+ << dendl;
+ MMDSTableRequest *req = new MMDSTableRequest(table, TABLE_OP_ROLLBACK, 0, tid);
+ mds->send_message_mds(req, mds->mdsmap->get_anchortable());
+ }
+ break;
+
+ case TABLE_OP_ACK:
+ dout(10) << "got ack on tid " << tid << ", logging" << dendl;
+
+ // remove from committing list
+ // (only checked here; the actual removal happens in _logged_ack(), which
+ // C_LoggedAck calls when the journal entry completes)
+ assert(pending_commit.count(tid));
+ assert(pending_commit[tid]->pending_commit_tids[table].count(tid));
+
+ // log ACK.
+ mds->mdlog->submit_entry(new ETableClient(table, TABLE_OP_ACK, tid),
+ new C_LoggedAck(this, tid));
+ break;
+
+ default:
+ assert(0);
+ }
+
+ delete m;
+}
+
+
+/*
+ * Journal callback for a logged ACK: forget the tid, both here and in the
+ * LogSegment that recorded it, then queue any contexts registered through
+ * wait_for_ack().
+ */
+void MDSTableClient::_logged_ack(version_t tid)
+{
+  dout(10) << "_logged_ack" << dendl;
+
+  map<version_t,LogSegment*>::iterator committing = pending_commit.find(tid);
+  assert(committing != pending_commit.end());
+  LogSegment *ls = committing->second;
+  assert(ls->pending_commit_tids[table].count(tid));
+
+  ls->pending_commit_tids[table].erase(tid);
+  pending_commit.erase(committing);
+
+  // kick any waiters (LogSegment trim)
+  map<version_t, list<Context*> >::iterator waiters = ack_waiters.find(tid);
+  if (waiters != ack_waiters.end()) {
+    dout(15) << "kicking ack waiters on tid " << tid << dendl;
+    mds->queue_waiters(waiters->second);
+    ack_waiters.erase(waiters);
+  }
+}
+
+/*
+ * Start a table transaction: allocate a fresh request id, remember the
+ * mutation and the caller's completion state under it, and send a
+ * TABLE_OP_PREPARE carrying the mutation.
+ *
+ *  mutation  encoded mutation; a copy is kept so it can be re-sent
+ *  ptid      filled in with the assigned tid when the AGREE arrives
+ *  onfinish  completed when the AGREE arrives (may be null)
+ */
+void MDSTableClient::_prepare(bufferlist& mutation, version_t *ptid, Context *onfinish)
+{
+  const __u64 reqid = ++last_reqid;
+  dout(10) << "_prepare " << reqid << dendl;
+
+  // record what we are waiting for
+  _pending_prepare& pending = pending_prepare[reqid];
+  pending.mutation = mutation;
+  pending.ptid = ptid;
+  pending.onfinish = onfinish;
+
+  // send message
+  MMDSTableRequest *req = new MMDSTableRequest(table, TABLE_OP_PREPARE, reqid);
+  req->bl = mutation;
+  mds->send_message_mds(req, mds->mdsmap->get_anchortable());
+}
+
+/*
+ * Commit a prepared transaction: record tid as pending, both here and in
+ * the given LogSegment, then send TABLE_OP_COMMIT. A tid that is already
+ * pending trips the assert.
+ */
+void MDSTableClient::commit(version_t tid, LogSegment *ls)
+{
+  dout(10) << "commit " << tid << dendl;
+
+  assert(!pending_commit.count(tid));
+  ls->pending_commit_tids[table].insert(tid);
+  pending_commit[tid] = ls;
+
+  // send message
+  MMDSTableRequest *commit_msg = new MMDSTableRequest(table, TABLE_OP_COMMIT, 0, tid);
+  mds->send_message_mds(commit_msg, mds->mdsmap->get_anchortable());
+}
+
+
+
+// recovery
+
+
+/*
+ * Called when this MDS finishes recovery: re-send a COMMIT for every
+ * transaction still listed in pending_commit.
+ */
+void MDSTableClient::finish_recovery()
+{
+ dout(7) << "finish_recovery" << dendl;
+ resend_commits();
+}
+
+/*
+ * Send a TABLE_OP_COMMIT for every tid in pending_commit.
+ */
+void MDSTableClient::resend_commits()
+{
+  map<version_t,LogSegment*>::iterator p = pending_commit.begin();
+  while (p != pending_commit.end()) {
+    const version_t tid = p->first;
+    dout(10) << "resending commit on " << tid << dendl;
+    MMDSTableRequest *commit_msg = new MMDSTableRequest(table, TABLE_OP_COMMIT, 0, tid);
+    mds->send_message_mds(commit_msg, mds->mdsmap->get_anchortable());
+    ++p;
+  }
+}
+
+/*
+ * Another MDS has recovered. If it is the node returned by
+ * mdsmap->get_anchortable(), replay everything still outstanding:
+ * queries, prepares, then commits. Any other node is ignored.
+ */
+void MDSTableClient::handle_mds_recovery(int who)
+{
+  dout(7) << "handle_mds_recovery mds" << who << dendl;
+
+  if (who == mds->mdsmap->get_anchortable()) {
+    resend_queries();
+
+    // prepares.
+    hash_map<__u64, _pending_prepare>::iterator it = pending_prepare.begin();
+    for (; it != pending_prepare.end(); ++it) {
+      dout(10) << "resending " << it->first << dendl;
+      MMDSTableRequest *prepare_msg = new MMDSTableRequest(table, TABLE_OP_PREPARE, it->first);
+      prepare_msg->bl = it->second.mutation;
+      mds->send_message_mds(prepare_msg, mds->mdsmap->get_anchortable());
+    }
+
+    resend_commits();
+  }
+  // otherwise: do nothing.
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+// Guard named after this header's class (MDSTableClient); the previous
+// __SNAPCLIENT_H name was shared with another header.
+#ifndef __MDSTABLECLIENT_H
+#define __MDSTABLECLIENT_H
+
+#include <vector>
+using std::vector;
+#include <ext/hash_map>
+using __gnu_cxx::hash_map;
+
+#include "include/types.h"
+#include "include/Context.h"
+#include "mds_table_types.h"
+
+class MDS;
+class LogSegment;
+class MMDSTableRequest;
+
+/*
+ * Client side of the table transaction protocol (PREPARE/AGREE,
+ * COMMIT/ACK, ROLLBACK) for one table, identified by `table`.
+ * Subclasses supply the query handling.
+ */
+class MDSTableClient {
+protected:
+ MDS *mds;
+ int table; // table id carried in every MMDSTableRequest we send
+
+ __u64 last_reqid; // last request id handed out by _prepare()
+
+ // prepares
+ struct _pending_prepare {
+ Context *onfinish; // completed when the AGREE arrives (may be null)
+ version_t *ptid; // where the assigned tid is stored on AGREE
+ bufferlist mutation; // kept so the PREPARE can be re-sent
+ };
+
+ // outstanding prepares, by request id
+ hash_map<__u64, _pending_prepare> pending_prepare;
+
+ // pending commits
+ map<version_t, LogSegment*> pending_commit; // tid -> segment recording it
+ map<version_t, list<Context*> > ack_waiters; // tid -> contexts queued once the ACK is logged
+
+ // NOTE(review): no definition of handle_reply is visible in the
+ // accompanying .cc -- possibly a leftover declaration; confirm.
+ void handle_reply(class MMDSTableQuery *m);
+
+ // journal completion for a logged ACK; forwards to _logged_ack()
+ class C_LoggedAck : public Context {
+ MDSTableClient *tc;
+ version_t tid;
+ public:
+ C_LoggedAck(MDSTableClient *a, version_t t) : tc(a), tid(t) {}
+ void finish(int r) {
+ tc->_logged_ack(tid);
+ }
+ };
+ void _logged_ack(version_t tid);
+
+public:
+ MDSTableClient(MDS *m, int tab) : mds(m), table(tab), last_reqid(0) {}
+ virtual ~MDSTableClient() {}
+
+ // entry point for table messages addressed to this client
+ void handle_request(MMDSTableRequest *m);
+
+ // send a PREPARE; *ptid is set and onfinish completed on AGREE
+ void _prepare(bufferlist& mutation, version_t *ptid, Context *onfinish);
+ // record tid as pending in ls and send a COMMIT
+ void commit(version_t tid, LogSegment *ls);
+
+ // for recovery (by other nodes)
+ void handle_mds_recovery(int mds); // called when someone else recovers
+ void resend_commits();
+
+ // for recovery (by me)
+ // These only touch pending_commit; presumably driven by journal
+ // replay -- confirm against callers.
+ void got_journaled_agree(version_t tid, LogSegment *ls) {
+ pending_commit[tid] = ls;
+ }
+ void got_journaled_ack(version_t tid) {
+ pending_commit.erase(tid);
+ }
+ // true when tid is not (or no longer) in pending_commit
+ bool has_committed(version_t tid) {
+ return pending_commit.count(tid) == 0;
+ }
+ // queue c to be kicked from _logged_ack() for this tid
+ void wait_for_ack(version_t tid, Context *c) {
+ ack_waiters[tid].push_back(c);
+ }
+ void finish_recovery(); // called when i recover and go active
+
+
+ // child must implement
+ virtual void resend_queries() = 0;
+ virtual void handle_query_result(MMDSTableRequest *m) = 0;
+
+ // and friendly front-end for _prepare.
+
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "MDSTableServer.h"
+#include "MDS.h"
+#include "MDLog.h"
+#include "msg/Messenger.h"
+
+#include "messages/MMDSTableRequest.h"
+#include "events/ETableServer.h"
+
+#define dout(x) if (x <= g_conf.debug_mds) *_dout << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".tableserver(" << get_mdstable_name(table) << ") "
+#define derr(x) if (x <= g_conf.debug_mds) *_derr << dbeginl << g_clock.now() << " " << mds->messenger->get_myname() << ".tableserver(" << get_mdstable_name(table) << ") "
+
+
+/*
+ * Route a client-to-server table message (op >= 0) to its handler.
+ * An unknown op trips the assert.
+ */
+void MDSTableServer::handle_request(MMDSTableRequest *req)
+{
+  assert(req->op >= 0);
+  switch (req->op) {
+  case TABLE_OP_QUERY:
+    handle_query(req);
+    break;
+  case TABLE_OP_PREPARE:
+    handle_prepare(req);
+    break;
+  case TABLE_OP_COMMIT:
+    handle_commit(req);
+    break;
+  case TABLE_OP_ROLLBACK:
+    handle_rollback(req);
+    break;
+  default:
+    assert(0);
+  }
+}
+
+// prepare
+
+/*
+ * Handle a PREPARE: apply the mutation through the child's _prepare(),
+ * remember which mds/reqid the resulting tid belongs to, and journal it.
+ * The AGREE is sent from _prepare_logged() once the entry is logged.
+ */
+void MDSTableServer::handle_prepare(MMDSTableRequest *req)
+{
+  dout(7) << "handle_prepare " << *req << dendl;
+  int from = req->get_source().num();
+
+  _prepare(req->bl, req->reqid, from);
+
+  // pending_for_mds is a map<version_t,_pending>: key it by the tid
+  // (`version` after _prepare), not by the client's reqid. Request ids are
+  // per-client counters, so two MDSs can use the same one, and COMMIT and
+  // ROLLBACK identify the transaction by tid only.
+  _pending& pending = pending_for_mds[version];
+  pending.mds = from;
+  pending.reqid = req->reqid;
+  pending.tid = version;
+
+  ETableServer *le = new ETableServer(table, TABLE_OP_PREPARE, req->reqid, from, version, version);
+  le->mutation = req->bl;
+  mds->mdlog->submit_entry(le, new C_Prepare(this, req, version));
+}
+
+/*
+ * Journal callback for a PREPARE: send the AGREE carrying the assigned tid
+ * to the requesting mds, then free the request.
+ */
+void MDSTableServer::_prepare_logged(MMDSTableRequest *req, version_t tid)
+{
+  // log under this function's own name (was "_create_logged")
+  dout(7) << "_prepare_logged " << *req << " tid " << tid << dendl;
+  MMDSTableRequest *reply = new MMDSTableRequest(table, TABLE_OP_AGREE, req->reqid, tid);
+  mds->send_message_mds(reply, req->get_source().num());
+  delete req;
+}
+
+
+// commit
+
+/*
+ * Handle a COMMIT for req->tid.
+ *
+ *  - still prepared: commit, journal it, and ACK once the log is synced.
+ *  - not prepared but tid <= version: treated as already committed, ACK now.
+ *  - tid > version: logged, then the assert fires.
+ */
+void MDSTableServer::handle_commit(MMDSTableRequest *req)
+{
+  dout(7) << "handle_commit " << *req << dendl;
+
+  const version_t tid = req->tid;
+
+  if (_is_prepared(tid)) {
+    _commit(tid);
+    ETableServer *le = new ETableServer(table, TABLE_OP_COMMIT, 0, -1, tid, version);
+    mds->mdlog->submit_entry(le);
+    mds->mdlog->wait_for_sync(new C_Commit(this, req));
+    return;
+  }
+
+  if (tid <= version) {
+    dout(0) << "got commit for tid " << tid << " <= " << version
+	    << ", already committed, sending ack."
+	    << dendl;
+    _commit_logged(req);
+    return;
+  }
+
+  // wtf.
+  dout(0) << "got commit for tid " << tid << " > " << version << dendl;
+  assert(tid <= version);
+}
+
+/*
+ * Send the ACK for a commit back to the requester and free the request.
+ */
+void MDSTableServer::_commit_logged(MMDSTableRequest *req)
+{
+  dout(7) << "_commit_logged, sending ACK" << dendl;
+  const int requester = req->get_source().num();
+  MMDSTableRequest *ack = new MMDSTableRequest(table, TABLE_OP_ACK, req->reqid, req->tid);
+  mds->send_message_mds(ack, requester);
+  delete req;
+}
+
+// ROLLBACK
+
+/*
+ * Handle a ROLLBACK: pass the tid to the child's _rollback() and free the
+ * request. No reply is sent and nothing is journaled here.
+ */
+void MDSTableServer::handle_rollback(MMDSTableRequest *req)
+{
+ dout(7) << "handle_rollback " << *req << dendl;
+ _rollback(req->tid);
+ delete req;
+}
+
+
+// recovery
+
+/*
+ * Called when this MDS finishes recovery. Passes -1 to
+ * handle_mds_recovery(), whose `who >= 0` filter is then skipped, so
+ * every entry in pending_for_mds is processed.
+ */
+void MDSTableServer::finish_recovery()
+{
+ dout(7) << "finish_recovery" << dendl;
+ handle_mds_recovery(-1); // resend agrees for everyone.
+}
+
+/*
+ * Re-send the AGREE for pending transactions.
+ *
+ *  who >= 0  only transactions requested by that mds
+ *  who <  0  every pending transaction (used by finish_recovery())
+ */
+void MDSTableServer::handle_mds_recovery(int who)
+{
+  if (who >= 0) {
+    dout(7) << "handle_mds_recovery mds" << who << dendl;
+  }
+
+  // resend agrees for recovered mds
+  for (map<version_t,_pending>::iterator p = pending_for_mds.begin();
+       p != pending_for_mds.end();
+       ++p) {
+    if (who >= 0 && p->second.mds != who) continue;
+    MMDSTableRequest *reply = new MMDSTableRequest(table, TABLE_OP_AGREE, p->second.reqid, p->second.tid);
+    // Address the reply to the mds that owns the transaction. `who` is -1
+    // in the resend-to-everyone case and is not a valid destination.
+    mds->send_message_mds(reply, p->second.mds);
+  }
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MDSTABLESERVER_H
+#define __MDSTABLESERVER_H
+
+#include "MDSTable.h"
+
+class MMDSTableRequest;
+
+/*
+ * Server side of the table transaction protocol (PREPARE/AGREE,
+ * COMMIT/ACK, ROLLBACK) on top of MDSTable. Subclasses implement the
+ * table-specific query/prepare/commit/rollback hooks.
+ */
+class MDSTableServer : public MDSTable {
+public:
+ int table; // table id carried in every MMDSTableRequest we send
+
+ /* mds's requesting any pending ops. child needs to encode the corresponding
+ * pending mutation state in the table.
+ */
+ struct _pending {
+ __u64 reqid; // requester's request id, echoed back in the AGREE
+ __s32 mds; // requesting mds
+ version_t tid; // transaction id assigned by the server
+ void encode(bufferlist& bl) const {
+ ::encode(reqid, bl);
+ ::encode(mds, bl);
+ ::encode(tid, bl);
+ }
+ void decode(bufferlist::iterator& bl) {
+ ::decode(reqid, bl);
+ ::decode(mds, bl);
+ ::decode(tid, bl);
+ }
+ };
+ // NOTE(review): this macro is invoked again for MDSTableServer::_pending
+ // after the class; the in-class use looks redundant -- confirm.
+ WRITE_CLASS_ENCODER(_pending)
+ map<version_t,_pending> pending_for_mds; // ** child should encode this! **
+
+
+private:
+ void handle_prepare(MMDSTableRequest *m);
+ void _prepare_logged(MMDSTableRequest *m, version_t tid);
+ // journal completion for a PREPARE; forwards to _prepare_logged()
+ struct C_Prepare : public Context {
+ MDSTableServer *server;
+ MMDSTableRequest *req;
+ version_t tid;
+ C_Prepare(MDSTableServer *s, MMDSTableRequest *r, version_t v) : server(s), req(r), tid(v) {}
+ void finish(int r) {
+ server->_prepare_logged(req, tid);
+ }
+ };
+
+ void handle_commit(MMDSTableRequest *m);
+ void _commit_logged(MMDSTableRequest *m);
+ // log-sync completion for a COMMIT; forwards to _commit_logged()
+ struct C_Commit : public Context {
+ MDSTableServer *server;
+ MMDSTableRequest *req;
+ C_Commit(MDSTableServer *s, MMDSTableRequest *r) : server(s), req(r) {}
+ void finish(int r) {
+ server->_commit_logged(req);
+ }
+ };
+
+ void handle_rollback(MMDSTableRequest *m);
+
+ public:
+ // hooks the child must implement
+ virtual void handle_query(MMDSTableRequest *m) = 0;
+ virtual void _prepare(bufferlist &bl, __u64 reqid, int bymds) = 0;
+ virtual bool _is_prepared(version_t tid) = 0;
+ virtual void _commit(version_t tid) = 0;
+ virtual void _rollback(version_t tid) = 0;
+
+ MDSTableServer(MDS *m, int tab) : MDSTable(m, get_mdstable_name(tab)), table(tab) {}
+ virtual ~MDSTableServer() {}
+
+ // entry point for table messages addressed to this server (op >= 0)
+ void handle_request(MMDSTableRequest *m);
+
+ // recovery
+ void finish_recovery();
+ void handle_mds_recovery(int who);
+};
+WRITE_CLASS_ENCODER(MDSTableServer::_pending)
+
+#endif
#include "events/ESession.h"
#include "events/EOpen.h"
#include "events/ECommitted.h"
-#include "events/ESnap.h"
#include "include/filepath.h"
#include "common/Timer.h"
}
if (mdr->more()->dst_reanchor_atid)
- le->metablob.add_anchor_transaction(mdr->more()->dst_reanchor_atid);
+ le->metablob.add_table_transaction(TABLE_ANCHOR, mdr->more()->dst_reanchor_atid);
// mark committing (needed for proper recovery)
mdr->committing = true;
}
if (mdr->more()->dst_reanchor_atid)
- le->metablob.add_anchor_transaction(mdr->more()->dst_reanchor_atid);
+ le->metablob.add_table_transaction(TABLE_ANCHOR, mdr->more()->dst_reanchor_atid);
// log + wait
mdlog->submit_entry(le, new C_MDS_unlink_local_finish(mds, mdr, dn, straydn));
// anchor updates?
if (mdr->more()->src_reanchor_atid)
- metablob->add_anchor_transaction(mdr->more()->src_reanchor_atid);
+ metablob->add_table_transaction(TABLE_ANCHOR, mdr->more()->src_reanchor_atid);
if (mdr->more()->dst_reanchor_atid)
- metablob->add_anchor_transaction(mdr->more()->dst_reanchor_atid);
+ metablob->add_table_transaction(TABLE_ANCHOR, mdr->more()->dst_reanchor_atid);
}
mdr->ls = mdlog->get_current_segment();
EUpdate *le = new EUpdate(mdlog, "mksnap");
le->metablob.add_client_req(req->get_reqid());
- le->metablob.add_snap_transaction(stid);
+ le->metablob.add_table_transaction(TABLE_SNAP, stid);
mds->locker->predirty_nested(mdr, &le->metablob, diri, 0, PREDIRTY_PRIMARY, false);
mdcache->journal_dirty_inode(&le->metablob, diri, diri->find_snaprealm()->get_latest_snap());
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __SNAPCLIENT_H
+#define __SNAPCLIENT_H
+
+#include <vector>
+using std::vector;
+#include <ext/hash_map>
+using __gnu_cxx::hash_map;
+
+#include "include/types.h"
+#include "msg/Dispatcher.h"
+
+#include "snap.h"
+
+class Context;
+class MDS;
+class LogSegment;
+
+class MDSTableClient : public Dispatcher {  // NOTE(review): declared under include guard __SNAPCLIENT_H and has no public interface yet; looks like an unfinished stub — confirm it cannot clash with MDSTableClient.h
+ MDS *mds;
+
+ // prepares
+ struct _pending_prepare {
+ Context *onfinish;  // presumably run once the prepare completes — TODO confirm
+ version_t *patid;  // presumably where the assigned tid is written back — TODO confirm
+ bufferlist mutation;  // encoded mutation bytes associated with the prepare
+ };
+
+ hash_map<metareqid_t, _pending_prepare> pending_prepare;  // outstanding prepares keyed by request id
+
+ // pending commits
+ map<version_t, LogSegment*> pending_commit;  // presumably tid -> log segment; NOTE(review): map/list are neither included nor using-declared in this header — presumably they arrive via include/types.h, confirm
+ map<version_t, list<Context*> > ack_waiters;  // presumably tid -> contexts waiting for that tid's ack — TODO confirm
+
+ void handle_reply(class MAnchor *m);  // NOTE(review): MAnchor is deleted by this same change; this probably should take MMDSTableRequest — confirm
+
+
+};
+
+#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef __MDS_EANCHOR_H
-#define __MDS_EANCHOR_H
-
-#include <assert.h>
-#include "config.h"
-#include "include/types.h"
-
-#include "../LogEvent.h"
-#include "../Anchor.h"
-
-class EAnchor : public LogEvent {
-protected:
- __u32 op;
- inodeno_t ino;
- version_t atid;
- vector<Anchor> trace;
- version_t version; // anchor table version
- __s32 reqmds;
-
- public:
- EAnchor() : LogEvent(EVENT_ANCHOR) { }
- EAnchor(int o, inodeno_t i, version_t v, int rm) :
- LogEvent(EVENT_ANCHOR),
- op(o), ino(i), atid(0), version(v), reqmds(rm) { }
- EAnchor(int o, version_t a, version_t v) :
- LogEvent(EVENT_ANCHOR),
- op(o), atid(a), version(v), reqmds(-1) { }
-
- void set_trace(vector<Anchor>& t) { trace = t; }
- vector<Anchor>& get_trace() { return trace; }
-
- void encode(bufferlist& bl) const {
- ::encode(op, bl);
- ::encode(ino, bl);
- ::encode(atid, bl);
- ::encode(trace, bl);
- ::encode(version, bl);
- ::encode(reqmds, bl);
- }
- void decode(bufferlist::iterator &bl) {
- ::decode(op, bl);
- ::decode(ino, bl);
- ::decode(atid, bl);
- ::decode(trace, bl);
- ::decode(version, bl);
- ::decode(reqmds, bl);
- }
-
- void print(ostream& out) {
- out << "EAnchor " << get_anchor_opname(op);
- if (ino) out << " " << ino;
- if (atid) out << " atid " << atid;
- if (version) out << " v " << version;
- if (reqmds >= 0) out << " by mds" << reqmds;
- }
-
- void update_segment();
- void replay(MDS *mds);
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef __MDS_EANCHORCLIENT_H
-#define __MDS_EANCHORCLIENT_H
-
-#include <assert.h>
-#include "config.h"
-#include "include/types.h"
-
-#include "../LogEvent.h"
-#include "../Anchor.h"
-
-class EAnchorClient : public LogEvent {
-protected:
- __u32 op;
- version_t atid;
-
- public:
- EAnchorClient() : LogEvent(EVENT_ANCHORCLIENT) { }
- EAnchorClient(int o, version_t at) :
- LogEvent(EVENT_ANCHORCLIENT),
- op(o), atid(at) { }
-
- void encode(bufferlist &bl) const {
- ::encode(op, bl);
- ::encode(atid, bl);
- }
- void decode(bufferlist::iterator &bl) {
- ::decode(op, bl);
- ::decode(atid, bl);
- }
-
- void print(ostream& out) {
- out << "EAnchorClient " << get_anchor_opname(op);
- if (atid) out << " atid " << atid;
- }
-
- void replay(MDS *mds);
-
-};
-
-#endif
list<dirfrag_t> lump_order;
map<dirfrag_t, dirlump> lump_map;
- list<version_t> atids; // anchor table transactions
- list<version_t> stids; // snap table transactions
+ list<pair<__u8,version_t> > table_tids; // tableclient transactions
// ino's i've allocated
list<inodeno_t> allocated_inos;
void encode(bufferlist& bl) const {
::encode(lump_order, bl);
::encode(lump_map, bl);
- ::encode(atids, bl);
- ::encode(stids, bl);
+ ::encode(table_tids, bl);
::encode(allocated_inos, bl);
if (!allocated_inos.empty())
::encode(alloc_tablev, bl);
void decode(bufferlist::iterator &bl) {
::decode(lump_order, bl);
::decode(lump_map, bl);
- ::decode(atids, bl);
- ::decode(stids, bl);
+ ::decode(table_tids, bl);
::decode(allocated_inos, bl);
if (!allocated_inos.empty())
::decode(alloc_tablev, bl);
client_reqs.push_back(r);
}
- void add_anchor_transaction(version_t atid) {
- atids.push_back(atid);
- }
- void add_snap_transaction(version_t stid) {
- stids.push_back(stid);
+ void add_table_transaction(int table, version_t tid) {
+ table_tids.push_back(pair<__u8, version_t>(table, tid));
}
void add_allocated_ino(inodeno_t ino, version_t tablev) {
out << "[metablob";
if (!lump_order.empty())
out << " " << lump_order.front() << ", " << lump_map.size() << " dirs";
- if (!atids.empty())
- out << " atids=" << atids;
- if (!stids.empty())
- out << " stids=" << stids;
+ if (!table_tids.empty())
+ out << " table_tids=" << table_tids;
if (!allocated_inos.empty())
out << " inos=" << allocated_inos << " v" << alloc_tablev;
out << "]";
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef __MDS_ESNAP_H
-#define __MDS_ESNAP_H
-
-#include <assert.h>
-#include "config.h"
-#include "include/types.h"
-
-#include "../LogEvent.h"
-#include "../snap.h"
-
-class ESnap : public LogEvent {
-public:
- bool create;
- SnapInfo snap;
- version_t version; // table version
-
- public:
- ESnap() : LogEvent(EVENT_SNAP) { }
- ESnap(bool c, SnapInfo &sn, version_t v) :
- LogEvent(EVENT_SNAP),
- create(c), snap(sn), version(v) { }
-
- void encode(bufferlist& bl) const {
- ::encode(create, bl);
- ::encode(snap, bl);
- ::encode(version, bl);
- }
- void decode(bufferlist::iterator &bl) {
- ::decode(create, bl);
- ::decode(snap, bl);
- ::decode(version, bl);
- }
-
- void print(ostream& out) {
- out << "ESnap " << (create ? "create":"remove")
- << " " << snap
- << " v " << version;
- }
-
- void update_segment();
- void replay(MDS *mds);
-};
-
-#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MDS_ETABLECLIENT_H
+#define __MDS_ETABLECLIENT_H
+
+#include <assert.h>
+#include "config.h"
+#include "include/types.h"
+
+#include "../mds_table_types.h"
+#include "../LogEvent.h"
+
+// Journal event recording a table-client operation: which table, which
+// op, and which transaction id.  replay() is defined out of line.
+struct ETableClient : public LogEvent {
+  __u16 table;    // TABLE_* id
+  __s16 op;       // TABLE_OP_* code
+  version_t tid;  // transaction id; 0 is omitted by print()
+
+  // Default-constructed events get defined placeholder values (the scalars
+  // were previously left uninitialized); decode() overwrites them.
+  ETableClient() :
+    LogEvent(EVENT_TABLECLIENT),
+    table(0), op(0), tid(0) { }
+  ETableClient(int t, int o, version_t ti) :
+    LogEvent(EVENT_TABLECLIENT),
+    table(t), op(o), tid(ti) { }
+
+  // Field order here defines the encoded format; decode() must mirror it.
+  void encode(bufferlist& bl) const {
+    ::encode(table, bl);
+    ::encode(op, bl);
+    ::encode(tid, bl);
+  }
+  void decode(bufferlist::iterator &bl) {
+    ::decode(table, bl);
+    ::decode(op, bl);
+    ::decode(tid, bl);
+  }
+
+  // Writes "ETableClient <table name> <op name>[ tid N]" to out.
+  void print(ostream& out) {
+    out << "ETableClient " << get_mdstable_name(table) << " " << get_mdstable_opname(op);
+    if (tid) out << " tid " << tid;
+  }
+
+  //void update_segment();
+  void replay(MDS *mds);
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MDS_ETABLESERVER_H
+#define __MDS_ETABLESERVER_H
+
+#include <assert.h>
+#include "config.h"
+#include "include/types.h"
+
+#include "../mds_table_types.h"
+#include "../LogEvent.h"
+
+// Journal event for a table-server operation on one MDS table.  Carries
+// the table id, the op, the originating request (reqid / bymds), the
+// encoded mutation, the transaction id and the table version this event
+// corresponds to.  update_segment() and replay() are defined out of line.
+struct ETableServer : public LogEvent {
+  __u16 table;          // TABLE_* id
+  __s16 op;             // TABLE_OP_* code
+  __u64 reqid;          // request id; 0 is omitted by print()
+  __s32 bymds;          // requesting mds; negative is omitted by print()
+  bufferlist mutation;  // encoded mutation
+  version_t tid;        // transaction id; 0 is omitted by print()
+  version_t version;    // table version; 0 is omitted by print()
+
+  // Default-constructed events get defined placeholder values (the scalars
+  // were previously left uninitialized); decode() overwrites them.
+  // bymds starts at -1 because print() treats negative as "no mds".
+  ETableServer() :
+    LogEvent(EVENT_TABLESERVER),
+    table(0), op(0), reqid(0), bymds(-1), tid(0), version(0) { }
+  ETableServer(int t, int o, __u64 ri, int m, version_t ti, version_t v) :
+    LogEvent(EVENT_TABLESERVER),
+    table(t), op(o), reqid(ri), bymds(m), tid(ti), version(v) { }
+
+  // Field order here defines the encoded format; decode() must mirror it.
+  void encode(bufferlist& bl) const {
+    ::encode(table, bl);
+    ::encode(op, bl);
+    ::encode(reqid, bl);
+    ::encode(bymds, bl);
+    ::encode(mutation, bl);
+    ::encode(tid, bl);
+    ::encode(version, bl);
+  }
+  void decode(bufferlist::iterator &bl) {
+    ::decode(table, bl);
+    ::decode(op, bl);
+    ::decode(reqid, bl);
+    ::decode(bymds, bl);
+    ::decode(mutation, bl);
+    ::decode(tid, bl);
+    ::decode(version, bl);
+  }
+
+  // Writes "ETableServer <table name> <op name>" followed by whichever of
+  // reqid / mds / tid / version are set.
+  void print(ostream& out) {
+    out << "ETableServer " << get_mdstable_name(table) << " " << get_mdstable_opname(op);
+    if (reqid) out << " reqid " << reqid;
+    if (bymds >= 0) out << " mds" << bymds;
+    if (tid) out << " tid " << tid;
+    if (version) out << " version " << version;
+  }
+
+  void update_segment();
+  void replay(MDS *mds);
+};
+
+#endif
#include "events/EImportFinish.h"
#include "events/EFragment.h"
-#include "events/EAnchor.h"
-#include "events/EAnchorClient.h"
+#include "events/ETableClient.h"
+#include "events/ETableServer.h"
-#include "events/ESnap.h"
#include "LogSegment.h"
#include "MDCache.h"
#include "Server.h"
#include "Migrator.h"
-#include "AnchorTable.h"
-#include "AnchorClient.h"
#include "IdAllocator.h"
#include "SnapTable.h"
+#include "MDSTableClient.h"
+#include "MDSTableServer.h"
+
#include "Locker.h"
}
// pending commit atids
- for (hash_set<version_t>::iterator p = pending_commit_atids.begin();
- p != pending_commit_atids.end();
+ for (map<int, hash_set<version_t> >::iterator p = pending_commit_tids.begin();
+ p != pending_commit_tids.end();
++p) {
- if (!gather) gather = new C_Gather;
- assert(!mds->anchorclient->has_committed(*p));
- dout(10) << "try_to_expire anchor transaction " << *p
- << " pending commit (not yet acked), waiting" << dendl;
- mds->anchorclient->wait_for_ack(*p, gather->new_sub());
+ MDSTableClient *client = mds->get_table_client(p->first);
+ for (hash_set<version_t>::iterator q = p->second.begin();
+ q != p->second.end();
+ ++q) {
+ if (!gather) gather = new C_Gather;
+ assert(!client->has_committed(*q));
+ dout(10) << "try_to_expire " << get_mdstable_name(p->first) << " transaction " << *q
+ << " pending commit (not yet acked), waiting" << dendl;
+ client->wait_for_ack(*q, gather->new_sub());
+ }
}
- // anchortable
- if (anchortablev > mds->anchortable->get_committed_version()) {
- dout(10) << "try_to_expire waiting for anchor table to save, need " << anchortablev << dendl;
- if (!gather) gather = new C_Gather;
- mds->anchortable->save(gather->new_sub());
- }
-
- // snaptable
- if (snaptablev > mds->snaptable->get_committed_version()) {
- dout(10) << "try_to_expire waiting for snap table to save, need " << snaptablev << dendl;
- if (!gather) gather = new C_Gather;
- mds->snaptable->save(gather->new_sub());
+ // table servers
+ for (map<int, version_t>::iterator p = tablev.begin();
+ p != tablev.end();
+ p++) {
+ MDSTableServer *server = mds->get_table_server(p->first);
+ if (p->second > server->get_committed_version()) {
+ dout(10) << "try_to_expire waiting for " << get_mdstable_name(p->first)
+ << " to save, need " << p->second << dendl;
+ if (!gather) gather = new C_Gather;
+ server->save(gather->new_sub());
+ }
}
// FIXME client requests...?
}
}
- // anchor transactions
- for (list<version_t>::iterator p = atids.begin();
- p != atids.end();
+ // table client transactions
+ for (list<pair<__u8,version_t> >::iterator p = table_tids.begin();
+ p != table_tids.end();
++p) {
- dout(10) << "EMetaBlob.replay noting anchor transaction " << *p << dendl;
- mds->anchorclient->got_journaled_agree(*p, logseg);
+ dout(10) << "EMetaBlob.replay noting " << get_mdstable_name(p->first) << " transaction " << p->second << dendl;
+ MDSTableClient *client = mds->get_table_client(p->first);
+ client->got_journaled_agree(p->second, logseg);
}
// allocated_inos
-// -----------------------
-// EAnchor
-void EAnchor::update_segment()
+void ETableServer::update_segment()  // records this event's table version in the segment's per-table version map
{
- _segment->anchortablev = version;
+ _segment->tablev[table] = version;  // keyed by table id, so each table tracks its own version
}
-void EAnchor::replay(MDS *mds)
+void ETableServer::replay(MDS *mds)  // re-applies a journaled prepare/commit to the table server for `table`
{
- if (mds->anchortable->get_version() >= version) {
- dout(10) << "EAnchor.replay event " << version
- << " <= table " << mds->anchortable->get_version() << dendl;
+ MDSTableServer *server = mds->get_table_server(table);
+ if (server->get_version() >= version) {  // table is already at or past this event: nothing to apply
+ dout(10) << "ETableServer.replay " << get_mdstable_name(table) << " event " << version
+ << " <= table " << server->get_version() << dendl;
return;
}
- dout(10) << " EAnchor.replay event " << version
- << " - 1 == table " << mds->anchortable->get_version() << dendl;
- assert(version-1 == mds->anchortable->get_version());
+ dout(10) << " ETableServer.replay " << get_mdstable_name(table) << " event " << version
+ << " - 1 == table " << server->get_version() << dendl;
+ assert(version-1 == server->get_version());  // the event must be exactly one version ahead of the table
switch (op) {
- // anchortable
- case ANCHOR_OP_CREATE_PREPARE:
- mds->anchortable->create_prepare(ino, trace, reqmds);
+ case TABLE_OP_PREPARE:
+ server->_prepare(mutation, reqid, bymds);
break;
- case ANCHOR_OP_DESTROY_PREPARE:
- mds->anchortable->destroy_prepare(ino, reqmds);
+ case TABLE_OP_COMMIT:
+ server->_commit(tid);  // NOTE(review): only PREPARE and COMMIT are replayed; any other op (e.g. TABLE_OP_ROLLBACK) reaches assert(0) below — confirm rollbacks are never journaled
break;
- case ANCHOR_OP_UPDATE_PREPARE:
- mds->anchortable->update_prepare(ino, trace, reqmds);
- break;
- case ANCHOR_OP_COMMIT:
- mds->anchortable->commit(atid);
- break;
-
default:
assert(0);
}
- assert(version == mds->anchortable->get_version());
+ assert(version == server->get_version());  // applying the op must leave the table at this event's version
}
-// EAnchorClient
-
-void EAnchorClient::replay(MDS *mds)
+void ETableClient::replay(MDS *mds)  // replays a journaled ack by notifying the table client for `table`
{
- dout(10) << " EAnchorClient.replay op " << op << " atid " << atid << dendl;
-
- switch (op) {
- // anchorclient
- case ANCHOR_OP_ACK:
- mds->anchorclient->got_journaled_ack(atid);
- break;
+ dout(10) << " ETableClient.replay " << get_mdstable_name(table)
+ << " op " << get_mdstable_opname(op)
+ << " tid " << tid << dendl;
- default:
- assert(0);
- }
+ MDSTableClient *client = mds->get_table_client(table);
+ assert(op == TABLE_OP_ACK);  // ACK is the only op this replay accepts
+ client->got_journaled_ack(tid);
}
// -----------------------
// ESnap
-
+/*
void ESnap::update_segment()
{
- _segment->snaptablev = version;
+ _segment->tablev[TABLE_SNAP] = version;
}
void ESnap::replay(MDS *mds)
assert(version == mds->snaptable->get_version());
}
+*/
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MDSTABLETYPES_H
+#define __MDSTABLETYPES_H
+
+// MDS TABLES
+
+enum {  // MDS table ids; the numeric values are encoded into journal events and table request messages, so keep them stable
+ TABLE_ANCHOR,  // 0, named "anchortable" by get_mdstable_name()
+ TABLE_SNAP,  // 1, named "snaptable" by get_mdstable_name()
+};
+
+// Returns the printable name of an MDS table id (TABLE_*).
+// Unknown ids trip the assert.  The explicit return after it keeps the
+// function from flowing off its end when asserts are compiled out,
+// matching get_mdstable_opname().
+inline const char *get_mdstable_name(int t) {
+  switch (t) {
+  case TABLE_ANCHOR: return "anchortable";
+  case TABLE_SNAP: return "snaptable";
+  default: assert(0); return 0;
+  }
+}
+
+enum {  // table operation codes; NOTE(review): positive values look like requests and negative values like the matching replies — confirm
+ TABLE_OP_QUERY = 1,
+ TABLE_OP_QUERY_REPLY = -2,
+ TABLE_OP_PREPARE = 3,
+ TABLE_OP_AGREE = -4,
+ TABLE_OP_COMMIT = 5,
+ TABLE_OP_ACK = -6,
+ TABLE_OP_ROLLBACK = 7,
+};
+
+// Returns the printable name of a table operation code (TABLE_OP_*).
+// Unknown codes trip the assert; 0 is returned if asserts are compiled out.
+inline const char *get_mdstable_opname(int op) {
+  switch (op) {
+  case TABLE_OP_QUERY: return "query";
+  case TABLE_OP_QUERY_REPLY: return "query_reply";
+  case TABLE_OP_PREPARE: return "prepare";
+  case TABLE_OP_AGREE: return "agree";
+  case TABLE_OP_COMMIT: return "commit";
+  case TABLE_OP_ACK: return "ack";
+  case TABLE_OP_ROLLBACK: return "rollback";
+  default: assert(0); return 0;
+  }
+}
+
+#endif
+// CAPS
+
typedef __u32 capseq_t;
inline string cap_string(int cap)
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __MANCHORREQUEST_H
-#define __MANCHORREQUEST_H
-
-#include <vector>
-
-#include "msg/Message.h"
-#include "mds/Anchor.h"
-
-
-class MAnchor : public Message {
- __u32 op;
- inodeno_t ino;
- vector<Anchor> trace;
- version_t atid; // anchor table version.
-
- public:
- MAnchor() {}
- MAnchor(int o, inodeno_t i, version_t v=0) :
- Message(MSG_MDS_ANCHOR),
- op(o), ino(i), atid(v) { }
-
- virtual const char *get_type_name() { return "anchor"; }
- void print(ostream& o) {
- o << "anchor(" << get_anchor_opname(op);
- if (ino) o << " " << ino;
- if (atid) o << " atid " << atid;
- if (!trace.empty()) o << ' ' << trace;
- o << ")";
- }
-
- void set_trace(vector<Anchor>& trace) {
- this->trace = trace;
- }
-
- int get_op() { return op; }
- inodeno_t get_ino() { return ino; }
- vector<Anchor>& get_trace() { return trace; }
- version_t get_atid() { return atid; }
-
- virtual void decode_payload() {
- bufferlist::iterator p = payload.begin();
- ::decode(op, p);
- ::decode(ino, p);
- ::decode(atid, p);
- ::decode(trace, p);
- }
-
- virtual void encode_payload() {
- ::encode(op, payload);
- ::encode(ino, payload);
- ::encode(atid, payload);
- ::encode(trace, payload);
- }
-};
-
-#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MMDSTABLEREQUEST_H
+#define __MMDSTABLEREQUEST_H
+
+#include "msg/Message.h"
+#include "mds/mds_table_types.h"
+
+// Message carrying one MDS table operation: the table id, the op code,
+// the request id, the transaction id, and an opaque bufferlist payload.
+class MMDSTableRequest : public Message {
+ public:
+  __u16 table;    // TABLE_* id
+  __s16 op;       // TABLE_OP_* code
+  __u64 reqid;    // request id
+  version_t tid;  // transaction id; 0 is omitted by print()
+  bufferlist bl;  // payload; its length is shown by print() when non-empty
+
+  // Default-constructed messages get defined placeholder values (the
+  // scalars were previously left uninitialized); decode_payload()
+  // overwrites them.
+  MMDSTableRequest() : table(0), op(0), reqid(0), tid(0) {}
+  MMDSTableRequest(int tab, int o, __u64 r, version_t v=0) :
+    Message(MSG_MDS_TABLE_REQUEST),
+    table(tab), op(o), reqid(r), tid(v) { }
+
+  virtual const char *get_type_name() { return "mds_table_request"; }
+  // Writes "mds_table_request(<op name> <reqid>[ tid N][ N bytes])".
+  void print(ostream& o) {
+    o << "mds_table_request(" << get_mdstable_opname(op) << " " << reqid;
+    if (tid) o << " tid " << tid;
+    if (bl.length()) o << " " << bl.length() << " bytes";
+    o << ")";
+  }
+
+  // Field order must mirror encode_payload().
+  virtual void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(table, p);
+    ::decode(op, p);
+    ::decode(reqid, p);
+    ::decode(tid, p);
+    ::decode(bl, p);
+  }
+
+  virtual void encode_payload() {
+    ::encode(table, payload);
+    ::encode(op, payload);
+    ::encode(reqid, payload);
+    ::encode(tid, payload);
+    ::encode(bl, payload);
+  }
+};
+
+#endif
#include "messages/MHeartbeat.h"
-#include "messages/MAnchor.h"
+#include "messages/MMDSTableRequest.h"
//#include "messages/MInodeUpdate.h"
#include "messages/MCacheExpire.h"
m = new MCacheExpire();
break;
- case MSG_MDS_ANCHOR:
- m = new MAnchor();
+ case MSG_MDS_TABLE_REQUEST:
+ m = new MMDSTableRequest;
break;
/* case MSG_MDS_INODEUPDATE:
#define MSG_MDS_BEACON 90 // to monitor
#define MSG_MDS_SLAVE_REQUEST 91
+#define MSG_MDS_TABLE_REQUEST 92
-#define MSG_MDS_ANCHOR 0x100
#define MSG_MDS_HEARTBEAT 0x500 // for mds load balancer