mds/CDir.o\
mds/CInode.o\
mds/AnchorTable.o\
+ mds/AnchorClient.o\
mds/MDStore.o\
- mds/LogStream.o\
+ mds/LogEvent.o\
mds/IdAllocator.o\
mds/MDLog.o
OSDC_OBJS= \
osdc/Filer.o\
osdc/ObjectCacher.o\
- osdc/Objecter.o
+ osdc/Objecter.o\
+ osdc/LogStreamer.o
MON_OBJS= \
mon/Monitor.o\
${CC} -fPIC ${CFLAGS} -c $< -o $@
count:
- cat *.cc */*.cc */*.h */*/*.h | wc -l
- cat *.cc */*.cc */*.h */*/*.h | grep -c \;
+ cat ${SRCS} | wc -l
+ cat ${SRCS} | grep -c \;
.depend:
touch .depend
//FileLayout g_OSD_MDLogLayout( 1<<20, 1, 1<<20 ); // old way
// fake osd failures: osd -> time
-map<int,float> g_fake_osd_down;
-map<int,float> g_fake_osd_out;
+std::map<int,float> g_fake_osd_down;
+std::map<int,float> g_fake_osd_out;
md_config_t g_debug_after_conf;
#include <stdlib.h>
#include <string.h>
-#include <iostream>
-using namespace std;
-void env_to_vec(vector<char*>& args)
+void env_to_vec(std::vector<char*>& args)
{
const char *p = getenv("CEPH_ARGS");
if (!p) return;
void argv_to_vec(int argc, char **argv,
- vector<char*>& args)
+ std::vector<char*>& args)
{
for (int i=1; i<argc; i++)
args.push_back(argv[i]);
}
-void vec_to_argv(vector<char*>& args,
+void vec_to_argv(std::vector<char*>& args,
int& argc, char **&argv)
{
argv = (char**)malloc(sizeof(char*) * argc);
argv[argc++] = args[i];
}
-void parse_config_options(vector<char*>& args)
+void parse_config_options(std::vector<char*>& args)
{
- vector<char*> nargs;
+ std::vector<char*> nargs;
for (unsigned i=0; i<args.size(); i++) {
if (strcmp(args[i], "--nummon") == 0)
#include <vector>
#include <map>
-using namespace std;
-extern map<int,float> g_fake_osd_down;
-extern map<int,float> g_fake_osd_out;
+extern std::map<int,float> g_fake_osd_down;
+extern std::map<int,float> g_fake_osd_out;
#define OSD_REP_PRIMARY 0
#define OSD_REP_SPLAY 1
extern md_config_t g_conf;
extern md_config_t g_debug_after_conf;
-#define dout(x) if ((x) <= g_conf.debug) cout
-#define dout2(x) if ((x) <= g_conf.debug) cout
+#define dout(x) if ((x) <= g_conf.debug) std::cout
+#define dout2(x) if ((x) <= g_conf.debug) std::cout
-void env_to_vec(vector<char*>& args);
+void env_to_vec(std::vector<char*>& args);
void argv_to_vec(int argc, char **argv,
- vector<char*>& args);
-void vec_to_argv(vector<char*>& args,
+ std::vector<char*>& args);
+void vec_to_argv(std::vector<char*>& args,
int& argc, char **&argv);
-void parse_config_options(vector<char*>& args);
+void parse_config_options(std::vector<char*>& args);
#endif
args = nargs;
vec_to_argv(args, argc, argv);
- MonMap *monmap = new MonMap;
-
+ MonMap *monmap = new MonMap(g_conf.num_mon);
+
Monitor *mon[g_conf.num_mon];
for (int i=0; i<g_conf.num_mon; i++) {
mon[i] = new Monitor(i, new FakeMessenger(MSG_ADDR_MON(i)), monmap);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __ANCHOR_H
+#define __ANCHOR_H
+
+#include <string>
+using std::string;
+
+#include "include/types.h"
+#include "include/bufferlist.h"
+
+// One anchor record: an inode number, the directory inode that contains it,
+// the name of the dentry referring to it, and a reference count.
+class Anchor {
+public:
+  inodeno_t ino;      // my ino
+  inodeno_t dirino;   // containing dir
+  string ref_dn;      // referring dentry
+  int nref;           // reference count
+
+  // Zero the scalar fields so a default-constructed Anchor never carries
+  // indeterminate values (for example if it is inspected or encoded before
+  // _decode() has filled it in).
+  Anchor() : ino(0), dirino(0), nref(0) {}
+  Anchor(inodeno_t ino, inodeno_t dirino, const string& ref_dn, int nref=0)
+    : ino(ino), dirino(dirino), ref_dn(ref_dn), nref(nref) {}
+
+  // Append this record to bl: ino, dirino and nref as raw in-memory bytes,
+  // followed by ref_dn via the global ::_encode helper.
+  void _encode(bufferlist &bl) {
+    bl.append((char*)&ino, sizeof(ino));
+    bl.append((char*)&dirino, sizeof(dirino));
+    bl.append((char*)&nref, sizeof(nref));
+    ::_encode(ref_dn, bl);
+  }
+  // Read a record from bl starting at byte offset off, in the same field
+  // order _encode() writes.  off is advanced past each scalar field here;
+  // NOTE(review): ::_decode is presumably responsible for advancing off past
+  // ref_dn -- confirm against its definition.
+  void _decode(bufferlist& bl, int& off) {
+    bl.copy(off, sizeof(ino), (char*)&ino);
+    off += sizeof(ino);
+    bl.copy(off, sizeof(dirino), (char*)&dirino);
+    off += sizeof(dirino);
+    bl.copy(off, sizeof(nref), (char*)&nref);
+    off += sizeof(nref);
+    ::_decode(ref_dn, bl, off);
+  }
+} ;
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <iostream>
+using std::cout;
+using std::cerr;
+using std::endl;
+
+#include "Anchor.h"
+#include "AnchorClient.h"
+#include "MDSMap.h"
+
+#include "include/Context.h"
+#include "msg/Messenger.h"
+
+#include "MDS.h"
+
+#include "messages/MAnchorRequest.h"
+#include "messages/MAnchorReply.h"
+
+#include "config.h"
+#undef dout
+// Debug output for this file: each line is prefixed with the current time and
+// this messenger's address.  The level argument is parenthesized so that an
+// expression can be passed safely.
+#define  dout(x)  if ((x) <= g_conf.debug) cout << g_clock.now() << " " << messenger->get_myaddr() << ".anchorclient "
+// Same prefix as dout, but written to the error stream.
+#define  derr(x)  if ((x) <= g_conf.debug) cerr << g_clock.now() << " " << messenger->get_myaddr() << ".anchorclient "
+
+
+// Entry point for messages delivered to the anchor client.
+void AnchorClient::dispatch(Message *m)
+{
+  // anchor replies are the only message type handled here
+  if (m->get_type() == MSG_MDS_ANCHORREPLY) {
+    handle_anchor_reply((MAnchorReply*)m);
+    return;
+  }
+
+  // any other message type is unexpected
+  assert(0);
+}
+
+// Match a reply from the anchor table against the request recorded for the
+// same ino, drop the bookkeeping entry, and run the caller's completion
+// context (if one was supplied).
+//
+// Each map is searched once with find(); operator[] would silently insert an
+// empty entry for an ino that is not pending.
+// NOTE(review): m is not deleted here or in dispatch(); confirm who owns the
+// reply message (and the Anchor objects in its trace) before adding a delete.
+void AnchorClient::handle_anchor_reply(class MAnchorReply *m)
+{
+  Context *onfinish = 0;
+
+  switch (m->get_op()) {
+
+  case ANCHOR_OP_LOOKUP:
+    {
+      hash_map<inodeno_t, vector<Anchor*>*>::iterator tp = pending_lookup_trace.find(m->get_ino());
+      assert(tp != pending_lookup_trace.end());
+
+      // hand the trace back through the vector the caller gave to lookup()
+      *(tp->second) = m->get_trace();
+      pending_lookup_trace.erase(tp);
+
+      hash_map<inodeno_t, Context*>::iterator cp = pending_lookup_context.find(m->get_ino());
+      if (cp != pending_lookup_context.end()) {
+        onfinish = cp->second;
+        pending_lookup_context.erase(cp);
+      }
+    }
+    break;
+
+  case ANCHOR_OP_UPDATE:
+  case ANCHOR_OP_CREATE:
+  case ANCHOR_OP_DESTROY:
+    {
+      hash_map<inodeno_t, Context*>::iterator op = pending_op.find(m->get_ino());
+      assert(op != pending_op.end());
+
+      onfinish = op->second;
+      pending_op.erase(op);
+    }
+    break;
+
+  default:
+    assert(0);
+  }
+
+  // bookkeeping is already removed, so the context may issue a new request
+  // for the same ino from inside finish()
+  if (onfinish) {
+    onfinish->finish(0);
+    delete onfinish;
+  }
+}
+
+
+
+/*
+ * public async interface
+ */
+
+// Send req to the MDS that mdsmap names as the anchor table, addressed from
+// the anchor client port to the anchor manager port.  Shared by all four
+// public operations below so the addressing lives in one place.
+static void send_anchor_request(Messenger *messenger, MDSMap *mdsmap, MAnchorRequest *req)
+{
+  messenger->send_message(req,
+                          MSG_ADDR_MDS(mdsmap->get_anchortable()), mdsmap->get_inst(mdsmap->get_anchortable()),
+                          MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORCLIENT);
+}
+
+// NOTE(review): all four operations key their pending state by ino alone, so
+// a second request for an ino that is still pending overwrites the first
+// request's context.  Confirm callers never overlap requests on one ino.
+
+// Ask the anchor table for the trace of ino.  trace must stay valid until
+// onfinish runs: the reply handler writes the result into it.
+void AnchorClient::lookup(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
+{
+  MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_LOOKUP, ino);
+
+  pending_lookup_trace[ino] = &trace;
+  pending_lookup_context[ino] = onfinish;
+
+  send_anchor_request(messenger, mdsmap, req);
+}
+
+// Ask the anchor table to create an anchor for ino with the given trace;
+// onfinish (may be NULL) runs when the reply arrives.
+void AnchorClient::create(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
+{
+  MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_CREATE, ino);
+  req->set_trace(trace);
+
+  pending_op[ino] = onfinish;
+
+  send_anchor_request(messenger, mdsmap, req);
+}
+
+// Ask the anchor table to update the anchor for ino with the given trace;
+// onfinish (may be NULL) runs when the reply arrives.
+void AnchorClient::update(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
+{
+  MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_UPDATE, ino);
+  req->set_trace(trace);
+
+  pending_op[ino] = onfinish;
+
+  send_anchor_request(messenger, mdsmap, req);
+}
+
+// Ask the anchor table to destroy the anchor for ino; onfinish (may be NULL)
+// runs when the reply arrives.
+void AnchorClient::destroy(inodeno_t ino, Context *onfinish)
+{
+  MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_DESTROY, ino);
+
+  pending_op[ino] = onfinish;
+
+  send_anchor_request(messenger, mdsmap, req);
+}
+
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __ANCHORCLIENT_H
+#define __ANCHORCLIENT_H
+
+#include <vector>
+using std::vector;
+#include <ext/hash_map>
+using __gnu_cxx::hash_map;
+
+#include "include/types.h"
+#include "msg/Dispatcher.h"
+
+#include "Anchor.h"
+
+class Messenger;
+class MDSMap;
+class Context;
+
+class AnchorClient : public Dispatcher {   // sends anchor requests and matches the replies to waiting contexts
+  Messenger *messenger;   // used to send requests (not created here; passed to the constructor)
+  MDSMap *mdsmap;         // consulted for which MDS holds the anchor table
+
+  // remote state: requests that have been sent and not yet answered, keyed by ino
+  hash_map<inodeno_t, Context*> pending_op;               // create/update/destroy: context to finish on reply
+  hash_map<inodeno_t, Context*> pending_lookup_context;   // lookup: context to finish on reply
+  hash_map<inodeno_t, vector<Anchor*>*> pending_lookup_trace;   // lookup: caller's vector that receives the trace
+
+  void handle_anchor_reply(class MAnchorReply *m);   // completes the pending entry for m's ino
+
+
+public:
+  AnchorClient(Messenger *ms, MDSMap *mm) : messenger(ms), mdsmap(mm) {}
+
+  // async user interface: each call sends a request and returns; onfinish is finished and deleted when the reply arrives
+  void lookup(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);   // trace is filled in by the reply, so it must outlive the call
+  void create(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
+  void update(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
+  void destroy(inodeno_t ino, Context *onfinish);
+
+  void dispatch(Message *m);   // incoming message entry point; only anchor replies are accepted
+};
+
+#endif
*
*/
-
-
#include "AnchorTable.h"
#include "MDS.h"
#include "messages/MAnchorRequest.h"
#include "messages/MAnchorReply.h"
+#include "common/Clock.h"
+
#include "config.h"
#undef dout
-#define dout(x) if (x <= g_conf.debug) cout << "anchortable: "
+#define dout(x) if (x <= g_conf.debug_mds) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchortable "
+#define derr(x) if (x <= g_conf.debug_mds) cerr << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchortable "
AnchorTable::AnchorTable(MDS *mds)
{
bool AnchorTable::add(inodeno_t ino, inodeno_t dirino, string& ref_dn)
{
- dout(7) << "add " << ino << " dirino " << dirino << " ref_dn " << ref_dn << endl;
-
+ dout(7) << "add " << std::hex << ino << " dirino " << dirino << std::dec << " ref_dn " << ref_dn << endl;
+
// parent should be there
assert(dirino < 1000 || // system dirino
anchor_map.count(dirino)); // have
if (anchor_map.count(ino) == 0) {
// new item
anchor_map[ ino ] = new Anchor(ino, dirino, ref_dn);
- dout(10) << " add: added " << ino << endl;
+ dout(10) << " add: added " << std::hex << ino << std::dec << endl;
return true;
} else {
- dout(10) << " add: had " << ino << endl;
+ dout(10) << " add: had " << std::hex << ino << std::dec << endl;
return false;
}
}
void AnchorTable::inc(inodeno_t ino)
{
- dout(7) << "inc " << ino << endl;
+ dout(7) << "inc " << std::hex << ino << std::dec << endl;
assert(anchor_map.count(ino) != 0);
Anchor *anchor = anchor_map[ino];
while (1) {
anchor->nref++;
- dout(10) << " inc: record " << ino << " now " << anchor->nref << endl;
+ dout(10) << " inc: record " << std::hex << ino << std::dec << " now " << anchor->nref << endl;
ino = anchor->dirino;
if (ino == 0) break;
void AnchorTable::dec(inodeno_t ino)
{
- dout(7) << "dec " << ino << endl;
+ dout(7) << "dec " << std::hex << ino << std::dec << endl;
assert(anchor_map.count(ino) != 0);
Anchor *anchor = anchor_map[ino];
anchor->nref--;
if (anchor->nref == 0) {
- dout(10) << " dec: record " << ino << " now 0, removing" << endl;
+ dout(10) << " dec: record " << std::hex << ino << std::dec << " now 0, removing" << endl;
inodeno_t dirino = anchor->dirino;
anchor_map.erase(ino);
delete anchor;
ino = dirino;
} else {
- dout(10) << " dec: record " << ino << " now " << anchor->nref << endl;
+ dout(10) << " dec: record " << std::hex << ino << std::dec << " now " << anchor->nref << endl;
ino = anchor->dirino;
}
void AnchorTable::lookup(inodeno_t ino, vector<Anchor*>& trace)
{
- dout(7) << "lookup " << ino << endl;
+ dout(7) << "lookup " << std::hex << ino << std::dec << endl;
assert(anchor_map.count(ino) == 1);
Anchor *anchor = anchor_map[ino];
assert(anchor);
while (true) {
- dout(10) << " record " << anchor->ino << " dirino " << anchor->dirino << " ref_dn " << anchor->ref_dn << endl;
+ dout(10) << " record " << std::hex << anchor->ino << " dirino " << anchor->dirino << std::dec << " ref_dn " << anchor->ref_dn << endl;
trace.insert(trace.begin(), anchor); // lame FIXME
if (anchor->dirino < MDS_INO_BASE) break;
void AnchorTable::create(inodeno_t ino, vector<Anchor*>& trace)
{
- dout(7) << "create " << ino << endl;
+ dout(7) << "create " << std::hex << ino << std::dec << endl;
// make sure trace is in table
for (unsigned i=0; i<trace.size(); i++)
case MSG_MDS_ANCHORREQUEST:
handle_anchor_request((MAnchorRequest*)m);
break;
-
- case MSG_MDS_ANCHORREPLY:
- handle_anchor_reply((MAnchorReply*)m);
- break;
default:
assert(0);
delete m;
}
-void AnchorTable::handle_anchor_reply(class MAnchorReply *m)
-{
- switch (m->get_op()) {
-
- case ANCHOR_OP_LOOKUP:
- {
- assert(pending_lookup_trace.count(m->get_ino()) == 1);
-
- *(pending_lookup_trace[ m->get_ino() ]) = m->get_trace();
- Context *onfinish = pending_lookup_context[ m->get_ino() ];
-
- pending_lookup_trace.erase(m->get_ino());
- pending_lookup_context.erase(m->get_ino());
-
- if (onfinish) {
- onfinish->finish(0);
- delete onfinish;
- }
- }
- break;
-
- case ANCHOR_OP_UPDATE:
- case ANCHOR_OP_CREATE:
- case ANCHOR_OP_DESTROY:
- {
- assert(pending_op.count(m->get_ino()) == 1);
-
- Context *onfinish = pending_op[m->get_ino()];
- pending_op.erase(m->get_ino());
-
- if (onfinish) {
- onfinish->finish(0);
- delete onfinish;
- }
- }
- break;
-
- default:
- assert(0);
- }
-
-}
-
-
-/*
- * public async interface
- */
-
-void AnchorTable::lookup(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
-{
- // me?
- if (false && mds->get_nodeid() == 0) {
- lookup(ino, trace);
- onfinish->finish(0);
- delete onfinish;
- return;
- }
-
- // send message
- MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_LOOKUP, ino);
-
- pending_lookup_trace[ino] = &trace;
- pending_lookup_context[ino] = onfinish;
-
- mds->messenger->send_message(req, MSG_ADDR_MDS(0), MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORMGR);
-}
-
-void AnchorTable::create(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
-{
- // me?
- if (false && mds->get_nodeid() == 0) {
- create(ino, trace);
- onfinish->finish(0);
- delete onfinish;
- return;
- }
-
- // send message
- MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_CREATE, ino);
- req->set_trace(trace);
-
- pending_op[ino] = onfinish;
-
- mds->messenger->send_message(req, MSG_ADDR_MDS(0), MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORMGR);
-}
-
-void AnchorTable::update(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
-{
- // send message
- MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_UPDATE, ino);
- req->set_trace(trace);
-
- pending_op[ino] = onfinish;
-
- mds->messenger->send_message(req, MSG_ADDR_MDS(0), MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORMGR);
-}
-
-void AnchorTable::destroy(inodeno_t ino, Context *onfinish)
-{
- // me?
- if (false && mds->get_nodeid() == 0) {
- destroy(ino);
- onfinish->finish(0);
- delete onfinish;
- return;
- }
-
- // send message
- MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_DESTROY, ino);
-
- pending_op[ino] = onfinish;
-
- mds->messenger->send_message(req, MSG_ADDR_MDS(0), MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORMGR);
-}
-
-
-
if (!opened) return;
// build up write
- crope tab;
+ bufferlist tabbl;
int num = anchor_map.size();
- tab.append((char*)&num, sizeof(int));
+ tabbl.append((char*)&num, sizeof(int));
for (hash_map<inodeno_t, Anchor*>::iterator it = anchor_map.begin();
it != anchor_map.end();
it++) {
- dout(14) << "adding anchor for " << it->first << endl;
+ dout(14) << " saving anchor for " << std::hex << it->first << std::dec << endl;
Anchor *a = it->second;
assert(a);
- a->_rope(tab);
+ a->_encode(tabbl);
}
- size_t size = tab.length();
- tab.insert(0, (char*)&size, sizeof(size));
+ bufferlist bl;
+ size_t size = tabbl.length();
+ bl.append((char*)&size, sizeof(size));
+ bl.claim_append(tabbl);
dout(7) << " " << num << " anchors, " << size << " bytes" << endl;
// write!
- bufferlist bl;
- bl.append(tab.c_str(), tab.length());
mds->filer->write(table_inode,
0, bl.length(),
bl, 0,
class C_AT_Load : public Context {
AnchorTable *at;
- Context *onfinish;
public:
size_t size;
bufferlist bl;
- C_AT_Load(size_t size, AnchorTable *at, Context *onfinish) {
+ C_AT_Load(size_t size, AnchorTable *at) {
this->size = size;
this->at = at;
- this->onfinish = onfinish;
}
void finish(int result) {
assert(result > 0);
- at->load_2(size, bl, onfinish);
+ at->load_2(size, bl);
}
};
class C_AT_LoadSize : public Context {
AnchorTable *at;
MDS *mds;
- Context *onfinish;
public:
bufferlist bl;
- C_AT_LoadSize(AnchorTable *at, MDS *mds, Context *onfinish) {
+ C_AT_LoadSize(AnchorTable *at, MDS *mds) {
this->at = at;
this->mds = mds;
- this->onfinish = onfinish;
}
void finish(int r) {
size_t size = 0;
bl.copy(0, sizeof(size), (char*)&size);
cout << "r is " << r << " size is " << size << endl;
if (r > 0 && size > 0) {
- C_AT_Load *c = new C_AT_Load(size, at, onfinish);
+ C_AT_Load *c = new C_AT_Load(size, at);
mds->filer->read(at->table_inode,
sizeof(size), size,
&c->bl,
} else {
// fail
bufferlist empty;
- at->load_2(0, empty, onfinish);
+ at->load_2(0, empty);
}
}
};
dout(7) << "load" << endl;
assert(!opened);
+
+ waiting_for_open.push_back(onfinish);
- C_AT_LoadSize *c = new C_AT_LoadSize(this, mds, onfinish);
+ C_AT_LoadSize *c = new C_AT_LoadSize(this, mds);
mds->filer->read(table_inode,
0, sizeof(size_t),
&c->bl,
c);
}
-void AnchorTable::load_2(size_t size, bufferlist& bl, Context *onfinish)
+void AnchorTable::load_2(size_t size, bufferlist& bl)
{
- // make a rope to be easy.. FIXME someday
- crope r;
- bl._rope(r);
-
// num
int off = 0;
int num;
- r.copy(0, sizeof(num), (char*)&num);
+ bl.copy(0, sizeof(num), (char*)&num);
off += sizeof(num);
// parse anchors
for (int i=0; i<num; i++) {
Anchor *a = new Anchor;
- a->_unrope(r, off);
- dout(10) << " load_2 unroped " << a->ino << " dirino " << a->dirino << " ref_dn " << a->ref_dn << endl;
+ a->_decode(bl, off);
+ dout(10) << "load_2 decoded " << std::hex << a->ino << " dirino " << a->dirino << std::dec << " ref_dn " << a->ref_dn << endl;
anchor_map[a->ino] = a;
}
opening = false;
// finish
- if (onfinish) {
- onfinish->finish(0);
- delete onfinish;
- }
finish_contexts(waiting_for_open);
}
#ifndef __ANCHORTABLE_H
#define __ANCHORTABLE_H
-#include "include/types.h"
+#include "Anchor.h"
#include "include/Context.h"
-#include "include/bufferlist.h"
#include <ext/hash_map>
using namespace __gnu_cxx;
class MDS;
-class Anchor {
-public:
- inodeno_t ino; // my ino
- inodeno_t dirino; // containing dir
- string ref_dn; // referring dentry
- int nref; // reference count
-
- Anchor() {}
- Anchor(inodeno_t ino, inodeno_t dirino, string& ref_dn, int nref=0) {
- this->ino = ino;
- this->dirino = dirino;
- this->ref_dn = ref_dn;
- this->nref = nref;
- }
-
- void _rope(crope& r) {
- r.append((char*)&ino, sizeof(ino));
- r.append((char*)&dirino, sizeof(dirino));
- r.append((char*)&nref, sizeof(nref));
- ::_rope(ref_dn, r);
- }
- void _unrope(crope& r, int& off) {
- r.copy(off, sizeof(ino), (char*)&ino);
- off += sizeof(ino);
- r.copy(off, sizeof(dirino), (char*)&dirino);
- off += sizeof(dirino);
- r.copy(off, sizeof(nref), (char*)&nref);
- off += sizeof(nref);
- ::_unrope(ref_dn, r, off);
- }
-} ;
-
class AnchorTable {
MDS *mds;
bool opening, opened;
list<Context*> waiting_for_open;
- // remote state
- hash_map<inodeno_t, Context*> pending_op;
- hash_map<inodeno_t, Context*> pending_lookup_context;
- hash_map<inodeno_t, vector<Anchor*>*> pending_lookup_trace;
-
public:
inode_t table_inode;
void proc_message(class Message *m);
protected:
void handle_anchor_request(class MAnchorRequest *m);
- void handle_anchor_reply(class MAnchorReply *m);
public:
- // user interface
- void lookup(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
- void create(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
- void update(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
- void destroy(inodeno_t ino, Context *onfinish);
-
-
// load/save entire table for now!
void reset() {
}
void save(Context *onfinish);
void load(Context *onfinish);
- void load_2(size_t size, bufferlist& bl, Context *onfinish);
+ void load_2(size_t size, bufferlist& bl);
};
#include "MDSMap.h"
#include "include/Context.h"
+#include "common/Clock.h"
#include <cassert>
#include "config.h"
#undef dout
-#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_mds) cout << "mds" << mds->get_nodeid() << " cdir: "
+#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_mds) cout << g_clock.now() << " mds" << mds->get_nodeid() << " cdir: "
// PINS
#include "MDS.h"
#include "AnchorTable.h"
+#include "common/Clock.h"
+
#include <string>
#include "config.h"
#undef dout
-#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_mds) cout << "cinode: "
+#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_mds) cout << g_clock.now() << " cinode: "
int cinode_pins[CINODE_NUM_PINS]; // counts
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "LogEvent.h"
+
+#include "MDS.h"
+
+// events i know of
+#include "events/EString.h"
+#include "events/EInodeUpdate.h"
+#include "events/EDirUpdate.h"
+#include "events/EUnlink.h"
+#include "events/EAlloc.h"
+
+// Build a LogEvent from its encoded form.  bl starts with a 32-bit event
+// type; everything after it is handed to the matching event class's
+// decode_payload().  Returns the newly allocated event.  An unknown type
+// trips an assert; if asserts are compiled out, NULL is returned instead of
+// dereferencing an uninitialized pointer.
+LogEvent *LogEvent::decode(bufferlist& bl)
+{
+  // parse type, length
+  __uint32_t type;
+  assert(bl.length() >= sizeof(type));   // must at least hold the type header
+
+  int off = 0;
+  bl.copy(off, sizeof(type), (char*)&type);
+  off += sizeof(type);
+
+  int length = bl.length() - off;
+  dout(15) << "decode_log_event type " << type << ", size " << length << endl;
+
+  assert(type > 0);
+
+  // create event
+  LogEvent *le = 0;
+  switch (type) {
+  case EVENT_STRING:  // string
+    le = new EString();
+    break;
+
+  case EVENT_INODEUPDATE:
+    le = new EInodeUpdate();
+    break;
+
+  case EVENT_DIRUPDATE:
+    le = new EDirUpdate();
+    break;
+
+  case EVENT_UNLINK:
+    le = new EUnlink();
+    break;
+
+  case EVENT_ALLOC:
+    le = new EAlloc();
+    break;
+
+  default:
+    dout(1) << "uh oh, unknown event type " << type << endl;
+    assert(0);
+    return 0;   // only reached when asserts are compiled out
+  }
+
+  // decode
+  le->decode_payload(bl, off);
+
+  return le;
+}
+
#ifndef __LOGEVENT_H
#define __LOGEVENT_H
-#include <stdlib.h>
-#include <string>
-#include <ext/rope>
-using namespace std;
-
#define EVENT_STRING 1
#define EVENT_INODEUPDATE 2
#define EVENT_DIRUPDATE 3
#define EVENT_UNLINK 4
#define EVENT_ALLOC 5
-#include "config.h"
+#include <string>
+using namespace std;
+
+#include "include/bufferlist.h"
+#include "include/Context.h"
+
+class MDS;
// generic log event
class LogEvent {
assert(_type > 0);
bl.append((char*)&_type, sizeof(_type));
- // len placeholder
- int len = 0; // we don't know just yet...
- int off = bl.length();
- bl.append((char*)&len, sizeof(len));
-
// payload
encode_payload(bl);
- // HACK: pad payload to match md log layout?
+ /*// HACK: pad payload to match md log layout?
int elen = bl.length() - off + sizeof(_type);
if (elen % g_conf.mds_log_pad_entry > 0) {
int add = g_conf.mds_log_pad_entry - (elen % g_conf.mds_log_pad_entry);
}
len = bl.length() - off - sizeof(len);
-
bl.copy_in(off, sizeof(len), (char*)&len);
+ */
}
+
+ static LogEvent *decode(bufferlist &bl);
+ // ...
virtual bool obsolete(MDS *m) {
return true;
}
c->finish(0);
delete c;
}
+
};
#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-
-#include "LogStream.h"
-#include "MDS.h"
-#include "LogEvent.h"
-
-#include "osdc/Filer.h"
-
-#include "common/Logger.h"
-
-#include "events/EString.h"
-#include "events/EInodeUpdate.h"
-#include "events/EDirUpdate.h"
-#include "events/EUnlink.h"
-#include "events/EAlloc.h"
-
-#include <iostream>
-using namespace std;
-
-#include "config.h"
-#undef dout
-#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mds_log) cout << "mds" << mds->get_nodeid() << ".logstream "
-
-
-
-
-LogStream::LogStream(MDS *mds, Filer *filer, inodeno_t log_ino, Logger *l)
-{
- this->mds = mds;
- this->filer = filer;
- this->logger = l;
-
- // inode
- memset(&log_inode, 0, sizeof(log_inode));
- log_inode.ino = log_ino;
- log_inode.layout = g_OSD_MDLogLayout;
-
- if (g_conf.mds_local_osd) {
- log_inode.layout.object_layout = OBJECT_LAYOUT_STARTOSD;
- log_inode.layout.osd = mds->get_nodeid() + 10000; // hack
- }
-
- // wr
- sync_pos = flush_pos = append_pos = 0;
- autoflush = true;
-
- // rd
- read_pos = 0;
- reading = false;
-}
-
-
-
-// ----------------------------
-// writing
-
-class C_LS_Append : public Context {
- LogStream *ls;
- off_t off;
-public:
- C_LS_Append(LogStream *ls, off_t off) {
- this->ls = ls;
- this->off = off;
- }
- void finish(int r) {
- ls->_append_2(off);
- }
-};
-
-
-off_t LogStream::append(LogEvent *e)
-{
- // serialize
- bufferlist bl;
- e->encode(bl);
-
- size_t elen = bl.length();
-
- // append
- dout(15) << "append event type " << e->get_type() << " size " << elen << " at log offset " << append_pos << endl;
-
- off_t off = append_pos;
- append_pos += elen;
-
- //dout(15) << "write buf was " << write_buf.length() << " bl " << write_buf << endl;
- write_buf.claim_append(bl);
- //dout(15) << "write buf now " << write_buf.length() << " bl " << write_buf << endl;
-
- return off;
-}
-
-void LogStream::_append_2(off_t off)
-{
- dout(15) << g_clock.now() << " sync_pos now " << off << " skew " << off % g_conf.mds_log_pad_entry << endl;
- sync_pos = off;
-
- // discard written bufferlist
- assert(writing_buffers.count(off) == 1);
- delete writing_buffers[off];
- writing_buffers.erase(off);
-
- utime_t now = g_clock.now();
- now -= writing_latency[off];
- writing_latency.erase(off);
- logger->finc("lsum", (double)now);
- logger->inc("lnum", 1);
-
- // wake up waiters
- map< off_t, list<Context*> >::iterator it = waiting_for_sync.begin();
- while (it != waiting_for_sync.end()) {
- if (it->first > sync_pos) break;
-
- // wake them up!
- dout(15) << it->second.size() << " waiters at offset " << it->first << " <= " << sync_pos << endl;
- for (list<Context*>::iterator cit = it->second.begin();
- cit != it->second.end();
- cit++)
- mds->finished_queue.push_back(*cit);
-
- // continue
- waiting_for_sync.erase(it++);
- }
-}
-
-
-void LogStream::wait_for_sync(Context *c, off_t off)
-{
- if (off == 0) off = append_pos;
- assert(off > sync_pos);
-
- dout(15) << "sync " << c << " waiting for " << off << " (sync_pos currently " << sync_pos << ")" << endl;
- waiting_for_sync[off].push_back(c);
-
- // initiate flush now? (since we have a waiter...)
- if (autoflush) flush();
-}
-
-void LogStream::flush()
-{
- // write to disk
- if (write_buf.length()) {
- dout(15) << "flush flush_pos " << flush_pos << " < append_pos " << append_pos << ", writing " << write_buf.length() << " bytes" << endl;
-
- assert(write_buf.length() == append_pos - flush_pos);
-
- // tuck writing buffer away until write finishes
- writing_buffers[append_pos] = new bufferlist;
- writing_buffers[append_pos]->claim(write_buf);
-
- writing_latency[append_pos] = g_clock.now();
-
- // write it
- mds->filer->write(log_inode,
- flush_pos, writing_buffers[append_pos]->length(),
- *writing_buffers[append_pos],
- 0,
- new C_LS_Append(this, append_pos), // on ack
- NULL); // on safe
-
- flush_pos = append_pos;
- } else {
- dout(15) << "flush flush_pos " << flush_pos << " == append_pos " << append_pos << ", nothing to do" << endl;
- }
-
-}
-
-
-
-
-
-
-// -------------------------------------------------
-// reading
-
-
-LogEvent *LogStream::get_next_event()
-{
- if (read_buf.length() < 2*sizeof(__uint32_t))
- return 0;
-
- // parse type, length
- int off = 0;
- __uint32_t type, length;
- read_buf.copy(off, sizeof(__uint32_t), (char*)&type);
- off += sizeof(__uint32_t);
- read_buf.copy(off, sizeof(__uint32_t), (char*)&length);
- off += sizeof(__uint32_t);
-
- dout(15) << "getting next event from " << read_pos << ", type " << type << ", size " << length << endl;
-
- assert(type > 0);
-
- if (read_buf.length() < off + length)
- return 0;
-
- // create event
- LogEvent *le;
- switch (type) {
- case EVENT_STRING: // string
- le = new EString();
- break;
-
- case EVENT_INODEUPDATE:
- le = new EInodeUpdate();
- break;
-
- case EVENT_DIRUPDATE:
- le = new EDirUpdate();
- break;
-
- case EVENT_UNLINK:
- le = new EUnlink();
- break;
-
- case EVENT_ALLOC:
- le = new EAlloc();
- break;
-
- default:
- dout(1) << "uh oh, unknown event type " << type << endl;
- assert(0);
- }
-
- // decode
- le->decode_payload(read_buf, off);
- off = sizeof(type) + sizeof(length) + length; // advance past any padding that wasn't decoded..
-
- // discard front of read_buf
- read_pos += off;
- read_buf.splice(0, off);
-
- dout(15) << "get_next_event got event, read_pos now " << read_pos << " (append_pos is " << append_pos << ")" << endl;
-
- // ok!
- return le;
-}
-
-
-
-class C_LS_ReadAfterSync : public Context {
-public:
- LogStream *ls;
- Context *waitfornext;
- C_LS_ReadAfterSync(LogStream *l, Context *c) : ls(l), waitfornext(c) {}
- void finish(int) {
- ls->wait_for_next_event(waitfornext);
- }
-};
-
-class C_LS_ReadChunk : public Context {
-public:
- bufferlist bl;
- LogStream *ls;
-
- C_LS_ReadChunk(LogStream *ls) {
- this->ls = ls;
- }
- void finish(int result) {
- ls->_did_read(bl);
- }
-};
-
-
-void LogStream::wait_for_next_event(Context *c)
-{
- if (read_pos == sync_pos) {
- dout(-15) << "waiting for sync before initiating read at " << read_pos << endl;
- wait_for_sync(new C_LS_ReadAfterSync(this, c));
- return;
- }
-
- // add waiter
- if (c) waiting_for_read.push_back(c);
-
- // issue read
- off_t tail = read_pos + read_buf.length();
- size_t size = g_conf.mds_log_read_inc;
- if (tail + (off_t)size > sync_pos) {
- size = sync_pos - tail;
- dout(15) << "wait_for_next_event ugh.. read_pos is " << read_pos << ", tail is " << tail << ", sync_pos only " << sync_pos << ", flush_pos " << flush_pos << ", append_pos " << append_pos << endl;
-
- if (size == 0) {
- // assert(size > 0); // bleh, wait for sync, etc.
- // just do it. communication is ordered, right? FIXME SOMEDAY this is totally gross blech
- //size = flush_pos - tail;
- // read tiny bit, kill some time
- assert(flush_pos > sync_pos);
- size = 1;
- }
- }
-
- dout(15) << "wait_for_next_event reading from pos " << tail << " len " << size << endl;
- C_LS_ReadChunk *readc = new C_LS_ReadChunk(this);
- mds->filer->read(log_inode,
- tail, g_conf.mds_log_read_inc,
- &readc->bl,
- readc);
-}
-
-
-void LogStream::_did_read(bufferlist& blist)
-{
- dout(15) << "_did_read got " << blist.length() << " bytes at offset " << (read_pos + read_buf.length()) << endl;
- read_buf.claim_append(blist);
-
- list<Context*> finished;
- finished.splice(finished.begin(), waiting_for_read);
- finish_contexts(finished, 0);
-}
-
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __LOGSTREAM_H
-#define __LOGSTREAM_H
-
-#include "include/types.h"
-#include "include/Context.h"
-
-#include "include/buffer.h"
-#include "include/bufferlist.h"
-
-#include "common/Clock.h"
-
-#include <map>
-#include <list>
-
-class LogEvent;
-class Filer;
-class MDS;
-class Logger;
-
-class LogStream {
- protected:
- MDS *mds;
- Filer *filer;
- Logger *logger;
-
- inode_t log_inode;
-
- // writing
- off_t sync_pos; // first non-written byte
- off_t flush_pos; // first non-writing byte, beginning of write_buf
- off_t append_pos; // where next event will be written
- bufferlist write_buf; // unwritten (between flush_pos and append_pos)
-
- std::map< off_t, bufferlist* > writing_buffers;
- std::map< off_t, utime_t > writing_latency;
-
- // reading
- off_t read_pos; // abs position in file
- //off_t read_buf_start; // where read buf begins
- bufferlist read_buf;
- bool reading;
-
- std::map< off_t, std::list<Context*> > waiting_for_sync;
- std::list<Context*> waiting_for_read;
-
-
- bool autoflush;
-
- public:
- LogStream(MDS *mds, Filer *filer, inodeno_t log_ino, Logger *l);
-
- off_t get_read_pos() { return read_pos; }
- off_t get_append_pos() { return append_pos; }
-
- // write (append to end)
- off_t append(LogEvent *e); // returns offset it will be written to
- void _append_2(off_t off);
- void wait_for_sync(Context *c, off_t off=0); // wait for flush
- void flush(); // initiate flush
-
- // read (from front)
- //bool has_next_event();
- LogEvent *get_next_event();
- void wait_for_next_event(Context *c);
- void _did_read(bufferlist& blist);
-
-
- // old interface
- /*
- // WARNING: non-reentrant; single reader only
- int read_next(LogEvent **le, Context *c, int step=1);
- void did_read_bit(crope& next_bit, LogEvent **le, Context *c) ;
-
- int append(LogEvent *e, Context *c); // append at cur_pos, mind you!
- */
-};
-
-#endif
#include "MDSMap.h"
#include "MDLog.h"
#include "MDBalancer.h"
-#include "AnchorTable.h"
+#include "AnchorClient.h"
#include "include/filepath.h"
#include "config.h"
#undef dout
-#define dout(l) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << "mds" << mds->get_nodeid() << ".cache "
+#define dout(l) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".cache " // cache debug output: gated on debug/debug_mds, now prefixed with g_clock.now()
dout(7) << "open_remote_ino on " << ino << endl;
C_MDC_OpenRemoteInoLookup *c = new C_MDC_OpenRemoteInoLookup(this, ino, req, onfinish);
- mds->anchormgr->lookup(ino, c->anchortrace, c);
+ mds->anchorclient->lookup(ino, c->anchortrace, c);
}
void MDCache::open_remote_ino_2(inodeno_t ino,
in->make_anchor_trace(trace);
// do it
- mds->anchormgr->create(in->ino(), trace,
+ mds->anchorclient->create(in->ino(), trace,
new C_MDC_AnchorInode( in ));
}
}
vector<Anchor*> atrace;
in->make_anchor_trace(atrace);
assert(atrace.size() == 1); // it's dangling
- mds->anchormgr->update(in->ino(), atrace,
+ mds->anchorclient->update(in->ino(), atrace,
new C_MDC_DentryUnlink(this, dn, dir, c));
return;
}
dout(7) << "nlink=1+primary or 0+dangling, removing anchor" << endl;
// remove anchor (async)
- mds->anchormgr->destroy(dn->inode->ino(), NULL);
+ mds->anchorclient->destroy(dn->inode->ino(), NULL);
}
} else {
int auth = dn->inode->authority();
in->mark_clean(); // mark it clean.
// remove anchor (async)
- mds->anchormgr->destroy(in->ino(), NULL);
+ mds->anchorclient->destroy(in->ino(), NULL);
}
else {
in->mark_dirty();
dout(7) << "nlink=1, removing anchor" << endl;
// remove anchor (async)
- mds->anchormgr->destroy(in->ino(), NULL);
+ mds->anchorclient->destroy(in->ino(), NULL);
}
}
*
*/
-
-
-#include "include/types.h"
#include "MDLog.h"
#include "MDS.h"
-#include "LogStream.h"
#include "LogEvent.h"
+#include "osdc/LogStreamer.h"
+
#include "common/LogType.h"
#include "common/Logger.h"
-#include "msg/Message.h"
-
-LogType mdlog_logtype;
#include "config.h"
#undef dout
-#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_mds_log) cout << "mds" << mds->get_nodeid() << ".log "
+#define dout(l) if (l<=g_conf.debug || l <= g_conf.debug_mds_log) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".log " // log debug output: gated on debug/debug_mds_log, prefixed with g_clock.now() and the MDS node id
+#define derr(l) if (l<=g_conf.debug || l <= g_conf.debug_mds_log) cout << g_clock.now() << " mds" << mds->get_nodeid() << ".log " // NOTE(review): expands to exactly the same text as dout, including cout -- confirm whether cerr was intended
// cons/des
+LogType mdlog_logtype;
+
MDLog::MDLog(MDS *m)
{
mds = m;
num_events = 0;
- max_events = 0;
-
waiting_for_read = false;
+ max_events = g_conf.mds_log_max_len; // event-count cap is read from the config here rather than starting at 0
+
+ // logger: one Logger per MDS, named "mds<nodeid>.log"
char name[80];
sprintf(name, "mds%d.log", mds->get_nodeid());
- logger = new Logger(name, (LogType*)&mdlog_logtype);
-
- logstream = new LogStream(mds, mds->filer, MDS_INO_LOG_OFFSET + mds->get_nodeid(), logger);
+ logger = new Logger(name, &mdlog_logtype);
static bool didit = false;
if (!didit) {
mdlog_logtype.add_inc("lsum");
mdlog_logtype.add_inc("lnum");
}
+
+ // log inode: zeroed, then given the per-MDS log ino and the global MDS log layout
+ memset(&log_inode, 0, sizeof(log_inode));
+ log_inode.ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
+ log_inode.layout = g_OSD_MDLogLayout;
+
+ if (g_conf.mds_local_osd) { // optional override of the layout set just above
+ log_inode.layout.object_layout = OBJECT_LAYOUT_STARTOSD;
+ log_inode.layout.osd = mds->get_nodeid() + 10000; // hack: osd id is derived as MDS node id + 10000
+ }
+
+ // log streamer: built from the log inode, the MDS's objecter and the logger created above
+ logstreamer = new LogStreamer(log_inode, mds->objecter, logger); // requires mds->objecter to exist before MDLog is constructed
+
}
MDLog::~MDLog()
{
- if (logstream) { delete logstream; logstream = 0; }
+ if (logstreamer) { delete logstreamer; logstreamer = 0; } // free the streamer allocated by the constructor and clear the pointer
if (logger) { delete logger; logger = 0; }
}
void MDLog::submit_entry( LogEvent *e,
- Context *c )
+ Context *c )
{
dout(5) << "submit_entry at " << num_events << endl;
if (g_conf.mds_log) {
- off_t off = logstream->append(e);
+ // encode the event into a bufferlist and append that buffer to the log stream
+ bufferlist bl;
+ e->encode(bl);
+ logstreamer->append_entry(bl);
+
delete e;
num_events++;
logger->inc("add");
logger->set("size", num_events);
- logger->set("append", logstream->get_append_pos());
+ logger->set("append", logstreamer->get_write_pos()); // "append" stat now reports the streamer's write position
if (c)
- logstream->wait_for_sync(c, off);
+ logstreamer->flush(c); // the caller's context is handed to flush() instead of waiting on a specific offset
} else {
- // hack: log is disabled..
+ // hack: log is disabled; complete the context right away.
if (c) {
c->finish(0);
delete c;
{
if (g_conf.mds_log) {
// wait
- logstream->wait_for_sync(c);
+ logstreamer->flush(c);
} else {
- // hack: bypass
+ // hack: bypass.
c->finish(0);
delete c;
}
void MDLog::flush()
{
- logstream->flush();
+ logstreamer->flush(); // flush with no completion context (the waiting paths call flush(c) instead)
// trim
trim(NULL);
trim(0);
}
+void MDLog::_trimmed(LogEvent *le)
+{
+ // This event is done being trimmed: report it, drop it from the
+ // in-flight set, and free it.
+ dout(7) << " trimmed " << le << endl;
+ trimming.erase(le);
+ delete le;
+
+ // Refresh the stats: how many events are still being trimmed, and
+ // where the log streamer's read position currently is.
+ const size_t in_flight = trimming.size();
+ logger->set("trim", in_flight);
+ logger->set("read", logstreamer->get_read_pos());
+
+ // One trimming slot is free again, so give trim() another pass.
+ trim(0);
+}
+
+
+
void MDLog::trim(Context *c)
{
// add waiter
- if (c) trim_waiters.push_back(c);
+ if (c)
+ trim_waiters.push_back(c); // waiters are completed only when the loop below runs to the end without an early return
// trim!
while (num_events > max_events) {
+ off_t gap = logstreamer->get_write_pos() - logstreamer->get_read_pos(); // distance between the streamer's write and read positions; used only in the debug line below
+ dout(5) << "trim num_events " << num_events << " > max " << max_events
+ << ", trimming " << trimming.size()
+ << ", byte gap " << gap
+ << endl;
+
if ((int)trimming.size() >= g_conf.mds_log_max_trimming) {
- dout(7) << " already trimming max, waiting" << endl;
+ dout(7) << "trim already trimming max, waiting" << endl;
return;
}
-
- off_t gap = logstream->get_append_pos() - logstream->get_read_pos();
- dout(5) << "trim: num_events " << num_events << " - trimming " << trimming.size() << " > max " << max_events << " .. gap " << gap << endl;
- LogEvent *le = logstream->get_next_event();
-
- if (le) {
+ bufferlist bl;
+ if (logstreamer->try_read_entry(bl)) { // a false result falls through to the "need to read" branch below
+ // decode logevent from the raw entry bytes
+ LogEvent *le = LogEvent::decode(bl); // NOTE(review): dereferenced below without a NULL check (the old code tested le) -- confirm decode() cannot return NULL
num_events--;
// we just read an event.
if (le->obsolete(mds) == true) {
// obsolete
- dout(7) << " obsolete " << le << endl;
+ dout(7) << "trim obsolete " << le << endl;
delete le;
logger->inc("obs");
} else {
assert ((int)trimming.size() < g_conf.mds_log_max_trimming);
// trim!
- dout(7) << " trimming " << le << endl;
+ dout(7) << "trim trimming " << le << endl;
trimming.insert(le);
le->retire(mds, new C_MDL_Trimmed(this, le));
logger->inc("retire");
logger->set("trim", trimming.size());
}
- logger->set("read", logstream->get_read_pos());
+ logger->set("read", logstreamer->get_read_pos());
logger->set("size", num_events);
} else {
// need to read!
if (!waiting_for_read) {
waiting_for_read = true;
- dout(7) << " waiting for read" << endl;
- logstream->wait_for_next_event(new C_MDL_Reading(this));
+ dout(7) << "trim waiting for read" << endl;
+ logstreamer->wait_for_readable(new C_MDL_Reading(this)); // waiting_for_read (set above) keeps a second waiter from being registered
} else {
- dout(7) << " already waiting for read" << endl;
+ dout(7) << "trim already waiting for read" << endl;
}
return;
}
}
+
+ dout(5) << "trim num_events " << num_events << " <= max " << max_events
+ << ", trimming " << trimming.size()
+ << ", done for now."
+ << endl;
// trimmed!
- list<Context*> finished;
- finished.splice(finished.begin(), trim_waiters);
-
+ std::list<Context*> finished;
+ finished.swap(trim_waiters); // finished starts empty, so this takes every waiter and leaves trim_waiters empty
finish_contexts(finished, 0);
}
-void MDLog::_trimmed(LogEvent *le)
-{
- dout(7) << " trimmed " << le << endl;
- trimming.erase(le);
- delete le;
-
- logger->set("trim", trimming.size());
- logger->set("read", logstream->get_read_pos());
-
- trim(0);
-}
-
#ifndef __MDLOG_H
#define __MDLOG_H
-/*
-
-hmm, some things that go in the MDS log:
-
-
-prepare + commit versions of many of these?
-
-- inode update
- ??? entry will include mdloc_t of dir it resides in...
-
-- directory operation
- unlink,
- rename= atomic link+unlink (across multiple dirs, possibly...)
-
-- import
-- export
-
-
-*/
-
-#include "../include/Context.h"
+#include "include/types.h"
+#include "include/Context.h"
#include <list>
-using namespace std;
-
#include <ext/hash_set>
using namespace __gnu_cxx;
-class LogStream;
+class LogStreamer;
class LogEvent;
class MDS;
size_t num_events; // in events
size_t max_events;
- LogStream *logstream;
+ inode_t log_inode; // log inode (ino + layout); filled in by the MDLog constructor
+ LogStreamer *logstreamer; // allocated in the constructor, deleted in the destructor
hash_set<LogEvent*> trimming; // events currently being trimmed
- list<Context*> trim_waiters; // contexts waiting for trim
+ std::list<Context*> trim_waiters; // contexts waiting for trim
bool trim_reading;
bool waiting_for_read;
#include "MDLog.h"
#include "MDBalancer.h"
#include "IdAllocator.h"
+
#include "AnchorTable.h"
+#include "AnchorClient.h"
#include "include/filepath.h"
monmap = mm;
messenger = m;
- mdsmap = 0;
+ mdsmap = new MDSMap;
+
+ // osdmap has to be initialized before it is handed to the Objecter;
+ // otherwise the Objecter is constructed from an indeterminate pointer.
+ osdmap = 0;
+
+ // objecter/filer are created ahead of MDLog, whose constructor
+ // reads mds->objecter.
+ objecter = new Objecter(messenger, monmap, osdmap);
+ filer = new Filer(objecter);
mdcache = new MDCache(this);
mdstore = new MDStore(this);
mdlog = new MDLog(this);
balancer = new MDBalancer(this);
- anchormgr = new AnchorTable(this);
+
+ anchorclient = new AnchorClient(messenger, mdsmap);
+
+ // hack: only mds0 gets an anchor table; every other MDS leaves anchormgr null
+ if (whoami == 0)
+ anchormgr = new AnchorTable(this);
+ else
+ anchormgr = 0;
req_rate = 0;
- osdmap = 0;
- objecter = new Objecter(messenger, monmap, osdmap);
- filer = new Filer(objecter);
- mdlog->set_max_events(g_conf.mds_log_max_len);
shutting_down = false;
shut_down = false;
if (balancer) { delete balancer; balancer = NULL; }
if (idalloc) { delete idalloc; idalloc = NULL; }
if (anchormgr) { delete anchormgr; anchormgr = NULL; }
+ if (anchorclient) { delete anchorclient; anchorclient = NULL; }
if (osdmap) { delete osdmap; osdmap = 0; }
if (filer) { delete filer; filer = 0; }
case MDS_PORT_ANCHORMGR:
anchormgr->proc_message(m);
break;
+ case MDS_PORT_ANCHORCLIENT: // messages addressed to the anchor client port
+ anchorclient->dispatch(m); // NOTE(review): anchorclient is always allocated by the constructor, whereas anchormgr above is 0 unless whoami == 0 -- confirm ANCHORMGR messages only reach that MDS
+ break;
case MDS_PORT_CACHE:
mdcache->proc_message(m);
void MDS::handle_mds_map(MMDSMap *m)
{
- if (!mdsmap)
- mdsmap = new MDSMap;
-
map<epoch_t, bufferlist>::reverse_iterator p = m->maps.rbegin();
dout(1) << "handle_mds_map epoch " << p->first << endl;
// fake out idalloc (reset, pretend loaded)
idalloc->reset();
+ // fake out anchortable on the MDS the map names as its host
+ if (mdsmap->get_anchortable() == whoami)
+ anchormgr->reset(); // NOTE(review): anchormgr is only allocated when whoami == 0 in the constructor; this is safe only while the map's anchortable is 0 -- confirm
+
// init osds too
// init osds too
//mkfs(new C_MDS_Unpause(this));
//waiting_for_unpause.push_back(new C_MDS_RetryMessage(this, m));
if (event) mdlog->submit_entry(event);
if (event2) mdlog->submit_entry(event2);
- if (g_conf.mds_log_before_reply && g_conf.mds_log) {
+ if (g_conf.mds_log_before_reply && g_conf.mds_log && event) {
// SAFE mode!
// pin inode so it doesn't go away!
#define MDS_PORT_STORE 102
#define MDS_PORT_BALANCER 103
-#define MDS_PORT_ANCHORMGR 200
+#define MDS_PORT_ANCHORCLIENT 200 // new port; takes the value ANCHORMGR previously had
+#define MDS_PORT_ANCHORMGR 201 // renumbered from 200
#define MDS_PORT_OSDMON 300
#define MDS_PORT_PGMGR 301
class Filer;
class AnchorTable;
+class AnchorClient;
class CInode;
class CDir;
class CDentry;
Objecter *objecter;
Filer *filer; // for reading/writing to/from osds
AnchorTable *anchormgr;
+ AnchorClient *anchorclient;
// PGManager *pgmanager;
ClientMap clientmap;
epoch_t epoch;
utime_t ctime;
+ int anchortable; // compared against an MDS's whoami to pick the anchor table host; defaults to 0
+
set<int> all_mds;
set<int> down_mds;
map<int,entity_inst_t> mds_inst;
friend class MDSMonitor;
public:
- MDSMap() : epoch(0) {}
+ MDSMap() : epoch(0), anchortable(0) {}
epoch_t get_epoch() const { return epoch; }
void inc_epoch() { epoch++; }
const utime_t& get_ctime() const { return ctime; }
+ int get_anchortable() const { return anchortable; } // read-only accessor for anchortable
+
int get_num_mds() const { return all_mds.size(); }
int get_num_up_mds() const { return all_mds.size() - down_mds.size(); }
void encode(bufferlist& blist) {
blist.append((char*)&epoch, sizeof(epoch));
blist.append((char*)&ctime, sizeof(ctime));
+ blist.append((char*)&anchortable, sizeof(anchortable)); // raw bytes of anchortable, written right after ctime
_encode(all_mds, blist);
_encode(down_mds, blist);
off += sizeof(epoch);
blist.copy(off, sizeof(ctime), (char*)&ctime);
off += sizeof(ctime);
+ blist.copy(off, sizeof(anchortable), (char*)&anchortable); // mirrors encode(): anchortable follows ctime
+ off += sizeof(anchortable);
_decode(all_mds, blist, off);
_decode(down_mds, blist, off);
inodeno_t get_ino() { return ino; }
vector<Anchor*>& get_trace() { return trace; }
- virtual void decode_payload(crope& s, int& off) {
- s.copy(off, sizeof(op), (char*)&op);
+ virtual void decode_payload() { // reads op, ino, an int count, then that many Anchors out of the payload member
+ int off = 0; // decoding starts at the beginning of payload
+ payload.copy(off, sizeof(op), (char*)&op);
off += sizeof(op);
- s.copy(off, sizeof(ino), (char*)&ino);
+ payload.copy(off, sizeof(ino), (char*)&ino);
off += sizeof(ino);
int n;
- s.copy(off, sizeof(int), (char*)&n);
+ payload.copy(off, sizeof(int), (char*)&n); // n = number of Anchor entries that follow
off += sizeof(int);
for (int i=0; i<n; i++) {
Anchor *a = new Anchor;
- a->_unrope(s, off);
+ a->_decode(payload, off); // NOTE(review): assumes _decode advances off past the anchor it read -- confirm
trace.push_back(a);
}
}
- virtual void encode_payload(crope& r) {
- r.append((char*)&op, sizeof(op));
- r.append((char*)&ino, sizeof(ino));
+ virtual void encode_payload() { // writes op, ino, the trace length, then each Anchor, in the order decode_payload reads them
+ payload.append((char*)&op, sizeof(op));
+ payload.append((char*)&ino, sizeof(ino));
int n = trace.size();
- r.append((char*)&n, sizeof(int));
+ payload.append((char*)&n, sizeof(int));
for (int i=0; i<n; i++)
- trace[i]->_rope(r);
+ trace[i]->_encode(payload);
}
};
inodeno_t get_ino() { return ino; }
vector<Anchor*>& get_trace() { return trace; }
- virtual void decode_payload(crope& s, int& off) {
- s.copy(off, sizeof(op), (char*)&op);
+ virtual void decode_payload() { // reads op, ino, an int count, then that many Anchors out of the payload member
+ int off = 0; // decoding starts at the beginning of payload
+ payload.copy(off, sizeof(op), (char*)&op);
off += sizeof(op);
- s.copy(off, sizeof(ino), (char*)&ino);
+ payload.copy(off, sizeof(ino), (char*)&ino);
off += sizeof(ino);
int n;
- s.copy(off, sizeof(int), (char*)&n);
+ payload.copy(off, sizeof(int), (char*)&n); // n = number of Anchor entries that follow
off += sizeof(int);
for (int i=0; i<n; i++) {
Anchor *a = new Anchor;
- a->_unrope(s, off);
+ a->_decode(payload, off); // NOTE(review): assumes _decode advances off past the anchor it read -- confirm
trace.push_back(a);
}
}
- virtual void encode_payload(crope& r) {
- r.append((char*)&op, sizeof(op));
- r.append((char*)&ino, sizeof(ino));
+ virtual void encode_payload() { // writes op, ino, the trace length, then each Anchor, in the order decode_payload reads them
+ payload.append((char*)&op, sizeof(op));
+ payload.append((char*)&ino, sizeof(ino));
int n = trace.size();
- r.append((char*)&n, sizeof(int));
+ payload.append((char*)&n, sizeof(int));
for (int i=0; i<n; i++)
- trace[i]->_rope(r);
+ trace[i]->_encode(payload);
}
};
if (write_pos == flush_pos) {
assert(write_buf.length() == 0);
dout(10) << "flush nothing to flush, write pointers at " << write_pos << "/" << flush_pos << "/" << ack_pos << endl;
+
+ if (onsync) { // nothing to write on this path, so the caller's context is completed right here before returning
+ onsync->finish(0);
+ delete onsync; // the context is freed here once finished
+ }
return;
}