int mon = monmap->pick_mon();
dout(2) << "sending client_mount to mon" << mon << " as instance " << my_instance << dendl;
messenger->set_dispatcher(this);
- messenger->send_message(new MClientMount(messenger->get_myaddr(), my_instance),
- monmap->get_inst(mon));
+ messenger->send_message(new MClientMount, monmap->get_inst(mon));
// schedule timeout?
assert(mount_timeout_event == 0);
// send unmount!
int mon = monmap->pick_mon();
dout(2) << "sending client_unmount to mon" << mon << dendl;
- messenger->send_message(new MClientUnmount(messenger->get_myinst()),
- monmap->get_inst(mon));
+ messenger->send_message(new MClientUnmount, monmap->get_inst(mon));
while (mounted)
mount_cond.Wait(client_lock);
#define MAX_DENTRY_LEN 255
+// --
+
+inline ostream& operator<<(ostream& out, ceph_fsid_t& f) {
+ return out << hex << f.major << '.' << f.minor << dec;
+}
+
// -- io helpers --
class MClientMount : public Message {
public:
- entity_addr_t addr;
- int32_t instance; // on this node
-
MClientMount() : Message(CEPH_MSG_CLIENT_MOUNT) { }
- MClientMount(entity_addr_t a, int i = 0) :
+ /*MClientMount(entity_addr_t a, int i = 0) :
Message(CEPH_MSG_CLIENT_MOUNT),
addr(a), instance(i) { }
+ */
char *get_type_name() { return "client_mount"; }
void decode_payload() {
+ /*
int off = 0;
::_decode(addr, payload, off);
::_decode(instance, payload, off);
+ */
}
void encode_payload() {
+ /*
::_encode(addr, payload);
::_encode(instance, payload);
+ */
}
};
class MClientUnmount : public Message {
public:
- entity_inst_t inst;
-
MClientUnmount() : Message(CEPH_MSG_CLIENT_UNMOUNT) { }
+ /*
MClientUnmount(entity_inst_t i) :
Message(CEPH_MSG_CLIENT_UNMOUNT),
inst(i) { }
+ */
char *get_type_name() { return "client_unmount"; }
void decode_payload() {
- int off = 0;
- ::_decode(inst, payload, off);
+ //int off = 0;
+ //::_decode(inst, payload, off);
}
void encode_payload() {
- ::_encode(inst, payload);
+ //::_encode(inst, payload);
}
};
class MMDSGetMap : public Message {
public:
- MMDSGetMap() : Message(CEPH_MSG_MDS_GETMAP) {
- }
+ MMDSGetMap() : Message(CEPH_MSG_MDS_GETMAP) { }
char *get_type_name() { return "mdsgetmap"; }
case CEPH_MSG_CLIENT_MOUNT:
{
// already mounted?
- MClientMount *mount = (MClientMount*)m;
entity_addr_t addr = m->get_source_addr();
- pair<entity_addr_t,int> addrinst(addr, mount->instance);
- if (client_map.addr_client.count(addrinst)) {
- int client = client_map.addr_client[addrinst];
+ if (client_map.addr_client.count(addr)) {
+ int client = client_map.addr_client[addr];
dout(7) << " client" << client << " already mounted" << dendl;
_mounted(client, (MClientMount*)m);
return true;
switch (m->get_type()) {
case CEPH_MSG_CLIENT_MOUNT:
{
- MClientMount *mount = (MClientMount*)m;
- pair<entity_addr_t,int> addrinst(mount->addr, mount->instance);
+ entity_addr_t addr = m->get_source_addr();
int client = -1;
- if (mount->get_source().is_client())
- client = mount->get_source().num();
+ if (m->get_source().is_client())
+ client = m->get_source().num();
// choose a client id
if (client < 0) {
client = pending_inc.next_client;
- dout(10) << "mount: assigned client" << client << " to " << mount->addr << dendl;
+ dout(10) << "mount: assigned client" << client << " to " << addr << dendl;
} else {
- dout(10) << "mount: client" << client << " requested by "
- << mount->addr << "i" << mount->instance
- << dendl;
+ dout(10) << "mount: client" << client << " requested by " << addr << dendl;
if (client_map.client_addr.count(client)) {
- assert(client_map.client_addr[client] != addrinst);
- dout(0) << "mount: WARNING: client" << client << " requested by "
- << mount->addr << "." << mount->instance
- << ", which used to be "
- << client_map.client_addr[client].first << "i" << client_map.client_addr[client].second
- << dendl;
+ assert(client_map.client_addr[client] != addr);
+ dout(0) << "mount: WARNING: client" << client << " requested by " << addr
+ << ", which used to be " << client_map.client_addr[client] << dendl;
}
}
- pending_inc.add_mount(client, mount->addr, mount->instance);
- paxos->wait_for_commit(new C_Mounted(this, client, mount));
+ pending_inc.add_mount(client, addr);
+ paxos->wait_for_commit(new C_Mounted(this, client, (MClientMount*)m));
}
return true;
case CEPH_MSG_CLIENT_UNMOUNT:
{
- MClientUnmount *unmount = (MClientUnmount*)m;
- assert(unmount->inst.name.is_client());
- int client = unmount->inst.name.num();
+ assert(m->get_source().is_client());
+ int client = m->get_source().num();
assert(client_map.client_addr.count(client));
pending_inc.add_unmount(client);
- paxos->wait_for_commit(new C_Unmounted(this, unmount));
+ paxos->wait_for_commit(new C_Unmounted(this, (MClientUnmount*)m));
}
return true;
void ClientMonitor::_mounted(int client, MClientMount *m)
{
entity_inst_t to;
- to.addr = m->addr;
+ to.addr = m->get_source_addr();
to.name = entity_name_t::CLIENT(client);
dout(10) << "_mounted client" << client << " at " << to << dendl;
void ClientMonitor::_unmounted(MClientUnmount *m)
{
- dout(10) << "_unmounted " << m->inst << dendl;
+ dout(10) << "_unmounted " << m->get_source_inst() << dendl;
// reply with (same) unmount message
- mon->messenger->send_message(m, m->inst);
+ mon->messenger->send_message(m, m->get_source_inst());
// auto-shutdown?
// (hack for fakesyn/newsyn, mostly)
struct Incremental {
version_t version;
uint32_t next_client;
- map<int32_t, pair<entity_addr_t,int> > mount;
+ map<int32_t, entity_addr_t> mount;
set<int32_t> unmount;
Incremental() : version(0), next_client() {}
bool is_empty() { return mount.empty() && unmount.empty(); }
- void add_mount(uint32_t client, entity_addr_t addr, int instance) {
+ void add_mount(uint32_t client, entity_addr_t addr) {
next_client = MAX(next_client, client+1);
- mount[client] = pair<entity_addr_t,int>(addr, instance);
+ mount[client] = addr;
}
void add_unmount(uint32_t client) {
assert(client < next_client);
struct Map {
version_t version;
uint32_t next_client;
- map<uint32_t,pair<entity_addr_t,int> > client_addr;
- map<pair<entity_addr_t,int>,uint32_t> addr_client;
+ map<uint32_t,entity_addr_t> client_addr;
+ map<entity_addr_t,uint32_t> addr_client;
Map() : version(0), next_client(0) {}
void reverse() {
addr_client.clear();
- for (map<uint32_t,pair<entity_addr_t,int> >::iterator p = client_addr.begin();
+ for (map<uint32_t,entity_addr_t>::iterator p = client_addr.begin();
p != client_addr.end();
++p) {
addr_client[p->second] = p->first;
assert(inc.version == version+1);
version = inc.version;
next_client = inc.next_client;
- for (map<int32_t,pair<entity_addr_t,int> >::iterator p = inc.mount.begin();
+ for (map<int32_t,entity_addr_t>::iterator p = inc.mount.begin();
p != inc.mount.end();
++p) {
client_addr[p->first] = p->second;
class MonMap {
public:
epoch_t epoch; // what epoch/version of the monmap
+ ceph_fsid_t fsid;
int32_t num_mon;
vector<entity_inst_t> mon_inst;
int last_mon; // last mon i talked to
- MonMap(int s=0) : epoch(0), num_mon(s), mon_inst(s), last_mon(-1) {}
+ MonMap(int s=0) : epoch(0), num_mon(s), mon_inst(s), last_mon(-1) {
+ generate_fsid();
+ }
void add_mon(entity_inst_t inst) {
mon_inst.push_back(inst);
void encode(bufferlist& blist) {
::_encode(epoch, blist);
+ ::_encode(fsid, blist);
::_encode(num_mon, blist);
::_encode(mon_inst, blist);
}
void decode(bufferlist& blist) {
int off = 0;
::_decode(epoch, blist, off);
+ ::_decode(fsid, blist, off);
::_decode(num_mon, blist, off);
::_decode(mon_inst, blist, off);
}
+
+ void generate_fsid() {
+ fsid.major = ((uint64_t)rand() << 32) + rand();
+ fsid.minor = ((uint64_t)rand() << 32) + rand();
+ }
+
// read from/write to a file
int write(char *fn) {
// encode
assert(mon->is_leader());
assert(paxos->get_version() == 0);
- dout(1) << "create_initial -- creating initial osdmap from g_conf" << dendl;
+ dout(1) << "create_initial for " << mon->monmap->fsid << " from g_conf" << dendl;
// <HACK set up OSDMap from g_conf>
OSDMap newmap;
+ newmap.set_fsid(mon->monmap->fsid);
newmap.mon_epoch = mon->mon_epoch;
newmap.ctime = g_clock.now();
void OSDMonitor::create_pending()
{
pending_inc = OSDMap::Incremental(osdmap.epoch+1);
- dout(10) << "create_pending e " << pending_inc.epoch
- << dendl;
+ pending_inc.fsid = mon->monmap->fsid;
+
+ dout(10) << "create_pending e " << pending_inc.epoch << dendl;
}
void OSDMonitor::encode_pending(bufferlist &bl)
dout(10) << "connecting to " << peer_addr.v.ipaddr << dendl;
rc = ::connect(newsd, (sockaddr*)&peer_addr.v.ipaddr, sizeof(peer_addr.v.ipaddr));
if (rc < 0) {
- dout(10) << "connect error " << peer_addr.v.ipaddr
+ dout(2) << "connect error " << peer_addr.v.ipaddr
<< ", " << errno << ": " << strerror(errno) << dendl;
goto fail;
}
// identify peer
rc = tcp_read(newsd, (char*)&paddr, sizeof(paddr));
if (rc < 0) {
- dout(0) << "connect couldn't read peer addr, " << strerror(errno) << dendl;
+ dout(2) << "connect couldn't read peer addr, " << strerror(errno) << dendl;
goto fail;
}
dout(20) << "connect read peer addr " << paddr << dendl;
// FINISH
if (state != STATE_CONNECTING) {
dout(2) << "connect hmm, race durring connect(), not connecting anymore, failing" << dendl;
- goto fail2; // hmm!
+ goto fail_locked; // hmm!
}
if (tag == CEPH_MSGR_TAG_REJECT) {
if (connect_seq != cseq) {
} else {
dout(0) << "connect got REJECT, connection race (harmless), connect_seq=" << connect_seq << dendl;
}
- goto fail2;
+ goto fail_locked;
}
assert(tag == CEPH_MSGR_TAG_READY);
state = STATE_OPEN;
fail:
lock.Lock();
- fail2:
+ fail_locked:
if (newsd > 0) ::close(newsd);
- fault(tag == CEPH_MSGR_TAG_REJECT);
+ fault(tag == CEPH_MSGR_TAG_REJECT); // quiet if reject (not socket error)
return -1;
}