dn->lease_ttl = dttl;
dn->lease_mds = mds;
dn->lease_seq = dlease->seq;
- dn->lease_gen = mds_sessions[mds].cap_gen;
+ dn->lease_gen = mds_sessions[mds]->cap_gen;
}
}
dn->cap_shared_gen = dir->parent_inode->shared_gen;
{
//this function shouldn't be called unless we lost a connection
assert (mds_sessions.count(mds));
- MDSSession &s = mds_sessions[mds];
- if (!s.requests.empty()) {
+ MDSSession *s = mds_sessions[mds];
+ if (!s->requests.empty()) {
const MDSMap::mds_info_t& info = mdsmap->get_mds_info(mds);
for (set<int>::const_iterator q = info.export_targets.begin();
q != info.export_targets.end();
{
dout(10) << "handle_client_session " << *m << dendl;
int from = m->get_source().num();
-
+ MDSSession *mds_session = mds_sessions[from];
switch (m->get_op()) {
case CEPH_SESSION_OPEN:
- mds_sessions[from].mds_num = from;
- mds_sessions[from].seq = 0;
- mds_sessions[from].inst = m->get_source_inst();
+ mds_sessions[from] = mds_session = new MDSSession();
+ mds_session->mds_num = from;
+ mds_session->seq = 0;
+ mds_session->inst = m->get_source_inst();
renew_caps(from);
break;
mount_cond.Signal();
remove_session_caps(from);
kick_requests(from, true);
+ delete mds_session;
mds_sessions.erase(from);
break;
case CEPH_SESSION_RENEWCAPS:
- if (mds_sessions[from].cap_renew_seq == m->get_seq()) {
- mds_sessions[from].cap_ttl =
- mds_sessions[from].last_cap_renew_request + mdsmap->get_session_timeout();
+ if (mds_session->cap_renew_seq == m->get_seq()) {
+ mds_session->cap_ttl =
+ mds_session->last_cap_renew_request + mdsmap->get_session_timeout();
wake_inode_waiters(from);
}
break;
case CEPH_SESSION_STALE:
- mds_sessions[from].was_stale = true;
+ mds_session->was_stale = true;
renew_caps(from);
break;
if (request->inode && request->inode->caps.count(mds))
request->sent_on_mseq = request->inode->caps[mds]->mseq;
- mds_sessions[mds].requests.push_back(&request->item);
+ mds_sessions[mds]->requests.push_back(&request->item);
dout(10) << "send_request " << *r << " to mds" << mds << dendl;
messenger->send_message(r, mdsmap->get_inst(mds));
if (!request->got_unsafe) {
request->got_unsafe = true;
- mds_sessions[mds_num].unsafe_requests.push_back(&request->unsafe_item);
+ mds_sessions[mds_num]->unsafe_requests.push_back(&request->unsafe_item);
Cond cond;
request->dispatch_cond = &cond;
mdsmap->decode(m->get_encoded());
// reset session
- for (map<int,MDSSession>::iterator p = mds_sessions.begin();
+ for (map<int,MDSSession*>::iterator p = mds_sessions.begin();
p != mds_sessions.end();
p++) {
int oldstate = oldmap->get_state(p->first);
int newstate = mdsmap->get_state(p->first);
if (!mdsmap->is_up(p->first) ||
- mdsmap->get_inst(p->first) != p->second.inst) {
- messenger->mark_down(p->second.inst.addr);
+ mdsmap->get_inst(p->first) != p->second->inst) {
+ messenger->mark_down(p->second->inst.addr);
if (mdsmap->is_up(p->first))
- p->second.inst = mdsmap->get_inst(p->first);
+ p->second->inst = mdsmap->get_inst(p->first);
} else if (oldstate == newstate)
continue; // no change
}
// reset my cap seq number
- mds_sessions[mds].seq = 0;
+ mds_sessions[mds]->seq = 0;
//connect to the mds' offload targets
connect_mds_targets(mds);
void Client::resend_unsafe_requests(int mds_num)
{
- MDSSession& mds = mds_sessions[mds_num];
- for (xlist<MetaRequest*>::iterator iter = mds.unsafe_requests.begin();
+ MDSSession *mds = mds_sessions[mds_num];
+ for (xlist<MetaRequest*>::iterator iter = mds->unsafe_requests.begin();
!iter.end();
++iter)
send_request(*iter, mds_num);
void Client::got_mds_push(int mds)
{
- MDSSession& s = mds_sessions[mds];
+ MDSSession *s = mds_sessions[mds];
- s.seq++;
- dout(10) << " mds" << mds << " seq now " << s.seq << dendl;
- if (s.closing)
- messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, s.seq),
- s.inst);
+ s->seq++;
+ dout(10) << " mds" << mds << " seq now " << s->seq << dendl;
+ if (s->closing)
+ messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, s->seq),
+ s->inst);
}
void Client::handle_lease(MClientLease *m)
flush = in->dirty_caps;
if (flush && !in->flushing_caps) {
dout(10) << " " << *in << " flushing" << dendl;
- mds_sessions[mds].flushing_caps.push_back(&in->flushing_cap_item);
+ mds_sessions[mds]->flushing_caps.push_back(&in->flushing_cap_item);
in->flushing_cap_seq = ++last_flush_seq;
in->get();
num_flushing_caps++;
void Client::wake_inode_waiters(int mds_num)
{
- MDSSession * mds = &mds_sessions[mds_num];
+ MDSSession * mds = mds_sessions[mds_num];
xlist<InodeCap*>::iterator iter = mds->caps.begin();
while (!iter.end()){
signal_cond_list((*iter)->inode->waitfor_caps);
int flags)
{
InodeCap *cap = 0;
+ MDSSession *mds_session = mds_sessions[mds];
if (in->caps.count(mds)) {
cap = in->caps[mds];
} else {
- mds_sessions[mds].num_caps++;
+ mds_session->num_caps++;
if (!in->is_any_caps()) {
assert(in->snaprealm == 0);
in->snaprealm = get_snap_realm(realm);
in->exporting_mseq = 0;
}
in->caps[mds] = cap = new InodeCap;
- mds_sessions[mds].caps.push_back(&cap->cap_item);
- cap->session = &mds_sessions[mds];
+ mds_session->caps.push_back(&cap->cap_item);
+ cap->session = mds_session;
cap->inode = in;
- cap->gen = mds_sessions[mds].cap_gen;
+ cap->gen = mds_session->cap_gen;
cap_list.push_back(&in->cap_item);
}
return;
}
InodeCap *cap = in->caps[mds];
- MDSSession *session = &mds_sessions[mds];
+ MDSSession *session = mds_sessions[mds];
if (!session->release)
session->release = new MClientCapRelease;
void Client::remove_session_caps(int mds_num)
{
if (mds_sessions.count(mds_num)) {
- MDSSession* mds = &mds_sessions[mds_num];
+ MDSSession* mds = mds_sessions[mds_num];
while (mds->caps.size())
remove_cap((*mds->caps.begin())->inode, mds_num);
}
void Client::trim_caps(int mds, int max)
{
dout(10) << "trim_caps mds" << mds << " max" << max << dendl;
- MDSSession *s = &mds_sessions[mds];
+ MDSSession *s = mds_sessions[mds];
int trimmed = 0;
xlist<InodeCap*>::iterator p = s->caps.begin();
retry:
dout(10) << "wait_sync_caps want " << want << " (last is " << last_flush_seq << ", "
<< num_flushing_caps << " total flushing)" << dendl;
- for (map<int,MDSSession>::iterator p = mds_sessions.begin();
+ for (map<int,MDSSession*>::iterator p = mds_sessions.begin();
p != mds_sessions.end();
p++) {
- if (p->second.flushing_caps.empty())
+ if (p->second->flushing_caps.empty())
continue;
- Inode *in = p->second.flushing_caps.front();
+ Inode *in = p->second->flushing_caps.front();
if (in->flushing_cap_seq <= want) {
dout(10) << " waiting on mds" << p->first << " tid " << in->flushing_cap_seq
<< " (want " << want << ")" << dendl;
void Client::kick_flushing_caps(int mds)
{
dout(10) << "kick_flushing_caps" << dendl;
- MDSSession *session = &mds_sessions[mds];
+ MDSSession *session = mds_sessions[mds];
for (xlist<Inode*>::iterator p = session->flushing_caps.begin(); !p.end(); ++p) {
Inode *in = *p;
// send session closes!
- for (map<int,MDSSession>::iterator p = mds_sessions.begin();
+ for (map<int,MDSSession*>::iterator p = mds_sessions.begin();
p != mds_sessions.end();
++p) {
dout(2) << "sending client_session close to mds" << p->first
- << " seq " << p->second.seq << dendl;
- p->second.closing = true;
- messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, p->second.seq),
+ << " seq " << p->second->seq << dendl;
+ p->second->closing = true;
+ messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_CLOSE, p->second->seq),
mdsmap->get_inst(p->first));
}
renew_caps();
// send any cap releases
- for (map<int,MDSSession>::iterator p = mds_sessions.begin();
+ for (map<int,MDSSession*>::iterator p = mds_sessions.begin();
p != mds_sessions.end();
p++) {
- if (p->second.release) {
- messenger->send_message(p->second.release, mdsmap->get_inst(p->first));
- p->second.release = 0;
+ if (p->second->release) {
+ messenger->send_message(p->second->release, mdsmap->get_inst(p->first));
+ p->second->release = 0;
}
}
}
dout(10) << "renew_caps()" << dendl;
last_cap_renew = g_clock.now();
- for (map<int,MDSSession>::iterator p = mds_sessions.begin();
+ for (map<int,MDSSession*>::iterator p = mds_sessions.begin();
p != mds_sessions.end();
p++) {
dout(15) << "renew_caps requesting from mds" << p->first << dendl;
void Client::renew_caps(const int mds) {
dout(10) << "renew_caps mds" << mds << dendl;
- mds_sessions[mds].last_cap_renew_request = g_clock.now();
- uint64_t seq = ++mds_sessions[mds].cap_renew_seq;
+ MDSSession *session = mds_sessions[mds];
+ session->last_cap_renew_request = g_clock.now();
+ uint64_t seq = ++session->cap_renew_seq;
messenger->send_message(new MClientSession(CEPH_SESSION_REQUEST_RENEWCAPS, seq),
mdsmap->get_inst(mds));
if (dn->lease_mds >= 0 &&
dn->lease_ttl > now &&
mds_sessions.count(dn->lease_mds)) {
- MDSSession &s = mds_sessions[dn->lease_mds];
- if (s.cap_ttl > now &&
- s.cap_gen == dn->lease_gen) {
+ MDSSession *s = mds_sessions[dn->lease_mds];
+ if (s->cap_ttl > now &&
+ s->cap_gen == dn->lease_gen) {
*target = dn->inode;
goto done;
}
- dout(20) << " bad lease, cap_ttl " << s.cap_ttl << ", cap_gen " << s.cap_gen
+ dout(20) << " bad lease, cap_ttl " << s->cap_ttl << ", cap_gen " << s->cap_gen
<< " vs lease_gen " << dn->lease_gen << dendl;
}
// dir lease?
dn->lease_mds >= 0 &&
dn->lease_ttl > now &&
mds_sessions.count(dn->lease_mds)) {
- MDSSession &s = mds_sessions[dn->lease_mds];
- if (s.cap_ttl > now &&
- s.cap_gen == dn->lease_gen) {
+ MDSSession *s = mds_sessions[dn->lease_mds];
+ if (s->cap_ttl > now &&
+ s->cap_gen == dn->lease_gen) {
if (expect_null)
return -EEXIST;
else