case MSG_MDS_EXPORTCAPS:
handle_export_caps(static_cast<MExportCaps*>(m));
break;
+ case MSG_MDS_EXPORTCAPSACK:
+ handle_export_caps_ack(static_cast<MExportCapsAck*>(m));
+ break;
case MSG_MDS_GATHERCAPS:
handle_gather_caps(static_cast<MGatherCaps*>(m));
break;
cap->mark_importing();
}
- Capability::Import& im = import_map[it.first];
- im.cap_id = cap->get_cap_id();
- im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
- im.issue_seq = cap->get_last_seq() + 1;
+ // Always ask exporter mds to send cap export messages for auth caps.
+ // For non-auth caps, ask exporter mds to send cap export messages to
+ // clients who haven't opened sessions. The cap export messages will
+ // make clients open sessions.
+ if (auth_cap || session->connection == nullptr) {
+ Capability::Import& im = import_map[it.first];
+ im.cap_id = cap->get_cap_id();
+ im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
+ im.issue_seq = cap->get_last_seq() + 1;
+ }
if (peer >= 0) {
cap->merge(it.second, auth_cap);
mds->send_message_mds(ex, dest);
}
+/* This function DOES put the passed message before returning*/
+void Migrator::handle_export_caps_ack(MExportCapsAck *ack)
+{
+ mds_rank_t from = ack->get_source().num();
+ CInode *in = cache->get_inode(ack->ino);
+ if (in) {
+ assert(!in->is_auth());
+
+ dout(10) << "handle_export_caps_ack " << *ack << " from "
+ << ack->get_source() << " on " << *in << dendl;
+
+ map<client_t,Capability::Import> imported_caps;
+ map<client_t,uint64_t> caps_ids;
+ auto blp = ack->cap_bl.cbegin();
+ decode(imported_caps, blp);
+ decode(caps_ids, blp);
+
+ for (auto& it : imported_caps) {
+ Capability *cap = in->get_client_cap(it.first);
+ if (!cap || cap->get_cap_id() != caps_ids.at(it.first))
+ continue;
+
+ dout(7) << __func__ << " telling client." << it.first
+ << " exported caps on " << *in << dendl;
+ MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, in->ino(), 0,
+ cap->get_cap_id(), cap->get_mseq(),
+ mds->get_osd_epoch_barrier());
+ m->set_cap_peer(it.second.cap_id, it.second.issue_seq, it.second.mseq, from, 0);
+ mds->send_message_client_counted(m, it.first);
+
+ in->remove_client_cap(it.first);
+ }
+
+ mds->locker->request_inode_file_caps(in);
+ mds->locker->try_eval(in, CEPH_CAP_LOCKS);
+ }
+
+ ack->put();
+}
+
void Migrator::handle_gather_caps(MGatherCaps *m)
{
CInode *in = cache->get_inode(m->ino);
-
if (!in)
goto out;
dout(10) << "handle_gather_caps " << *m << " from " << m->get_source()
- << " on " << *in
- << dendl;
+ << " on " << *in << dendl;
+
if (in->is_any_caps() &&
!in->is_auth() &&
!in->is_ambiguous_auth() &&
// force open client sessions and finish cap import
mds->server->finish_force_open_sessions(imported_session_map);
- map<client_t,Capability::Import> imported_caps;
-
auto it = peer_exports.find(in);
assert(it != peer_exports.end());
// clients will release caps from the exporter when they receive the cap import message.
+ map<client_t,Capability::Import> imported_caps;
finish_import_inode_caps(in, from, false, imported_session_map, it->second, imported_caps);
mds->locker->eval(in, CEPH_CAP_LOCKS, true);
+
+ if (!imported_caps.empty()) {
+ MExportCapsAck *ack = new MExportCapsAck(in->ino());
+ map<client_t,uint64_t> peer_caps_ids;
+ for (auto &p : imported_caps )
+ peer_caps_ids[p.first] = it->second.at(p.first).cap_id;
+
+ encode(imported_caps, ack->cap_bl);
+ encode(peer_caps_ids, ack->cap_bl);
+ mds->send_message_mds(ack, from);
+ }
+
in->auth_unpin(this);
}