EXTRA_CFLAGS = #-I${HOME}/include -L${HOME}/lib
EXTRA_CFLAGS += -g
EXTRA_CFLAGS += -pg
-EXTRA_CFLAGS += -O3
+#EXTRA_CFLAGS += -O3
# base
CFLAGS = -Wall -I. -D_FILE_OFFSET_BITS=64 -D_REENTRANT -D_THREAD_SAFE ${EXTRA_CFLAGS}
syn_iargs.push_back( atoi(args[++i]) );
syn_iargs.push_back( atoi(args[++i]) );
syn_iargs.push_back( atoi(args[++i]) );
+ } else if (strcmp(args[i],"makefiles2") == 0) {
+ syn_modes.push_back( SYNCLIENT_MODE_MAKEFILES2 );
+ syn_iargs.push_back( atoi(args[++i]) );
+ syn_iargs.push_back( atoi(args[++i]) );
+ syn_iargs.push_back( atoi(args[++i]) );
} else if (strcmp(args[i],"linktest") == 0) {
syn_modes.push_back( SYNCLIENT_MODE_LINKTEST );
} else if (strcmp(args[i],"createshared") == 0) {
if (more) {
client->lstat(d, &st);
int fd = client->open(d, O_RDONLY);
- client->unlink(d);
- client->close(fd);
+ //client->unlink(d);
+ //client->close(fd);
}
if (time_to_stop()) return 0;
version_t get_committing() { return committing; }
version_t get_committed() { return committed; }
+ void set_version(version_t v) { version = v; }
version_t inc_projected() { return ++projected; }
void reset_projected() { projected = version; }
void set_committing(version_t v) { committing = v; }
void add_opening(int c) { opening.insert(c); }
bool is_closing(int c) { return closing.count(c); }
void add_closing(int c) { closing.insert(c); }
+ void remove_closing(int c) { closing.erase(c); }
bool have_session(int client) {
return client_inst.count(client);
}
client_inst.erase(client);
version++;
}
+ void noop() {
+ version++;
+ }
+ void open_sessions(map<int,entity_inst_t>& cm) {
+ for (map<int,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
+ client_inst[p->first] = p->second;
+ sessions.insert(p->first);
+ }
+ version++;
+ }
private:
// -- push sequence --
if (seq > 0 &&
!it->second.is_suppress()) {
dout(7) << " sending MClientFileCaps to client" << it->first << " seq " << it->second.get_last_seq() << " new pending " << cap_string(it->second.pending()) << " was " << cap_string(before) << dendl;
- mds->send_message_client_maybe_opening(new MClientFileCaps(MClientFileCaps::OP_GRANT,
+ mds->send_message_client(new MClientFileCaps(MClientFileCaps::OP_GRANT,
in->inode,
it->second.get_last_seq(),
it->second.pending(),
MClientFileCaps *r = new MClientFileCaps(MClientFileCaps::OP_RELEASE,
in->inode,
0, 0, 0);
- mds->send_message_client_maybe_open(r, m->get_source_inst());
+ mds->send_message_client(r, m->get_source_inst());
}
// merge in atime?
}
-class C_MDS_SendMessageClientSession : public Context {
- MDS *mds;
- Message *msg;
- entity_inst_t clientinst;
-public:
- C_MDS_SendMessageClientSession(MDS *md, Message *ms, entity_inst_t& ci) :
- mds(md), msg(ms), clientinst(ci) {}
- void finish(int r) {
- mds->clientmap.open_session(clientinst);
- mds->send_message_client(msg, clientinst.name.num());
- }
-};
-
-void MDS::send_message_client_maybe_opening(Message *m, int c)
-{
- send_message_client_maybe_open(m, clientmap.get_inst(c));
-}
-
-void MDS::send_message_client_maybe_open(Message *m, entity_inst_t clientinst)
-{
- // FIXME
- // _most_ ppl shoudl check for a client session, since migration may call this,
- // start opening, and then e.g. locker sends something else (through non-maybe_open
- // version)
- int client = clientinst.name.num();
- if (!clientmap.have_session(client)) {
- // no session!
- dout(10) << "send_message_client opening session with " << clientinst << dendl;
- clientmap.add_opening(client);
- mdlog->submit_entry(new ESession(clientinst, true, clientmap.inc_projected()),
- new C_MDS_SendMessageClientSession(this, m, clientinst));
- } else {
- // we have a session.
- send_message_client(m, clientinst);
- }
-}
-
-
int MDS::init(bool standby)
{
void send_message_client(Message *m, int client);
void send_message_client(Message *m, entity_inst_t clientinst);
- void send_message_client_maybe_opening(Message *m, int);
- void send_message_client_maybe_open(Message *m, entity_inst_t clientinst);
// start up, shutdown
#include "Migrator.h"
#include "Locker.h"
#include "Migrator.h"
+#include "Server.h"
#include "MDBalancer.h"
#include "MDLog.h"
it->second.get_last_seq(),
it->second.pending(),
it->second.wanted());
- entity_inst_t inst = mds->clientmap.get_inst(it->first);
- mds->send_message_client_maybe_open(m, inst);
+ mds->send_message_client(m, it->first);
}
in->clear_client_caps();
bufferlist::iterator blp = m->get_dirstate().begin();
::_decode_simple(imported_client_map, blp);
+ mds->server->force_open_sessions(imported_client_map);
+
int num_imported_inodes = 0;
while (!blp.end()) {
num_imported_inodes +=
oldauth,
dir, // import root
le,
- imported_client_map,
mds->mdlog->get_current_segment(),
import_updated_scatterlocks[dir]);
}
dout(10) << " " << m->get_bounds().size() << " imported bounds" << dendl;
+ // include imported sessions in EImportStart
+ le->client_map.claim(m->get_dirstate());
+
// include bounds in EImportStart
set<CDir*> import_bounds;
cache->get_subtree_bounds(dir, import_bounds);
void Migrator::decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int oldauth,
- map<int,entity_inst_t>& imported_client_map,
LogSegment *ls,
list<ScatterLock*>& updated_scatterlocks)
{
// adjust replica list
//assert(!in->is_replica(oldauth)); // not true on failed export
- in->add_replica( oldauth, CInode::EXPORT_NONCE );
+ in->add_replica(oldauth, CInode::EXPORT_NONCE);
if (in->is_replica(mds->get_nodeid()))
in->remove_replica(mds->get_nodeid());
in->client_caps[*it].pending(),
in->client_caps[*it].wanted());
caps->set_mds( oldauth ); // reap from whom?
- mds->send_message_client_maybe_open(caps, imported_client_map[*it]);
+ mds->send_message_client(caps, *it);
}
}
int oldauth,
CDir *import_root,
EImportStart *le,
- map<int,entity_inst_t>& imported_client_map,
LogSegment *ls,
list<ScatterLock*>& updated_scatterlocks)
{
}
else if (icode == 'I') {
// inode
- decode_import_inode(dn, blp, oldauth, imported_client_map, ls, updated_scatterlocks);
+ decode_import_inode(dn, blp, oldauth, ls, updated_scatterlocks);
}
// add dentry to journal entry
public:
void decode_import_inode(CDentry *dn, bufferlist::iterator& blp, int oldauth,
- map<int,entity_inst_t>& imported_client_map,
LogSegment *ls,
list<ScatterLock*>& updated_scatterlocks);
int decode_import_dir(bufferlist::iterator& blp,
int oldauth,
CDir *import_root,
EImportStart *le,
- map<int,entity_inst_t>& imported_client_map,
LogSegment *ls,
list<ScatterLock*>& updated_scatterlocks);
if (open) {
assert(mds->clientmap.is_opening(from));
mds->clientmap.open_session(client_inst);
- } else {
- assert(mds->clientmap.is_closing(from));
+ } else if (mds->clientmap.is_closing(from)) {
mds->clientmap.close_session(from);
// purge completed requests from clientmap
mds->clientmap.trim_completed_requests(from, 0);
+ } else {
+ // close must have been canceled (by an import?) ...
+ assert(!open);
+ mds->clientmap.noop();
}
assert(cmapv == mds->clientmap.get_version());
mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE), client_inst);
}
+void Server::force_open_sessions(map<int,entity_inst_t>& cm)
+{
+ dout(10) << "force_open_sessions on " << cm.size() << " clients" << dendl;
+ version_t v = mds->clientmap.get_version();
+ for (map<int,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
+ if (mds->clientmap.is_closing(p->first)) {
+ dout(15) << "force_open_sessions canceling close on " << p->second << dendl;
+ mds->clientmap.remove_closing(p->first);
+ continue;
+ }
+ if (mds->clientmap.have_session(p->first)) {
+ dout(15) << "force_open_sessions have session " << p->second << dendl;
+ continue;
+ }
+
+ dout(10) << "force_open_sessions opening " << p->second << dendl;
+ mds->clientmap.open_session(p->second);
+ mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), p->second);
+ }
+ mds->clientmap.set_version(v+1);
+}
+
void Server::terminate_sessions()
{
_rename_prepare(mdr, &le->metablob, srcdn, destdn, straydn);
+ if (!srcdn->is_auth() && srcdn->is_primary()) {
+ // importing inode; also journal imported client map
+
+ // ** DER FIXME **
+ }
+
// -- commit locally --
C_MDS_rename_finish *fin = new C_MDS_rename_finish(mds, mdr, srcdn, destdn, straydn);
version_t siv;
if (srcdn->is_auth())
siv = srcdn->inode->get_projected_version();
- else
+ else
siv = mdr->more()->inode_import_v;
mdr->more()->pvmap[destdn] = destdn->pre_dirty(siv+1);
}
map<int,entity_inst_t> imported_client_map;
list<ScatterLock*> updated_scatterlocks; // we clear_updated explicitly below
::_decode_simple(imported_client_map, blp);
+ force_open_sessions(imported_client_map);
mdcache->migrator->decode_import_inode(destdn, blp,
srcdn->authority().first,
- imported_client_map,
mdr->ls,
updated_scatterlocks);
destdn->inode->dirlock.clear_updated();
void handle_client_session(class MClientSession *m);
void _session_logged(entity_inst_t ci, bool open, version_t cmapv);
+ void force_open_sessions(map<int,entity_inst_t> &cm);
void terminate_sessions();
void reconnect_clients();
void handle_client_reconnect(class MClientReconnect *m);
public:
EMetaBlob metablob;
+ bufferlist client_map; // encoded map<int,entity_inst_t>
+ version_t cmapv;
EImportStart(dirfrag_t di,
list<dirfrag_t>& b) : LogEvent(EVENT_IMPORTSTART),
bl.append((char*)&base, sizeof(base));
metablob._encode(bl);
::_encode(bounds, bl);
+ ::_encode(cmapv, bl);
+ ::_encode(client_map, bl);
}
void decode_payload(bufferlist& bl, int& off) {
bl.copy(off, sizeof(base), (char*)&base);
off += sizeof(base);
metablob._decode(bl, off);
::_decode(bounds, bl, off);
+ ::_decode(cmapv, bl, off);
+ ::_decode(client_map, bl, off);
}
bool has_expired(MDS *mds);
}
void encode_payload(bufferlist& bl) {
- ::_encode(client_inst, bl);
- ::_encode(open, bl);
- ::_encode(cmapv, bl);
+ ::_encode(client_inst, bl);
+ ::_encode(open, bl);
+ ::_encode(cmapv, bl);
}
void decode_payload(bufferlist& bl, int& off) {
- ::_decode(client_inst, bl, off);
- ::_decode(open, bl, off);
- ::_decode(cmapv, bl, off);
+ ::_decode(client_inst, bl, off);
+ ::_decode(open, bl, off);
+ ::_decode(cmapv, bl, off);
}
public:
EMetaBlob metablob;
string type;
+ bufferlist client_map;
EUpdate() : LogEvent(EVENT_UPDATE) { }
EUpdate(MDLog *mdlog, const char *s) :
void encode_payload(bufferlist& bl) {
::_encode(type, bl);
metablob._encode(bl);
+ ::_encode(client_map, bl);
}
void decode_payload(bufferlist& bl, int& off) {
::_decode(type, bl, off);
metablob._decode(bl, off);
+ ::_decode(client_map, bl, off);
}
void update_segment();
// put in ambiguous import list
mds->mdcache->add_ambiguous_import(base, bounds);
+
+ // open client sessions?
+ if (mds->clientmap.get_version() >= cmapv) {
+ dout(10) << "EImportStart.replay clientmap " << mds->clientmap.get_version()
+ << " >= " << cmapv << ", noop" << dendl;
+ } else {
+ dout(10) << "EImportStart.replay clientmap " << mds->clientmap.get_version()
+ << " < " << cmapv << dendl;
+ map<int,entity_inst_t> cm;
+ bufferlist::iterator blp = client_map.begin();
+ ::_decode_simple(cm, blp);
+ mds->clientmap.open_sessions(cm);
+ assert(mds->clientmap.get_version() == cmapv);
+ }
}
// -----------------------