if (!is_dir()) {
if ((file_i->inline_data.version != CEPH_INLINE_NONE &&
- !session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) ||
+ !session->get_connection()->has_feature(
+ CEPH_FEATURE_MDS_INLINE_DATA)) ||
(!file_i->layout.pool_ns.empty() &&
- !session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)))
+ !session->get_connection()->has_feature(
+ CEPH_FEATURE_FS_FILE_LAYOUT_V2)))
allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
}
return allowed;
ENCODE_FINISH(bl);
}
else {
- assert(session->connection);
+ assert(session->get_connection());
encode(oi->ino, bl);
encode(snapid, bl);
encode(file_i->rstat.rctime, bl);
dirfragtree.encode(bl);
encode(symlink, bl);
- if (session->connection->has_feature(CEPH_FEATURE_DIRLAYOUTHASH)) {
+ if (session->get_connection()->has_feature(CEPH_FEATURE_DIRLAYOUTHASH)) {
encode(file_i->dir_layout, bl);
}
encode(xbl, bl);
- if (session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
+ if (session->get_connection()->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
encode(inline_version, bl);
encode(inline_data, bl);
}
- if (session->connection->has_feature(CEPH_FEATURE_MDS_QUOTA)) {
+ if (session->get_connection()->has_feature(CEPH_FEATURE_MDS_QUOTA)) {
mempool_inode *policy_i = ppolicy ? pi : oi;
encode(policy_i->quota, bl);
}
- if (session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
+ if (session->get_connection()->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
encode(layout.pool_ns, bl);
}
- if (session->connection->has_feature(CEPH_FEATURE_FS_BTIME)) {
+ if (session->get_connection()->has_feature(CEPH_FEATURE_FS_BTIME)) {
encode(any_i->btime, bl);
encode(any_i->change_attr, bl);
}
Session *session = mds->get_session(it->first);
if (in->inode.inline_data.version != CEPH_INLINE_NONE &&
- !(session && session->connection &&
- session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
+ !(session &&
+ session->get_connection() &&
+ session->get_connection()->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)))
allowed &= ~(CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR);
int pending = cap->pending();
for (auto &p : in->client_caps) {
Session *session = mds->get_session(p.first);
- if (!session || !session->connection ||
- !session->connection->has_feature(CEPH_FEATURE_MDS_QUOTA))
+ if (!session ||
+ !session->get_connection() ||
+ !session->get_connection()->has_feature(CEPH_FEATURE_MDS_QUOTA))
continue;
Capability *cap = &p.second;
msg->ino = in->ino();
msg->rstat = i->rstat;
msg->quota = i->quota;
- mds->send_message_client_counted(msg, session->connection);
+ mds->send_message_client_counted(msg, session->get_connection());
}
for (const auto &it : in->get_replicas()) {
MGatherCaps *msg = new MGatherCaps;
// This session only existed to issue commands, so terminate it
// as soon as we can.
assert(session->is_closed());
- session->connection->mark_disposable();
+ session->get_connection()->mark_disposable();
}
priv.reset();
mds_rank->kick_waiters_for_any_client_connection();
}
} else {
- dout(10) << " existing session " << s << " for " << s->info.inst << " existing con " << s->connection
+ dout(10) << " existing session " << s << " for " << s->info.inst
+ << " existing con " << s->get_connection()
<< ", new/authorizing con " << con << dendl;
con->set_priv(RefCountedPtr{s});
auto s = static_cast<Session *>(priv.get());
dout(10) << "ms_handle_accept " << con->get_peer_addr() << " con " << con << " session " << s << dendl;
if (s) {
- if (s->connection != con) {
- dout(10) << " session connection " << s->connection << " -> " << con << dendl;
+ if (s->get_connection() != con) {
+ dout(10) << " session connection " << s->get_connection()
+ << " -> " << con << dendl;
s->set_connection(con);
// send out any queued messages
if (session->is_closed()) {
Session *imported_session = sessionmap.get_session(session->info.inst.name);
if (imported_session && imported_session != session) {
- dout(10) << __func__ << " replacing connection bootstrap session " << session << " with imported session " << imported_session << dendl;
+ dout(10) << __func__ << " replacing connection bootstrap session "
+ << session << " with imported session " << imported_session
+ << dendl;
imported_session->info.auth_name = session->info.auth_name;
//assert(session->info.auth_name == imported_session->info.auth_name);
assert(session->info.inst == imported_session->info.inst);
- imported_session->set_connection(session->connection);
+ imported_session->set_connection(session->get_connection().get());
// send out any queued messages
while (!session->preopen_out_queue.empty()) {
- imported_session->connection->send_message(session->preopen_out_queue.front());
+ imported_session->get_connection()->send_message(
+ session->preopen_out_queue.front());
session->preopen_out_queue.pop_front();
}
imported_session->auth_caps = session->auth_caps;
assert(session->get_nref() == 1);
- imported_session->connection->set_priv(imported_session->get());
+ imported_session->get_connection()->set_priv(imported_session->get());
session = imported_session;
}
}
version_t seq = session->inc_push_seq();
dout(10) << "send_message_client_counted " << session->info.inst.name << " seq "
<< seq << " " << *m << dendl;
- if (session->connection) {
- session->connection->send_message(m);
+ if (session->get_connection()) {
+ session->get_connection()->send_message(m);
} else {
session->preopen_out_queue.push_back(m);
}
void MDSRank::send_message_client(Message *m, Session *session)
{
dout(10) << "send_message_client " << session->info.inst << " " << *m << dendl;
- if (session->connection) {
- session->connection->send_message(m);
+ if (session->get_connection()) {
+ session->get_connection()->send_message(m);
} else {
session->preopen_out_queue.push_back(m);
}
for (set<Session*>::const_iterator p = clients.begin();
p != clients.end();
++p)
- (*p)->connection->send_message(new MMDSMap(monc->get_fsid(), mdsmap));
+ (*p)->get_connection()->send_message(new MMDSMap(monc->get_fsid(), mdsmap));
last_client_mdsmap_bcast = mdsmap->get_epoch();
}
// For non-auth caps, ask exporter mds to send cap export messages to
// clients who haven't opened sessions. The cap export messages will
// make clients open sessions.
- if (auth_cap || session->connection == nullptr) {
+ if (auth_cap || !session->get_connection()) {
Capability::Import& im = import_map[it.first];
im.cap_id = cap->get_cap_id();
im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->v));
assert(session);
if (!session->is_open() ||
- !session->connection.get() ||
- !session->connection->has_feature(CEPH_FEATURE_EXPORT_PEER))
+ !session->get_connection() ||
+ !session->get_connection()->has_feature(CEPH_FEATURE_EXPORT_PEER))
continue;
version_t seq = session->wait_for_flush(gather.new_sub());
mds->send_message_client(new MClientSession(CEPH_SESSION_FLUSHMSG, seq), session);
assert(session->is_opening());
mds->sessionmap.set_state(session, Session::STATE_OPEN);
mds->sessionmap.touch_session(session);
- assert(session->connection != NULL);
+ assert(session->get_connection());
MClientSession *reply = new MClientSession(CEPH_SESSION_OPEN);
if (session->info.has_feature(CEPHFS_FEATURE_MIMIC))
reply->supported_features = supported_features;
- session->connection->send_message(reply);
+ session->get_connection()->send_message(reply);
if (mdcache->is_readonly())
- session->connection->send_message(new MClientSession(CEPH_SESSION_FORCE_RO));
+ session->get_connection()->send_message(new MClientSession(CEPH_SESSION_FORCE_RO));
} else if (session->is_closing() ||
session->is_killing()) {
// kill any lingering capabilities, leases, requests
// ms_handle_remote_reset() and realize they had in fact closed.
// do this *before* sending the message to avoid a possible
// race.
- if (session->connection != NULL) {
+ if (session->get_connection()) {
// Conditional because terminate_sessions will indiscrimately
// put sessions in CLOSING whether they ever had a conn or not.
- session->connection->mark_disposable();
+ session->get_connection()->mark_disposable();
}
// reset session
mds->sessionmap.remove_session(session);
} else if (session->is_killing()) {
// destroy session, close connection
- if (session->connection != NULL) {
- session->connection->mark_down();
- session->connection->set_priv(NULL);
+ if (session->get_connection()) {
+ session->get_connection()->mark_down();
+ session->get_connection()->set_priv(NULL);
}
mds->sessionmap.remove_session(session);
} else {
// user space client
if (it->second.compare(0, 16, "ceph version 12.") == 0)
supported = CEPHFS_FEATURE_LUMINOUS;
- else if (session->connection->has_feature(CEPH_FEATURE_FS_CHANGE_ATTR))
+ else if (session->get_connection()->has_feature(CEPH_FEATURE_FS_CHANGE_ATTR))
supported = CEPHFS_FEATURE_KRAKEN;
} else {
it = client_metadata.find("kernel_version");
if (it != client_metadata.end()) {
// kernel client
- if (session->connection->has_feature(CEPH_FEATURE_NEW_OSDOP_ENCODING))
+ if (session->get_connection()->has_feature(CEPH_FEATURE_NEW_OSDOP_ENCODING))
supported = CEPHFS_FEATURE_LUMINOUS;
}
}
if (supported == -1 &&
- session->connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2))
+ session->get_connection()->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2))
supported = CEPHFS_FEATURE_JEWEL;
if (supported >= 0) {
}
if (!mds->mdsmap->get_inline_data_enabled() ||
- !mdr->session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA))
+ !mdr->session->get_connection()->has_feature(CEPH_FEATURE_MDS_INLINE_DATA))
in->inode.inline_data.version = CEPH_INLINE_NONE;
mdcache->add_inode(in); // add
}
if (cur->inode.inline_data.version != CEPH_INLINE_NONE &&
- !mdr->session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
+ !mdr->session->get_connection()->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) {
dout(7) << "old client cannot open inline data file " << *cur << dendl;
respond_to_request(mdr, -EPERM);
return;
MDSAuthCaps auth_caps;
+protected:
ConnectionRef connection;
+public:
entity_addr_t socket_addr;
xlist<Session*>::item item_session_list;
connection = con;
socket_addr = con->get_peer_socket_addr();
}
+ const ConnectionRef& get_connection() const {
+ return connection;
+ }
void clear() {
pending_prealloc_inos.clear();
} else {
session = mds->sessionmap.get_session(client_inst.name);
if (session) { // there always should be a session, but there's a bug
- if (session->connection == NULL) {
+ if (session->get_connection() == NULL) {
dout(10) << " removed session " << session->info.inst << dendl;
mds->sessionmap.remove_session(session);
session = NULL;