kernel client
-- flush unsafe mds requests when closing mds sessions
- optional or no fill_trace?
- flock, fcntl locks
- async xattrs
- fix readdir vs fragment race by keeping a separate frag pos, and ignoring dentries below it
mds
+- take some care with replayed client requests vs new requests
- linkage vs cdentry replicas and remote rename....
- move root inode into stray dir
- make recovery work with early replies
} __attribute__ ((packed)) setlayout;
} __attribute__ ((packed));
+#define CEPH_MDS_REQUEST_REPLAY 0xffff
+
struct ceph_mds_request_head {
ceph_tid_t tid, oldest_client_tid;
ceph_epoch_t mdsmap_epoch; /* on client */
__le32 num_fwd;
- __le32 retry_attempt;
+ __le32 retry_attempt; /* REQUEST_REPLAY if replay */
__le64 mds_wants_replica_in_dirino;
__le32 op;
__le32 caller_uid, caller_gid;
bool CInode::encode_inodestat(bufferlist& bl, Session *session,
- snapid_t snapid)
+ snapid_t snapid, bool is_replay)
{
int client = session->inst.name.num();
cap = add_client_cap(client, session, &mdcache->client_rdcaps, find_snaprealm());
}
+ if (is_replay) {
+ // if this is a replayed request, check for a cap reconnect
+ ceph_mds_cap_reconnect *rc = mdcache->get_replay_cap_reconnect(pi->ino, client);
+ if (rc) {
+ // we should only have the cap reconnect for ONE client, and from ourselves.
+ dout(10) << " incorporating cap reconnect wanted " << ccap_string(rc->wanted)
+ << " issue " << ccap_string(rc->issued) << " on " << *this << dendl;
+ cap->set_wanted(rc->wanted);
+ cap->issue(rc->issued);
+ mdcache->remove_replay_cap_reconnect(pi->ino, client);
+ }
+ }
+
// if we're a directory, maybe bump filelock to loner?
if (inode.is_dir() &&
is_auth() &&
// for giving to clients
- bool encode_inodestat(bufferlist& bl, Session *session, snapid_t snapid=CEPH_NOSNAP);
+ bool encode_inodestat(bufferlist& bl, Session *session, snapid_t snapid=CEPH_NOSNAP, bool is_replay=false);
void encode_cap_message(MClientCaps *m, Capability *cap);
* returns a C_Gather* is there is work to do. caller is responsible for setting
* the C_Gather completer.
*/
-C_Gather *MDCache::parallel_fetch(map<inodeno_t,filepath>& pathmap)
+C_Gather *MDCache::parallel_fetch(map<inodeno_t,filepath>& pathmap, set<inodeno_t>& missing)
{
dout(10) << "parallel_fetch on " << pathmap.size() << " paths" << dendl;
dout(17) << " missing " << p->first << " at " << p->second << dendl;
CDir *dir = path_traverse_to_dir(p->second);
assert(dir);
- fetch_queue.insert(dir);
- p++;
+ if (!dir->is_complete()) {
+ fetch_queue.insert(dir);
+ p++;
+ } else {
+ // probably because the client created it and held a cap but it never committed
+ // to the journal, and the op hasn't replayed yet.
+ dout(5) << " dne (not created yet?) " << p->first << " at " << p->second << dendl;
+ missing.insert(p->first);
+ pathmap.erase(p++);
+ }
}
if (pathmap.empty()) {
// do this before ack, since some inodes we may have already gotten
// from surviving MDSs.
if (!cap_import_paths.empty()) {
- C_Gather *gather = parallel_fetch(cap_import_paths);
+ C_Gather *gather = parallel_fetch(cap_import_paths, cap_imports_missing);
if (gather) {
gather->set_finisher(new C_MDC_RejoinGatherFinish(this));
return;
}
}
+ process_imported_caps();
+ process_reconnected_caps();
+ identify_files_to_recover();
+
+ rejoin_send_acks();
+
+ // did we already get our acks too?
+ // this happens when the rejoin_gather has to wait on a MISSING/FULL exchange.
+ if (rejoin_ack_gather.empty()) {
+ mds->rejoin_done();
+
+ // finally, kickstart past snap parent opens
+ open_snap_parents();
+ }
+}
+
+// Split out of rejoin_gather_finish(): walk cap_imports (ino -> client ->
+// frommds -> capex) and re-import each client cap onto its now-cached inode.
+// Entries whose inode is not in cache yet are left in place so a later pass
+// (after replayed client requests have run) can retry them.
+void MDCache::process_imported_caps()
+{
// process cap imports
// ino -> client -> frommds -> capex
- for (map<inodeno_t,map<int, map<int,ceph_mds_cap_reconnect> > >::iterator p = cap_imports.begin();
- p != cap_imports.end();
- ++p) {
+ map<inodeno_t,map<int, map<int,ceph_mds_cap_reconnect> > >::iterator p = cap_imports.begin();
+ while (p != cap_imports.end()) {
CInode *in = get_inode(p->first);
- assert(in);
+ // inode may not exist yet: it can be created by a replayed client
+ // request that has not run; keep the entry and try again later.
+ if (!in) {
+ dout(10) << "process_imported_caps still missing " << p->first
+ << ", will try again after replayed client requests"
+ << dendl;
+ p++;
+ continue;
+ }
for (map<int, map<int,ceph_mds_cap_reconnect> >::iterator q = p->second.begin();
q != p->second.end();
++q)
+ // NOTE(review): 'r' iterates q->second (frommds -> capinfo) in context
+ // lines this hunk omits; frommds < 0 is the replay-reconnect sentinel
+ // handled via get_replay_cap_reconnect(), so it is skipped here.
if (r->first >= 0)
rejoin_import_cap(in, q->first, r->second, r->first);
}
- }
-
- process_reconnected_caps();
- identify_files_to_recover();
-
- rejoin_send_acks();
-
- // did we already get our acks too?
- // this happens when the rejoin_gather has to wait on a MISSING/FULL exchange.
- if (rejoin_ack_gather.empty()) {
- mds->rejoin_done();
-
- // finally, kickstart past snap parent opens
- open_snap_parents();
+ // all caps for this ino were imported; drop the slot and advance.
+ cap_imports.erase(p++); // remove and move on
}
}
map<inodeno_t,map<int,map<int,ceph_mds_cap_reconnect> > > cap_imports; // ino -> client -> frommds -> capex
map<inodeno_t,filepath> cap_import_paths;
+ set<inodeno_t> cap_imports_missing;
set<CInode*> rejoin_undef_inodes;
set<CInode*> rejoin_potential_updated_scatterlocks;
cap_imports[ino][client][frommds] = icr.capinfo;
cap_import_paths[ino] = filepath(icr.path, (__u64)icr.capinfo.pathbase);
}
+ // During replay, a cap reconnect for (ino, client) is stashed in cap_imports
+ // under the sentinel frommds == -1 (presumably "from ourselves" -- see the
+ // matching note in CInode::encode_inodestat). Return a pointer to it, or
+ // NULL if none is pending. The pointer is invalidated by
+ // remove_replay_cap_reconnect() or any mutation of cap_imports.
+ ceph_mds_cap_reconnect *get_replay_cap_reconnect(inodeno_t ino, int client) {
+ if (cap_imports.count(ino) &&
+ cap_imports[ino].count(client) &&
+ cap_imports[ino][client].count(-1)) {
+ return &cap_imports[ino][client][-1];
+ }
+ return NULL;
+ }
+ // Drop the stashed replay cap reconnect for (ino, client) once it has been
+ // incorporated into the client's cap (see CInode::encode_inodestat). The
+ // asserts check the stashed reconnect is the ONLY entry for this ino/client,
+ // which makes erasing the whole ino slot safe.
+ void remove_replay_cap_reconnect(inodeno_t ino, int client) {
+ assert(cap_imports[ino].size() == 1);
+ assert(cap_imports[ino][client].size() == 1);
+ cap_imports.erase(ino);
+ }
// [reconnect/rejoin caps]
map<CInode*,map<int, inodeno_t> > reconnected_caps; // inode -> client -> realmino
void add_reconnected_snaprealm(int client, inodeno_t ino, snapid_t seq) {
reconnected_snaprealms[ino][client] = seq;
}
+ void process_imported_caps();
void process_reconnected_caps();
void prepare_realm_split(SnapRealm *realm, int client, inodeno_t ino,
map<int,MClientSnap*>& splits);
void open_remote_dentry(CDentry *dn, bool projected, Context *fin);
void _open_remote_dentry_finish(int r, CDentry *dn, bool projected, Context *fin);
- C_Gather *parallel_fetch(map<inodeno_t,filepath>& pathmap);
+ C_Gather *parallel_fetch(map<inodeno_t,filepath>& pathmap, set<inodeno_t>& missing);
void make_trace(vector<CDentry*>& trace, CInode *in);
snapid_t snapid = CEPH_NOSNAP;
CInode *snapdiri = 0;
if (tracei || tracedn)
- set_trace_dist(mdr->session, reply, tracei, tracedn, snapid, snapdiri, mdr);
+ set_trace_dist(mdr->session, reply, tracei, tracedn, snapid, snapdiri, mdr,
+ mdr->client_request->is_replay());
messenger->send_message(reply, client_inst);
// give any preallocated inos to the session
apply_allocated_inos(mdr);
+ bool is_replay = mdr->client_request->is_replay();
+
// clean up request, drop locks, etc.
// do this before replying, so that we can issue leases
Session *session = mdr->session;
// send reply, with trace, and possible leases
if (!did_early_reply && // don't issue leases if we sent an earlier reply already
(tracei || tracedn))
- set_trace_dist(session, reply, tracei, tracedn, snapid, snapdiri, mdr);
+ set_trace_dist(session, reply, tracei, tracedn, snapid, snapdiri, mdr, is_replay);
messenger->send_message(reply, client_inst);
}
*/
void Server::set_trace_dist(Session *session, MClientReply *reply, CInode *in, CDentry *dn,
snapid_t snapid, CInode *snapdiri,
- MDRequest *mdr)
+ MDRequest *mdr, bool is_replay)
{
// inode, dentry, dir, ..., inode
bufferlist bl;
dout(10) << "set_trace_dist snaprealm " << *realm << dendl;
}
- in->encode_inodestat(bl, session, snapid);
+ in->encode_inodestat(bl, session, snapid, is_replay);
dout(20) << "set_trace_dist added snapid " << snapid << " " << *in << dendl;
if (snapid != CEPH_NOSNAP && in == snapdiri) {
void reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei = 0, CDentry *tracedn = 0);
void set_trace_dist(Session *session, MClientReply *reply, CInode *in, CDentry *dn,
snapid_t snapid, CInode *snapdiri,
- MDRequest *mdr = 0);
+ MDRequest *mdr, bool is_replay);
void encode_empty_dirstat(bufferlist& bl);
void encode_infinite_lease(bufferlist& bl);
return head.op & CEPH_MDS_OP_FOLLOW_LINK;
}
-
+ // A request whose retry_attempt carries the CEPH_MDS_REQUEST_REPLAY
+ // sentinel (0xffff) is a replay of an already-journaled op, not a fresh
+ // retry. NOTE(review): head.retry_attempt is declared __le32 on the wire;
+ // this compares it without an endian conversion -- verify byte-order
+ // handling on big-endian hosts.
+ bool is_replay() {
+ return head.retry_attempt == CEPH_MDS_REQUEST_REPLAY;
+ }
// normal fields
void set_tid(tid_t t) { head.tid = t; }