+- clean up client mds session vs mdsmap behavior
+
+
mds bugs
- open file rejournaling vs capped log...
- open files vs shutdown in general! need to export any caps on replicated metadata
-- export caps to auth on unlinked inodes
- stray purge on shutdown
- rename slave in-memory rollback on failure
- fix purge_stray bug
- try_remove_unlinked_dn thing
-- client session open from locker.. doesn't work properly with delays
- -> journal the session open _with_ the import(start)
-
- proper handling of cache expire messages during rejoin phase?
-- verify once-per-segment jouranl context is working...
-
mds
- extend/clean up filepath to allow paths relative to an ino
- fix path_traverse
- fix reconnect/rejoin open file weirdness
-- get rid of replicate objects for replicate_to .. encode to bufferlists directly
+- get rid of C*Discover objects for replicate_to .. encode to bufferlists directly
- stray reintegration
-- verify stray is empty on shutdown
- real chdir (directory "open")
- relative metadata ops
if (!diri || g_conf.client_use_random_mds) {
// no root info, pick a random MDS
mds = mdsmap->get_random_in_mds();
+ dout(0) << "random mds" << mds << dendl;
if (mds < 0) mds = 0;
if (0) {
if (mds_sessions.count(mds) == 0) {
Cond cond;
- if (!mdsmap->have_inst(mds)) {
+ if (!mdsmap->is_active(mds)) {
dout(10) << "no address for mds" << mds << ", requesting new mdsmap" << dendl;
int mon = monmap->pick_mon();
- messenger->send_message(new MMDSGetMap(),
+ messenger->send_message(new MMDSGetMap(mdsmap->get_epoch()),
monmap->get_inst(mon));
waiting_for_mdsmap.push_back(&cond);
cond.Wait(client_lock);
- if (!mdsmap->have_inst(mds)) {
+ if (!mdsmap->is_active(mds)) {
dout(10) << "hmm, still have no address for mds" << mds << ", trying a random mds" << dendl;
request.resend_mds = mdsmap->get_random_in_mds();
continue;
mds_sessions[mds]++;
// reap?
- if (m->get_op() == MClientFileCaps::OP_REAP) {
+ if (m->get_op() == MClientFileCaps::OP_IMPORT) {
int other = m->get_mds();
if (in && in->stale_caps.count(other)) {
- dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " reap on mds" << other << dendl;
+ dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " imported from mds" << other << dendl;
// fresh from new mds?
if (!in->caps.count(mds)) {
// fall-thru!
} else {
- dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " premature (!!) reap on mds" << other << dendl;
+ dout(5) << "handle_file_caps on ino " << m->get_ino() << " from mds" << mds << " premature (!!) import from mds" << other << dendl;
// delay!
cap_reap_queue[in->ino()][other] = m;
return;
assert(in);
// stale?
- if (m->get_op() == MClientFileCaps::OP_STALE) {
- dout(5) << "handle_file_caps on ino " << m->get_ino() << " seq " << m->get_seq() << " from mds" << mds << " now stale" << dendl;
+ if (m->get_op() == MClientFileCaps::OP_EXPORT) {
+ dout(5) << "handle_file_caps on ino " << m->get_ino() << " seq " << m->get_seq() << " from mds" << mds << " now exported/stale" << dendl;
// move to stale list
assert(in->caps.count(mds));
if (more) {
client->lstat(d, &st);
int fd = client->open(d, O_RDONLY);
- //client->unlink(d);
- //client->close(fd);
+ client->unlink(d);
+ client->close(fd);
}
if (time_to_stop()) return 0;
fds.push_back(fd);
}
+ if (1)
+ for (int n=0; n<num; n++) {
+ sprintf(d,"file.%d", n);
+ client->unlink(d);
+ }
+
while (!fds.empty()) {
int fd = fds.front();
fds.pop_front();
::_encode_simple(replica_map, bl);
- map<int,Capability::Export> cap_map;
- export_client_caps(cap_map);
- ::_encode_simple(cap_map, bl);
-
authlock._encode(bl);
linklock._encode(bl);
dirfragtreelock._encode(bl);
}
void CInode::decode_import(bufferlist::iterator& p,
- map<CInode*, map<int,Capability::Export> >& imported_cap_map,
LogSegment *ls)
{
utime_t old_mtime = inode.mtime;
::_decode_simple(replica_map, p);
if (!replica_map.empty()) get(PIN_REPLICATED);
- map<int,Capability::Export> cap_map;
- ::_decode_simple(cap_map, p);
- if (!cap_map.empty()) {
- imported_cap_map[this].swap(cap_map);
- get(PIN_IMPORTINGCAPS);
- }
-
authlock._decode(p);
linklock._decode(p);
dirfragtreelock._decode(p);
dirlock._decode(p);
}
-/*
- * the cap import is delayed so that we can journal before
- * contacting clients
- */
-void CInode::import_caps(map<int,Capability::Export>& cap_map,
- set<int>& new_caps)
-{
- merge_client_caps(cap_map, new_caps);
- put(PIN_IMPORTINGCAPS);
-}
+
static const int STATE_FREEZING = (1<<7);
static const int STATE_FROZEN = (1<<8);
static const int STATE_AMBIGUOUSAUTH = (1<<9);
+ static const int STATE_EXPORTINGCAPS = (1<<10);
// -- waiters --
//static const int WAIT_SLAVEAGREE = (1<<0);
void abort_export() {
put(PIN_TEMPEXPORTING);
}
- void decode_import(bufferlist::iterator& p,
- map<CInode*, map<int,Capability::Export> >& imported_cap_map,
- LogSegment *ls);
- void import_caps(map<int,Capability::Export>& cap_map,
- set<int>& new_caps);
-
+ void decode_import(bufferlist::iterator& p, LogSegment *ls);
+
// -- locks --
public:
// events i know of
#include "events/EString.h"
-#include "events/ESession.h"
#include "events/ESubtreeMap.h"
#include "events/EExport.h"
#include "events/EImportStart.h"
#include "events/EImportFinish.h"
#include "events/EFragment.h"
+#include "events/ESession.h"
+#include "events/ESessions.h"
+
#include "events/EUpdate.h"
#include "events/ESlaveUpdate.h"
#include "events/EOpen.h"
switch (type) {
case EVENT_STRING: le = new EString; break;
- case EVENT_SESSION: le = new ESession; break;
case EVENT_SUBTREEMAP: le = new ESubtreeMap; break;
case EVENT_EXPORT: le = new EExport; break;
case EVENT_IMPORTSTART: le = new EImportStart; break;
case EVENT_IMPORTFINISH: le = new EImportFinish; break;
case EVENT_FRAGMENT: le = new EFragment; break;
+ case EVENT_SESSION: le = new ESession; break;
+ case EVENT_SESSIONS: le = new ESessions; break;
+
case EVENT_UPDATE: le = new EUpdate; break;
case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break;
case EVENT_OPEN: le = new EOpen; break;
#define EVENT_STRING 1
-#define EVENT_SESSION 7
#define EVENT_SUBTREEMAP 2
-#define EVENT_EXPORT 30
-#define EVENT_IMPORTSTART 31
-#define EVENT_IMPORTFINISH 32
-#define EVENT_FRAGMENT 33
+#define EVENT_EXPORT 3
+#define EVENT_IMPORTSTART 4
+#define EVENT_IMPORTFINISH 5
+#define EVENT_FRAGMENT 6
-#define EVENT_UPDATE 3
-#define EVENT_SLAVEUPDATE 4
-#define EVENT_OPEN 5
+#define EVENT_SESSION 10
+#define EVENT_SESSIONS 11
-#define EVENT_PURGEFINISH 22
+#define EVENT_UPDATE 20
+#define EVENT_SLAVEUPDATE 21
+#define EVENT_OPEN 22
+
+#define EVENT_PURGEFINISH 30
#define EVENT_ANCHOR 40
#define EVENT_ANCHORCLIENT 41
// send REAP
// FIXME client session weirdness.
- MClientFileCaps *reap = new MClientFileCaps(MClientFileCaps::OP_REAP,
+ MClientFileCaps *reap = new MClientFileCaps(MClientFileCaps::OP_IMPORT,
in->inode,
in->client_caps[client].get_last_seq(),
in->client_caps[client].pending(),
dn->dir->unlink_inode(dn);
assert(straydn);
straydn->dir->link_primary_inode(straydn, in);
+
+ // send caps to auth (if we're not already)
+ if (in->is_any_caps() &&
+ !in->state_test(CInode::STATE_EXPORTINGCAPS))
+ migrator->export_caps(in, in->authority().first);
+
+ lru.lru_bottouch(straydn); // move stray to end of lru
+
} else {
assert(dn->is_remote());
dn->dir->unlink_inode(dn);
void submit_entry( LogEvent *e, Context *c = 0 );
void wait_for_sync( Context *c );
void flush();
+ bool is_flushed() {
+ return unflushed == 0;
+ }
private:
class C_MaybeExpiredSegment : public Context {
#include "events/EExport.h"
#include "events/EImportStart.h"
#include "events/EImportFinish.h"
+#include "events/ESessions.h"
#include "msg/Messenger.h"
#include "messages/MExportDirNotifyAck.h"
#include "messages/MExportDirFinish.h"
+#include "messages/MExportCaps.h"
+#include "messages/MExportCapsAck.h"
+
#include "config.h"
#define dout(l) if (l<=g_conf.debug || l <= g_conf.debug_mds || l <= g_conf.debug_mds_migrator) *_dout << dbeginl << g_clock.now() << " mds" << mds->get_nodeid() << ".migrator "
handle_export_notify((MExportDirNotify*)m);
break;
+ // caps
+ case MSG_MDS_EXPORTCAPS:
+ handle_export_caps((MExportCaps*)m);
+ break;
+ case MSG_MDS_EXPORTCAPSACK:
+ handle_export_caps_ack((MExportCapsAck*)m);
+ break;
+
default:
assert(0);
}
}
+class C_M_ExportGo : public Context {
+ Migrator *migrator;
+ CDir *dir;
+public:
+ C_M_ExportGo(Migrator *m, CDir *d) : migrator(m), dir(d) {}
+ void finish(int r) {
+ migrator->export_go_synced(dir);
+ }
+};
+
void Migrator::export_go(CDir *dir)
-{
+{
assert(export_peer.count(dir));
int dest = export_peer[dir];
dout(7) << "export_go " << *dir << " to " << dest << dendl;
+ // first sync log to flush out e.g. any cap imports
+ mds->mdlog->wait_for_sync(new C_M_ExportGo(this, dir));
+}
+
+void Migrator::export_go_synced(CDir *dir)
+{
+ assert(export_peer.count(dir));
+ int dest = export_peer[dir];
+ dout(7) << "export_go_synced " << *dir << " to " << dest << dendl;
+
cache->show_subtrees();
export_warning_ack_waiting.erase(dir);
::_encode_simple(in->inode.ino, enc_state);
in->encode_export(enc_state);
+ // caps
+ encode_export_inode_caps(in, enc_state, exported_client_map);
+}
+
+void Migrator::encode_export_inode_caps(CInode *in, bufferlist& bl,
+ map<int,entity_inst_t>& exported_client_map)
+{
+ // encode caps
+ map<int,Capability::Export> cap_map;
+ in->export_client_caps(cap_map);
+ ::_encode_simple(cap_map, bl);
+
+ in->state_set(CInode::STATE_EXPORTINGCAPS);
+
// make note of clients named by exported capabilities
for (map<int, Capability>::iterator it = in->client_caps.begin();
it != in->client_caps.end();
void Migrator::finish_export_inode_caps(CInode *in)
{
- // tell (all) clients about migrating caps.. mark STALE
+ in->state_clear(CInode::STATE_EXPORTINGCAPS);
+
+ // tell (all) clients about migrating caps..
for (map<int, Capability>::iterator it = in->client_caps.begin();
it != in->client_caps.end();
it++) {
dout(7) << "finish_export_inode telling client" << it->first
- << " stale caps on " << *in << dendl;
- MClientFileCaps *m = new MClientFileCaps(MClientFileCaps::OP_STALE,
+ << " exported caps on " << *in << dendl;
+ MClientFileCaps *m = new MClientFileCaps(MClientFileCaps::OP_EXPORT,
in->inode,
it->second.get_last_seq(),
it->second.pending(),
in->export_client_caps(cap_map);
finish_export_inode_caps(in);
}
-
+
// log our failure
mds->mdlog->submit_entry(new EImportFinish(dir, false)); // log failure
-
+
// bystanders?
if (import_bystanders[dir].empty()) {
dout(7) << "no bystanders, finishing reverse now" << dendl;
}
-void Migrator::finish_import_caps(CInode *in, int from, map<int,Capability::Export> &cap_map)
-{
- set<int> new_caps;
- in->import_caps(cap_map, new_caps);
-
- for (set<int>::iterator it = new_caps.begin();
- it != new_caps.end();
- it++) {
- dout(0) << "merged caps for client" << *it << " on " << *in << dendl;
- MClientFileCaps *caps = new MClientFileCaps(MClientFileCaps::OP_REAP,
- in->inode,
- in->client_caps[*it].get_last_seq(),
- in->client_caps[*it].pending(),
- in->client_caps[*it].wanted());
- caps->set_mds(from); // reap from whom?
- mds->send_message_client(caps, *it);
- }
-}
+
void Migrator::import_logged_start(CDir *dir, int from,
- map<int,entity_inst_t> &imported_client_map)
+ map<int,entity_inst_t>& imported_client_map)
{
dout(7) << "import_logged " << *dir << dendl;
for (map<CInode*, map<int,Capability::Export> >::iterator p = import_caps[dir].begin();
p != import_caps[dir].end();
++p) {
- finish_import_caps(p->first, from, p->second);
+ finish_import_inode_caps(p->first, from, p->second);
}
// send notify's etc.
}
// state after link -- or not! -sage
- in->decode_import(blp, cap_imports, ls); // cap imports are noted for later action
-
+ in->decode_import(blp, ls); // cap imports are noted for later action
+
+ // caps
+ decode_import_inode_caps(in, blp, cap_imports);
+
// link before state -- or not! -sage
if (dn->inode != in) {
assert(!dn->inode);
}
+void Migrator::decode_import_inode_caps(CInode *in,
+ bufferlist::iterator &blp,
+ map<CInode*, map<int,Capability::Export> >& cap_imports)
+{
+ map<int,Capability::Export> cap_map;
+ ::_decode_simple(cap_map, blp);
+ if (!cap_map.empty()) {
+ cap_imports[in].swap(cap_map);
+ in->get(CInode::PIN_IMPORTINGCAPS);
+ }
+}
+
+void Migrator::finish_import_inode_caps(CInode *in, int from,
+ map<int,Capability::Export> &cap_map)
+{
+ assert(!cap_map.empty());
+
+ set<int> new_caps;
+ in->merge_client_caps(cap_map, new_caps);
+ in->put(CInode::PIN_IMPORTINGCAPS);
+
+ for (set<int>::iterator it = new_caps.begin();
+ it != new_caps.end();
+ it++) {
+ dout(0) << "finish_import_inode_caps for client" << *it << " on " << *in << dendl;
+ MClientFileCaps *caps = new MClientFileCaps(MClientFileCaps::OP_IMPORT,
+ in->inode,
+ in->client_caps[*it].get_last_seq(),
+ in->client_caps[*it].pending(),
+ in->client_caps[*it].wanted());
+ caps->set_mds(from); // from whom?
+ mds->send_message_client(caps, *it);
+ }
+}
int Migrator::decode_import_dir(bufferlist::iterator& blp,
int oldauth,
+/** cap exports **/
+
+
+
+void Migrator::export_caps(CInode *in, int dest)
+{
+ dout(7) << "export_caps to mds" << dest << " " << *in << dendl;
+
+ assert(in->is_any_caps());
+ assert(!in->is_auth());
+ assert(!in->is_ambiguous_auth());
+ assert(!in->state_test(CInode::STATE_EXPORTINGCAPS));
+
+ MExportCaps *ex = new MExportCaps;
+ ex->ino = in->ino();
+
+ encode_export_inode_caps(in, ex->cap_bl, ex->client_map);
+
+ mds->send_message_mds(ex, dest, MDS_PORT_MIGRATOR);
+}
+
+void Migrator::handle_export_caps_ack(MExportCapsAck *ack)
+{
+ CInode *in = cache->get_inode(ack->ino);
+ assert(in);
+ dout(10) << "handle_export_caps_ack " << *ack << " from " << ack->get_source()
+ << " on " << *in
+ << dendl;
+
+ finish_export_inode_caps(in);
+ delete ack;
+}
+
+
+class C_M_LoggedImportCaps : public Context {
+ Migrator *migrator;
+ CInode *in;
+ int from;
+public:
+ map<CInode*, map<int,Capability::Export> > cap_imports;
+
+ C_M_LoggedImportCaps(Migrator *m, CInode *i, int f) : migrator(m), in(i), from(f) {}
+ void finish(int r) {
+ migrator->logged_import_caps(in, from, cap_imports);
+ }
+};
+
+void Migrator::handle_export_caps(MExportCaps *ex)
+{
+ dout(10) << "handle_export_caps " << *ex << " from " << ex->get_source() << dendl;
+ CInode *in = cache->get_inode(ex->ino);
+
+ assert(in->is_auth());
+ /*
+ * note: i may be frozen, but i won't have been encoded for export (yet)!
+ * see export_go() vs export_go_synced().
+ */
+
+ C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(this, in, ex->get_source().num());
+ ESessions *le = new ESessions(mds->clientmap.inc_projected());
+
+ // decode new caps
+ bufferlist::iterator blp = ex->cap_bl.begin();
+ decode_import_inode_caps(in, blp, finish->cap_imports);
+ assert(!finish->cap_imports.empty()); // thus, inode is pinned.
+
+ // journal open client sessions
+ mds->server->prepare_force_open_sessions(ex->client_map);
+ le->client_map.swap(ex->client_map);
+
+ mds->mdlog->submit_entry(le, finish);
+ delete ex;
+}
+void Migrator::logged_import_caps(CInode *in,
+ int from,
+ map<CInode*, map<int,Capability::Export> >& cap_imports)
+{
+ dout(10) << "logged_import_caps on " << *in << dendl;
+ assert(cap_imports.count(in));
+ finish_import_inode_caps(in, from, cap_imports[in]);
+ mds->send_message_mds(new MExportCapsAck(in->ino()), from, MDS_PORT_MIGRATOR);
+}
class MExportDirNotifyAck;
class MExportDirFinish;
+class MExportCaps;
+class MExportCapsAck;
+
class EImportStart;
void clear_export_queue() {
export_queue.clear();
}
-
- void encode_export_inode(CInode *in, bufferlist& enc_state,
+
+ void encode_export_inode(CInode *in, bufferlist& bl,
map<int,entity_inst_t>& exported_client_map);
+ void encode_export_inode_caps(CInode *in, bufferlist& bl,
+ map<int,entity_inst_t>& exported_client_map);
void finish_export_inode(CInode *in, utime_t now, list<Context*>& finished);
void finish_export_inode_caps(CInode *in);
}
void clear_export_proxy_pins(CDir *dir);
+ void export_caps(CInode *in, int dest);
+
protected:
void handle_export_discover_ack(MExportDirDiscoverAck *m);
void export_frozen(CDir *dir);
void handle_export_prep_ack(MExportDirPrepAck *m);
void export_go(CDir *dir);
+ void export_go_synced(CDir *dir);
void export_reverse(CDir *dir);
void handle_export_ack(MExportDirAck *m);
void export_logged_finish(CDir *dir);
void handle_export_notify_ack(MExportDirNotifyAck *m);
void export_finish(CDir *dir);
+ void handle_export_caps_ack(MExportCapsAck *m);
+
friend class C_MDC_ExportFreeze;
friend class C_MDS_ExportFinishLogged;
-
+ friend class C_M_ExportGo;
// importer
void handle_export_discover(MExportDirDiscover *m);
LogSegment *ls,
map<CInode*, map<int,Capability::Export> >& cap_imports,
list<ScatterLock*>& updated_scatterlocks);
+ void decode_import_inode_caps(CInode *in,
+ bufferlist::iterator &blp,
+ map<CInode*, map<int,Capability::Export> >& cap_imports);
+ void finish_import_inode_caps(CInode *in, int from, map<int,Capability::Export> &cap_map);
int decode_import_dir(bufferlist::iterator& blp,
int oldauth,
CDir *import_root,
LogSegment *ls,
map<CInode*, map<int,Capability::Export> >& cap_imports,
list<ScatterLock*>& updated_scatterlocks);
- void finish_import_caps(CInode *in, int from, map<int,Capability::Export> &cap_map);
public:
void import_reverse(CDir *dir);
void import_finish(CDir *dir);
protected:
+ void handle_export_caps(MExportCaps *m);
+ void logged_import_caps(CInode *in,
+ int from,
+ map<CInode*, map<int,Capability::Export> >& cap_imports);
+
+
friend class C_MDS_ImportDirLoggedStart;
friend class C_MDS_ImportDirLoggedFinish;
+ friend class C_M_LoggedImportCaps;
// bystander
void handle_export_notify(MExportDirNotify *m);
// mark client caps stale.
inode_t fake_inode;
fake_inode.ino = p->first;
- MClientFileCaps *stale = new MClientFileCaps(MClientFileCaps::OP_STALE,
+ MClientFileCaps *stale = new MClientFileCaps(MClientFileCaps::OP_EXPORT,
fake_inode,
0,
0, // doesn't matter.
// finish cap imports
finish_force_open_sessions(mdr->more()->imported_client_map);
if (mdr->more()->cap_imports.count(destdn->inode))
- mds->mdcache->migrator->finish_import_caps(destdn->inode, srcdn->authority().first,
- mdr->more()->cap_imports[destdn->inode]);
+ mds->mdcache->migrator->finish_import_inode_caps(destdn->inode, srcdn->authority().first,
+ mdr->more()->cap_imports[destdn->inode]);
// hack: fix auth bit
destdn->inode->state_set(CInode::STATE_AUTH);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MDS_ESESSIONS_H
+#define __MDS_ESESSIONS_H
+
+#include <assert.h>
+#include "config.h"
+#include "include/types.h"
+
+#include "../LogEvent.h"
+
+class ESessions : public LogEvent {
+protected:
+ version_t cmapv; // client map version
+
+public:
+ map<int,entity_inst_t> client_map;
+
+ ESessions() : LogEvent(EVENT_SESSION) { }
+ ESessions(version_t v) :
+ LogEvent(EVENT_SESSION),
+ cmapv(v) {
+ }
+
+ void encode_payload(bufferlist& bl) {
+ ::_encode(client_map, bl);
+ ::_encode(cmapv, bl);
+ }
+ void decode_payload(bufferlist& bl, int& off) {
+ ::_decode(client_map, bl, off);
+ ::_decode(cmapv, bl, off);
+ }
+
+
+ void print(ostream& out) {
+ out << "ESessions " << client_map.size() << " opens cmapv " << cmapv;
+ }
+
+ void update_segment();
+ void replay(MDS *mds);
+};
+
+#endif
#include "events/EString.h"
#include "events/ESubtreeMap.h"
#include "events/ESession.h"
+#include "events/ESessions.h"
#include "events/EMetaBlob.h"
}
}
+void ESessions::update_segment()
+{
+ _segment->clientmapv = cmapv;
+}
+
+void ESessions::replay(MDS *mds)
+{
+ if (mds->clientmap.get_version() >= cmapv) {
+ dout(10) << "ESessions.replay clientmap " << mds->clientmap.get_version()
+ << " >= " << cmapv << ", noop" << dendl;
+ } else {
+ dout(10) << "ESessions.replay clientmap " << mds->clientmap.get_version()
+ << " < " << cmapv << dendl;
+ mds->clientmap.open_sessions(client_map);
+ assert(mds->clientmap.get_version() == cmapv);
+ mds->clientmap.reset_projected(); // make it follow version.
+ }
+}
+
// -----------------------
::_decode_simple(cm, blp);
mds->clientmap.open_sessions(cm);
assert(mds->clientmap.get_version() == cmapv);
+ mds->clientmap.reset_projected(); // make it follow version.
}
}
static const int OP_GRANT = 0; // mds->client grant.
static const int OP_ACK = 1; // client->mds ack (if prior grant was a recall)
static const int OP_RELEASE = 2; // mds closed the cap
- static const int OP_STALE = 3; // mds has exported the cap
- static const int OP_REAP = 4; // mds has imported the cap from get_mds()
+ static const int OP_EXPORT = 3; // mds has exported the cap
+ static const int OP_IMPORT = 4; // mds has imported the cap from get_mds()
static const char* get_opname(int op) {
switch (op) {
case OP_GRANT: return "grant";
case OP_ACK: return "ack";
case OP_RELEASE: return "release";
- case OP_STALE: return "stale";
- case OP_REAP: return "reap";
+ case OP_EXPORT: return "export";
+ case OP_IMPORT: return "import";
default: assert(0); return 0;
}
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MEXPORTCAPS_H
+#define __MEXPORTCAPS_H
+
+#include "msg/Message.h"
+
+
+class MExportCaps : public Message {
+ public:
+ inodeno_t ino;
+ bufferlist cap_bl;
+ map<int,entity_inst_t> client_map;
+
+ MExportCaps() :
+ Message(MSG_MDS_EXPORTCAPS) {}
+
+ virtual char *get_type_name() { return "export_caps"; }
+ void print(ostream& o) {
+ o << "export_caps(" << ino << ")";
+ }
+
+ virtual void decode_payload() {
+ int off = 0;
+ ::_decode(ino, payload, off);
+ ::_decode(cap_bl, payload, off);
+ ::_decode(client_map, payload, off);
+ }
+ virtual void encode_payload() {
+ ::_encode(ino, payload);
+ ::_encode(cap_bl, payload);
+ ::_encode(client_map, payload);
+ }
+
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MEXPORTCAPSACK_H
+#define __MEXPORTCAPSACK_H
+
+#include "msg/Message.h"
+
+
+class MExportCapsAck : public Message {
+ public:
+ inodeno_t ino;
+
+ MExportCapsAck() :
+ Message(MSG_MDS_EXPORTCAPSACK) {}
+ MExportCapsAck(inodeno_t i) :
+ Message(MSG_MDS_EXPORTCAPSACK), ino(i) {}
+
+ virtual char *get_type_name() { return "export_caps_ack"; }
+ void print(ostream& o) {
+ o << "export_caps_ack(" << ino << ")";
+ }
+
+ virtual void decode_payload() {
+ int off = 0;
+ ::_decode(ino, payload, off);
+ }
+ virtual void encode_payload() {
+ ::_encode(ino, payload);
+ }
+
+};
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MEXPORTSTRAYS_H
+#define __MEXPORTSTRAYS_H
+
+#include "msg/Message.h"
+
+
+class MExportStrays : public Message {
+ public:
+ bufferlist state;
+
+ MExportStrays() :
+ Message(MSG_MDS_EXPORTSTRAYS) {}
+
+ virtual char *get_type_name() { return "SEx"; }
+ void print(ostream& o) {
+ o << "export_strays";
+ }
+
+ virtual void decode_payload() {
+ state = payload;
+ }
+ virtual void encode_payload() {
+ payload = state;
+ }
+
+};
+
+#endif
#include "msg/Message.h"
#include "include/types.h"
+#include "include/encodable.h"
class MMDSGetMap : public Message {
public:
- MMDSGetMap() : Message(MSG_MDS_GETMAP) {
- }
+ epoch_t have;
+
+ MMDSGetMap(epoch_t h=0) : Message(MSG_MDS_GETMAP), have (h) { }
char *get_type_name() { return "mdsgetmap"; }
void encode_payload() {
- //payload.append((char*)&sb, sizeof(sb));
+ ::_encode_simple(have, payload);
}
void decode_payload() {
- //int off = 0;
- //payload.copy(off, sizeof(sb), (char*)&sb);
- //off += sizeof(sb);
+ bufferlist::iterator p = payload.begin();
+ ::_decode_simple(have, p);
}
};
// my methods
-void MDSMonitor::print_map(MDSMap &m)
+void MDSMonitor::print_map(MDSMap &m, int dbl)
{
dout(7) << "print_map epoch " << m.get_epoch() << " target_num " << m.target_num << dendl;
entity_inst_t blank;
for (set<int>::iterator p = all.begin();
p != all.end();
++p) {
- dout(7) << " mds" << *p << "." << m.mds_inc[*p]
+ dout(dbl) << " mds" << *p << "." << m.mds_inc[*p]
<< " : " << MDSMap::get_state_name(m.get_state(*p))
<< " : " << (m.have_inst(*p) ? m.get_inst(*p) : blank)
<< dendl;
mdsmap.decode(mdsmap_bl);
// new map
- dout(7) << "new map:" << dendl;
- print_map(mdsmap);
+ dout(0) << "new map" << dendl;
+ print_map(mdsmap, 0);
// bcast map to mds, waiters
if (mon->is_leader())
return preprocess_beacon((MMDSBeacon*)m);
case MSG_MDS_GETMAP:
- send_full(m->get_source_inst());
+ handle_mds_getmap((MMDSGetMap*)m);
return true;
case MSG_MON_COMMAND:
}
}
+void MDSMonitor::handle_mds_getmap(MMDSGetMap *m)
+{
+ if (m->have < mdsmap.get_epoch())
+ send_full(m->get_source_inst());
+ else
+ waiting_for_map.push_back(m->get_source_inst());
+}
+
bool MDSMonitor::preprocess_beacon(MMDSBeacon *m)
{
#include "PaxosService.h"
class MMDSBeacon;
+class MMDSGetMap;
class MDSMonitor : public PaxosService {
public:
MDSMap pending_mdsmap; // current + pending updates
// my helpers
- void print_map(MDSMap &m);
+ void print_map(MDSMap &m, int dbl=7);
class C_Updated : public Context {
MDSMonitor *mm;
bool preprocess_beacon(class MMDSBeacon *m);
bool handle_beacon(class MMDSBeacon *m);
bool handle_command(class MMonCommand *m);
+ void handle_mds_getmap(MMDSGetMap *m);
// beacons
map<int, utime_t> last_beacon;
#include "messages/MExportDirNotifyAck.h"
#include "messages/MExportDirFinish.h"
+#include "messages/MExportCaps.h"
+#include "messages/MExportCapsAck.h"
+
#include "messages/MDentryUnlink.h"
#include "messages/MHeartbeat.h"
m = new MExportDirWarningAck;
break;
+
+ case MSG_MDS_EXPORTCAPS:
+ m = new MExportCaps;
+ break;
+ case MSG_MDS_EXPORTCAPSACK:
+ m = new MExportCapsAck;
+ break;
case MSG_MDS_DENTRYUNLINK:
#define MSG_MDS_EXPORTDIRNOTIFYACK 159
#define MSG_MDS_EXPORTDIRFINISH 160
+#define MSG_MDS_EXPORTSTRAY 161
+#define MSG_MDS_EXPORTSTRAYNOTIFY 162
+#define MSG_MDS_EXPORTSTRAYNOTIFYACK 163
+#define MSG_MDS_EXPORTSTRAYACK 164
+#define MSG_MDS_EXPORTSTRAYFINISH 165
+
+#define MSG_MDS_EXPORTCAPS 166
+#define MSG_MDS_EXPORTCAPSACK 167
+
#define MSG_MDS_SLAVE_REQUEST 170
#define MSG_MDS_DENTRYUNLINK 200