f->open_array_section("sessions");
for (const auto &p : mds_sessions) {
f->open_object_section("session");
- p.second.dump(f, cap_dump);
+ p.second->dump(f, cap_dump);
f->close_section();
}
f->close_section();
if (use_mds >= 0)
request->resend_mds = use_mds;
- MetaSession *session = NULL;
+ MetaSessionRef session = NULL;
while (1) {
if (request->aborted())
break;
if (!have_open_session(mds))
continue;
} else {
- session = &mds_sessions.at(mds);
+ session = mds_sessions.at(mds);
}
// send request.
- send_request(request, session);
+ send_request(request, session.get());
// wait for signal
ldout(cct, 20) << "awaiting reply|forward|kick on " << &caller_cond << dendl;
request->dispatch_cond = 0;
if (r >= 0 && ptarget)
- r = verify_reply_trace(r, session, request, reply, ptarget, pcreated, perms);
+ r = verify_reply_trace(r, session.get(), request, reply, ptarget, pcreated, perms);
if (pdirbl)
*pdirbl = reply->get_extra_bl();
{
const auto &it = mds_sessions.find(mds);
return it != mds_sessions.end() &&
- (it->second.state == MetaSession::STATE_OPEN ||
- it->second.state == MetaSession::STATE_STALE);
+ (it->second->state == MetaSession::STATE_OPEN ||
+ it->second->state == MetaSession::STATE_STALE);
}
-MetaSession *Client::_get_mds_session(mds_rank_t mds, Connection *con)
+MetaSessionRef Client::_get_mds_session(mds_rank_t mds, Connection *con)
{
const auto &it = mds_sessions.find(mds);
- if (it == mds_sessions.end() || it->second.con != con) {
+ if (it == mds_sessions.end() || it->second->con != con) {
return NULL;
} else {
- return &it->second;
+ return it->second;
}
}
-MetaSession *Client::_get_or_open_mds_session(mds_rank_t mds)
+MetaSessionRef Client::_get_or_open_mds_session(mds_rank_t mds)
{
auto it = mds_sessions.find(mds);
- return it == mds_sessions.end() ? _open_mds_session(mds) : &it->second;
+ return it == mds_sessions.end() ? _open_mds_session(mds) : it->second;
}
/**
metadata[k] = v;
}
-MetaSession *Client::_open_mds_session(mds_rank_t mds)
+MetaSessionRef Client::_open_mds_session(mds_rank_t mds)
{
ldout(cct, 10) << __func__ << " mds." << mds << dendl;
auto addrs = mdsmap->get_addrs(mds);
auto em = mds_sessions.emplace(std::piecewise_construct,
std::forward_as_tuple(mds),
- std::forward_as_tuple(mds, messenger->connect_to_mds(addrs), addrs));
+ std::forward_as_tuple(new MetaSession(mds, messenger->connect_to_mds(addrs), addrs)));
ceph_assert(em.second); /* not already present */
- MetaSession *session = &em.first->second;
+ auto session = em.first->second;
auto m = make_message<MClientSession>(CEPH_SESSION_REQUEST_OPEN);
m->metadata = metadata;
ldout(cct, 10) << __func__ << " " << *m << " from mds." << from << dendl;
std::scoped_lock cl(client_lock);
- MetaSession *session = _get_mds_session(from, m->get_connection().get());
+ auto session = _get_mds_session(from, m->get_connection().get());
if (!session) {
ldout(cct, 10) << " discarding session message from sessionless mds " << m->get_source_inst() << dendl;
return;
if (!missing_features.empty()) {
lderr(cct) << "mds." << from << " lacks required features '"
<< missing_features << "', closing session " << dendl;
- _close_mds_session(session);
- _closed_mds_session(session, -CEPHFS_EPERM, true);
+ _close_mds_session(session.get());
+ _closed_mds_session(session.get(), -CEPHFS_EPERM, true);
break;
}
session->mds_features = std::move(m->supported_features);
- renew_caps(session);
+ renew_caps(session.get());
session->state = MetaSession::STATE_OPEN;
if (is_unmounting())
mount_cond.notify_all();
}
case CEPH_SESSION_CLOSE:
- _closed_mds_session(session);
+ _closed_mds_session(session.get());
break;
case CEPH_SESSION_RENEWCAPS:
session->cap_ttl =
session->last_cap_renew_request + mdsmap->get_session_timeout();
if (was_stale)
- wake_up_session_caps(session, false);
+ wake_up_session_caps(session.get(), false);
}
break;
session->cap_gen++;
session->cap_ttl = ceph_clock_now();
session->cap_ttl -= 1;
- renew_caps(session);
+ renew_caps(session.get());
break;
case CEPH_SESSION_RECALL_STATE:
* by tick().
*/
renew_and_flush_cap_releases();
- trim_caps(session, m->get_max_caps());
+ trim_caps(session.get(), m->get_max_caps());
break;
case CEPH_SESSION_FLUSHMSG:
break;
case CEPH_SESSION_FORCE_RO:
- force_session_readonly(session);
+ force_session_readonly(session.get());
break;
case CEPH_SESSION_REJECT:
error_str = "unknown error";
lderr(cct) << "mds." << from << " rejected us (" << error_str << ")" << dendl;
- _closed_mds_session(session, -CEPHFS_EPERM, true);
+ _closed_mds_session(session.get(), -CEPHFS_EPERM, true);
}
break;
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
for (const auto &p : mds_sessions) {
- if (p.second.state == MetaSession::STATE_STALE) {
+ if (p.second->state == MetaSession::STATE_STALE) {
return true;
}
}
ldout(cct, 1) << __func__ << dendl;
for (auto it = mds_sessions.begin(); it != mds_sessions.end(); ) {
- MetaSession &s = it->second;
- if (s.state == MetaSession::STATE_REJECTED) {
- mds_sessions.erase(it++);
+ auto s = it->second;
+ if (s->state == MetaSession::STATE_REJECTED) {
+ mds_sessions.erase(it->first);
continue;
}
- ++it;
- if (s.state == MetaSession::STATE_STALE)
- _closed_mds_session(&s);
+ if (s->state == MetaSession::STATE_STALE)
+ _closed_mds_session(s.get());
}
}
mds_rank_t mds = mds_rank_t(fwd->get_source().num());
std::scoped_lock cl(client_lock);
- MetaSession *session = _get_mds_session(mds, fwd->get_connection().get());
+ auto session = _get_mds_session(mds, fwd->get_connection().get());
if (!session) {
return;
}
mds_rank_t mds_num = mds_rank_t(reply->get_source().num());
std::scoped_lock cl(client_lock);
- MetaSession *session = _get_mds_session(mds_num, reply->get_connection().get());
+ auto session = _get_mds_session(mds_num, reply->get_connection().get());
if (!session) {
return;
}
ceph_assert(!request->reply);
request->reply = reply;
- insert_trace(request, session);
+ insert_trace(request, session.get());
// Handle unsafe reply
if (!is_safe) {
_mdsmap.swap(mdsmap);
// reset session
- for (auto p = mds_sessions.begin(); p != mds_sessions.end(); ) {
- mds_rank_t mds = p->first;
- MetaSession *session = &p->second;
- ++p;
+ for (auto &p : mds_sessions) {
+ mds_rank_t mds = p.first;
+ auto session = p.second;
int oldstate = _mdsmap->get_state(mds);
int newstate = mdsmap->get_state(mds);
// When new MDS starts to take over, notify kernel to trim unused entries
// in its dcache/icache. Hopefully, the kernel will release some unused
// inodes before the new MDS enters reconnect state.
- trim_cache_for_reconnect(session);
+ trim_cache_for_reconnect(session.get());
} else if (oldstate == newstate)
continue; // no change
session->mds_state = newstate;
if (newstate == MDSMap::STATE_RECONNECT) {
session->con = messenger->connect_to_mds(session->addrs);
- send_reconnect(session);
+ send_reconnect(session.get());
} else if (newstate > MDSMap::STATE_RECONNECT) {
if (oldstate < MDSMap::STATE_RECONNECT) {
ldout(cct, 1) << "we may miss the MDSMap::RECONNECT, close mds session ... " << dendl;
- _closed_mds_session(session);
+ _closed_mds_session(session.get());
continue;
}
if (newstate >= MDSMap::STATE_ACTIVE) {
if (oldstate < MDSMap::STATE_ACTIVE) {
// kick new requests
- kick_requests(session);
- kick_flushing_caps(session);
+ kick_requests(session.get());
+ kick_flushing_caps(session.get());
signal_context_list(session->waiting_for_open);
- wake_up_session_caps(session, true);
+ wake_up_session_caps(session.get(), true);
}
connect_mds_targets(mds);
}
} else if (newstate == MDSMap::STATE_NULL &&
mds >= mdsmap->get_max_mds()) {
- _closed_mds_session(session);
+ _closed_mds_session(session.get());
}
}
{
list<MetaRequest*> last_unsafe_reqs;
for (const auto &p : mds_sessions) {
- const MetaSession &s = p.second;
- if (!s.unsafe_requests.empty()) {
- MetaRequest *req = s.unsafe_requests.back();
+ const auto s = p.second;
+ if (!s->unsafe_requests.empty()) {
+ MetaRequest *req = s->unsafe_requests.back();
req->get();
last_unsafe_reqs.push_back(req);
}
mds_rank_t mds = mds_rank_t(m->get_source().num());
std::scoped_lock cl(client_lock);
- MetaSession *session = _get_mds_session(mds, m->get_connection().get());
+ auto session = _get_mds_session(mds, m->get_connection().get());
if (!session) {
return;
}
- got_mds_push(session);
+ got_mds_push(session.get());
ceph_seq_t seq = m->get_seq();
}
for (auto &[mds, cap] : in->caps) {
- MetaSession *session = &mds_sessions.at(mds);
+ auto session = mds_sessions.at(mds);
cap_used = used;
if (in->auth_cap && &cap != in->auth_cap)
if (in->flags & I_KICK_FLUSH) {
ldout(cct, 20) << " reflushing caps (check_caps) on " << *in
<< " to mds." << mds << dendl;
- kick_flushing_caps(in, session);
+ kick_flushing_caps(in, session.get());
}
if (!in->cap_snaps.empty() &&
in->cap_snaps.rbegin()->second.flush_tid == 0)
flush_tid = 0;
}
- send_cap(in, session, &cap, msg_flags, cap_used, wanted, retain,
+ send_cap(in, session.get(), &cap, msg_flags, cap_used, wanted, retain,
flushing, flush_tid);
}
}
ldout(cct, 10) << __func__ << " want " << want << " (last is " << last_flush_tid << ", "
<< num_flushing_caps << " total flushing)" << dendl;
for (auto &p : mds_sessions) {
- MetaSession *s = &p.second;
+ auto s = p.second;
if (s->flushing_caps_tids.empty())
continue;
ceph_tid_t oldest_tid = *s->flushing_caps_tids.begin();
mds_rank_t mds = mds_rank_t(m->get_source().num());
std::scoped_lock cl(client_lock);
- MetaSession *session = _get_mds_session(mds, m->get_connection().get());
+ auto session = _get_mds_session(mds, m->get_connection().get());
if (!session) {
return;
}
- got_mds_push(session);
+ got_mds_push(session.get());
map<Inode*, SnapContext> to_move;
SnapRealm *realm = 0;
mds_rank_t mds = mds_rank_t(m->get_source().num());
std::scoped_lock cl(client_lock);
- MetaSession *session = _get_mds_session(mds, m->get_connection().get());
+ auto session = _get_mds_session(mds, m->get_connection().get());
if (!session) {
return;
}
- got_mds_push(session);
+ got_mds_push(session.get());
ldout(cct, 10) << __func__ << " " << *m << " from mds." << mds << dendl;
mds_rank_t mds = mds_rank_t(m->get_source().num());
std::scoped_lock cl(client_lock);
- MetaSession *session = _get_mds_session(mds, m->get_connection().get());
+ auto session = _get_mds_session(mds, m->get_connection().get());
if (!session) {
return;
}
set_cap_epoch_barrier(m->osd_epoch_barrier);
}
- got_mds_push(session);
+ got_mds_push(session.get());
Inode *in;
vinodeno_t vino(m->get_ino(), CEPH_NOSNAP);
}
switch (m->get_op()) {
- case CEPH_CAP_OP_EXPORT: return handle_cap_export(session, in, m);
- case CEPH_CAP_OP_FLUSHSNAP_ACK: return handle_cap_flushsnap_ack(session, in, m);
- case CEPH_CAP_OP_IMPORT: /* no return */ handle_cap_import(session, in, m);
+ case CEPH_CAP_OP_EXPORT: return handle_cap_export(session.get(), in, m);
+ case CEPH_CAP_OP_FLUSHSNAP_ACK: return handle_cap_flushsnap_ack(session.get(), in, m);
+ case CEPH_CAP_OP_IMPORT: /* no return */ handle_cap_import(session.get(), in, m);
}
if (auto it = in->caps.find(mds); it != in->caps.end()) {
Cap &cap = in->caps.at(mds);
switch (m->get_op()) {
- case CEPH_CAP_OP_TRUNC: return handle_cap_trunc(session, in, m);
+ case CEPH_CAP_OP_TRUNC: return handle_cap_trunc(session.get(), in, m);
case CEPH_CAP_OP_IMPORT:
case CEPH_CAP_OP_REVOKE:
- case CEPH_CAP_OP_GRANT: return handle_cap_grant(session, in, &cap, m);
- case CEPH_CAP_OP_FLUSH_ACK: return handle_cap_flush_ack(session, in, &cap, m);
+ case CEPH_CAP_OP_GRANT: return handle_cap_grant(session.get(), in, &cap, m);
+ case CEPH_CAP_OP_FLUSH_ACK: return handle_cap_flush_ack(session.get(), in, &cap, m);
}
} else {
ldout(cct, 5) << __func__ << " don't have " << *in << " cap on mds." << mds << dendl;
if (cap.cap_id == m->get_cap_id()) {
if (m->peer.cap_id) {
const auto peer_mds = mds_rank_t(m->peer.mds);
- MetaSession *tsession = _get_or_open_mds_session(peer_mds);
+ auto tsession = _get_or_open_mds_session(peer_mds);
auto it = in->caps.find(peer_mds);
if (it != in->caps.end()) {
Cap &tcap = it->second;
if (&cap == in->auth_cap)
in->auth_cap = &tcap;
if (in->auth_cap == &tcap && in->flushing_cap_item.is_on_list())
- adjust_session_flushing_caps(in, session, tsession);
+ adjust_session_flushing_caps(in, session, tsession.get());
}
} else {
- add_update_cap(in, tsession, m->peer.cap_id, cap.issued, 0,
+ add_update_cap(in, tsession.get(), m->peer.cap_id, cap.issued, 0,
m->peer.seq - 1, m->peer.mseq, (uint64_t)-1,
&cap == in->auth_cap ? CEPH_CAP_FLAG_AUTH : 0,
cap.latest_perms);
void Client::_close_sessions()
{
for (auto it = mds_sessions.begin(); it != mds_sessions.end(); ) {
- if (it->second.state == MetaSession::STATE_REJECTED)
+ if (it->second->state == MetaSession::STATE_REJECTED)
mds_sessions.erase(it++);
else
++it;
while (!mds_sessions.empty()) {
// send session closes!
for (auto &p : mds_sessions) {
- if (p.second.state != MetaSession::STATE_CLOSING) {
- _close_mds_session(&p.second);
+ if (p.second->state != MetaSession::STATE_CLOSING) {
+ _close_mds_session(p.second.get());
mds_ranks_closing.insert(p.first);
}
}
while (!mds_ranks_closing.empty()) {
auto session = mds_sessions.at(*mds_ranks_closing.begin());
// this prunes entry from mds_sessions and mds_ranks_closing
- _closed_mds_session(&session, -CEPHFS_ETIMEDOUT);
+ _closed_mds_session(session.get(), -CEPHFS_ETIMEDOUT);
}
}
for (auto &rank : anchor) {
auto session = &mds_sessions.at(rank);
- flush_mdlog(session);
+ flush_mdlog(session->get());
}
}
if (mds_requests.empty())
return;
for (auto &p : mds_sessions) {
- flush_mdlog(&p.second);
+ flush_mdlog(p.second.get());
}
}
// Force-close all sessions
while(!mds_sessions.empty()) {
- auto& session = mds_sessions.begin()->second;
- _closed_mds_session(&session, err);
+ auto session = mds_sessions.begin()->second;
+ _closed_mds_session(session.get(), err);
}
}
// send any cap releases
for (auto &p : mds_sessions) {
- auto &session = p.second;
- if (session.release && mdsmap->is_clientreplay_or_active_or_stopping(
+ auto session = p.second;
+ if (session->release && mdsmap->is_clientreplay_or_active_or_stopping(
p.first)) {
- nr_caps += session.release->caps.size();
+ nr_caps += session->release->caps.size();
if (cct->_conf->client_inject_release_failure) {
ldout(cct, 20) << __func__ << " injecting failure to send cap release message" << dendl;
} else {
- session.con->send_message2(std::move(session.release));
+ session->con->send_message2(std::move(session->release));
}
- session.release.reset();
+ session->release.reset();
}
}
}
signal_cond_list(waiting_for_mdsmap);
for (auto &p : mds_sessions) {
- signal_context_list(p.second.waiting_for_open);
+ signal_context_list(p.second->waiting_for_open);
}
}
}
for (auto &p : mds_sessions) {
ldout(cct, 15) << "renew_caps requesting from mds." << p.first << dendl;
if (mdsmap->get_state(p.first) >= MDSMap::STATE_REJOIN)
- renew_caps(&p.second);
+ renew_caps(p.second.get());
}
}
utime_t now = ceph_clock_now();
if (dn->lease_mds >= 0 && dn->lease_ttl > now &&
mds_sessions.count(dn->lease_mds)) {
- MetaSession &s = mds_sessions.at(dn->lease_mds);
- if (s.cap_ttl > now && s.cap_gen == dn->lease_gen) {
+ auto s = mds_sessions.at(dn->lease_mds);
+ if (s->cap_ttl > now && s->cap_gen == dn->lease_gen) {
dlease_hit();
return true;
}
- ldout(cct, 20) << " bad lease, cap_ttl " << s.cap_ttl << ", cap_gen " << s.cap_gen
+ ldout(cct, 20) << " bad lease, cap_ttl " << s->cap_ttl << ", cap_gen " << s->cap_gen
<< " vs lease_gen " << dn->lease_gen << dendl;
}
{
// kludge to figure out which mds this is; fixme with a Connection* state
mds_rank_t mds = MDS_RANK_NONE;
- MetaSession *s = NULL;
+ MetaSessionRef s = NULL;
for (auto &p : mds_sessions) {
if (mdsmap->have_inst(p.first) && mdsmap->get_addrs(p.first) == con->get_peer_addrs()) {
mds = p.first;
- s = &p.second;
+ s = p.second;
}
}
if (mds >= 0) {
switch (s->state) {
case MetaSession::STATE_CLOSING:
ldout(cct, 1) << "reset from mds we were closing; we'll call that closed" << dendl;
- _closed_mds_session(s);
+ _closed_mds_session(s.get());
break;
case MetaSession::STATE_OPENING:
ldout(cct, 1) << "reset from mds we were opening; retrying" << dendl;
list<Context*> waiters;
waiters.swap(s->waiting_for_open);
- _closed_mds_session(s);
- MetaSession *news = _get_or_open_mds_session(mds);
+ _closed_mds_session(s.get());
+ auto news = _get_or_open_mds_session(mds);
news->waiting_for_open.swap(waiters);
}
break;
objecter->maybe_request_map(); /* to check if we are blocklisted */
if (cct->_conf.get_val<bool>("client_reconnect_stale")) {
ldout(cct, 1) << "reset from mds we were open; close mds session for reconnect" << dendl;
- _closed_mds_session(s);
+ _closed_mds_session(s.get());
} else {
ldout(cct, 1) << "reset from mds we were open; mark session as stale" << dendl;
s->state = MetaSession::STATE_STALE;
continue;
}
- MetaSession *session;
+ MetaSessionRef session;
if (!have_open_session(mds)) {
session = _get_or_open_mds_session(mds);
if (session->state == MetaSession::STATE_REJECTED)
continue;
}
- session = &mds_sessions.at(mds);
+ session = mds_sessions.at(mds);
if (!session->mds_features.test(CEPHFS_FEATURE_RECLAIM_CLIENT))
return -CEPHFS_EOPNOTSUPP;
auto it = metadata.find("reclaiming_uuid");
if (it == metadata.end()) {
for (auto &p : mds_sessions)
- p.second.reclaim_state = MetaSession::RECLAIM_NULL;
+ p.second->reclaim_state = MetaSession::RECLAIM_NULL;
return;
}
for (auto &p : mds_sessions) {
- p.second.reclaim_state = MetaSession::RECLAIM_NULL;
+ p.second->reclaim_state = MetaSession::RECLAIM_NULL;
auto m = make_message<MClientReclaim>("", MClientReclaim::FLAG_FINISH);
- p.second.con->send_message2(std::move(m));
+ p.second->con->send_message2(std::move(m));
}
metadata["uuid"] = it->second;
ldout(cct, 10) << __func__ << " " << *reply << " from mds." << from << dendl;
std::scoped_lock cl(client_lock);
- MetaSession *session = _get_mds_session(from, reply->get_connection().get());
+ auto session = _get_mds_session(from, reply->get_connection().get());
if (!session) {
ldout(cct, 10) << " discarding reclaim reply from sessionless mds." << from << dendl;
return;