- surviving mds rejoins replicas from a recovering mds
- will tell auth if it holds an xlock
- recovering open files
- - need to journal EOpen
- - track openness in hierarchy, so we can tell when to expire vs rejournal EOpen metablobs
+/ - need to journal EOpen
+/ - track openness in hierarchy, so we can tell when to expire vs rejournal EOpen metablobs
- need to batch EOpen events when rejournaling to avoid looping
- recovery will either have inode (from EOpen), or will provide path+cap to reassert open state.
- path+cap window will require fetching and metadata from disk before doing the rejoin
+- client reconnect after resolve, before rejoin.
- clientmap request history
- hmm, so we need completion codes, but only after recovery.
- list of active tids < max
- journaled bit in MDRequest
- journal
+ - complete request list in MClientMap
+ - populated on replay
+ - periodic client->mds MClientLastRetry -> trim request list
+
/untested- truncate...
- how to get usage feedback to monitor?
-- change messenger entity_inst_t
- - no more rank! make it a uniquish nonce?
-
- clean up mds caps release in exporter
- figure out client failure modes
-- clean up messenger failure modes.
- add connection retry.
simplemessenger
- close idle connections
-- retry, timeout on connection or transmission failure
+- buffer sent messages until a receive is acknowledged (handshake!)
+ - retry, timeout on connection or transmission failure
- exponential backoff on monitor resend attempts (actually, this should go outside the messenger!)
objectcacher
// assign a unique tid
tid_t tid = ++last_tid;
req->set_tid(tid);
+ if (!mds_requests.empty())
+ req->set_oldest_client_tid(mds_requests.begin()->first);
// make note
MetaRequest request(req, tid);
* contacted if necessary).
*/
class ClientMap {
+private:
version_t version;
-
version_t projected;
version_t committing;
version_t committed;
map<version_t, list<Context*> > commit_waiters;
+public:
+ ClientMap() : version(0), projected(0), committing(0), committed(0) {}
+
+ version_t get_version() { return version; }
+ version_t get_projected() { return projected; }
+ version_t get_committing() { return committing; }
+ version_t get_committed() { return committed; }
+
+ version_t inc_projected() { return ++projected; }
+ void reset_projected() { projected = version; }
+ void set_committing(version_t v) { committing = v; }
+ void set_committed(version_t v) { committed = v; }
+
+ void add_commit_waiter(Context *c) {
+ commit_waiters[committing].push_back(c);
+ }
+ void take_commit_waiters(version_t v, list<Context*>& ls) {
+ ls.swap(commit_waiters[v]);
+ commit_waiters.erase(v);
+ }
+
+ // client mount, inst info
+private:
hash_map<int,entity_inst_t> client_inst;
set<int> client_mount;
hash_map<int, int> client_ref;
- /*
- hrm, we need completion codes, too.
- struct active_tid_set_t {
- tid_t max; // max tid we've journaled
- set<tid_t> active; // still-active tids that are < max
- };
- hash_map<int, active_tid_set_t> client_committed;
- */
-
void inc_ref(int client, const entity_inst_t& inst) {
if (client_inst.count(client)) {
assert(client_inst[client] == inst);
client_inst.erase(client);
}
}
-
-public:
- ClientMap() : version(0), projected(0), committing(0), committed(0) {}
-
- version_t get_version() { return version; }
- version_t get_projected() { return projected; }
- version_t get_committing() { return committing; }
- version_t get_committed() { return committed; }
-
- version_t inc_projected() { return ++projected; }
- void reset_projected() { projected = version; }
- void set_committing(version_t v) { committing = v; }
- void set_committed(version_t v) { committed = v; }
-
- void add_commit_waiter(Context *c) {
- commit_waiters[committing].push_back(c);
- }
- void take_commit_waiters(version_t v, list<Context*>& ls) {
- ls.swap(commit_waiters[v]);
- commit_waiters.erase(v);
- }
+public:
bool empty() {
return client_inst.empty() && client_mount.empty() && client_ref.empty();
}
//version++;
}
+
+private:
+ // -- completed requests --
+ // client id -> tid -> result code
+ map<int, set<tid_t> > completed_requests; // completed client requests
+
+public:
+ void add_completed_request(metareqid_t ri) {
+ completed_requests[ri.client].insert(ri.tid);
+ }
+ void trim_completed_requests(int client, tid_t mintid) {
+ if (completed_requests.count(client) == 0) return;
+ set<tid_t>& ls = completed_requests[client];
+ while (!ls.empty() && *ls.begin() < mintid)
+ ls.erase(ls.begin());
+ if (ls.empty())
+ completed_requests.erase(client);
+ }
+ bool have_completed_request(metareqid_t ri) {
+ return completed_requests.count(ri.client) &&
+ completed_requests[ri.client].count(ri.tid);
+ }
+
+
+ // -- encoding --
void encode(bufferlist& bl) {
bl.append((char*)&version, sizeof(version));
::_encode(client_inst, bl);
<< " (" << strerror(-reply->get_result())
<< ") " << *req << endl;
+ // note result code in clientmap?
+ if (!req->is_idempotent())
+ mds->clientmap.add_completed_request(mdr->reqid);
+
// include trace
if (tracei) {
reply->set_trace_dist( tracei, mds->get_nodeid() );
}
// send reply
- messenger->send_message(reply,
- req->get_client_inst());
+ messenger->send_message(reply, req->get_client_inst());
// finish request
mdcache->request_finish(mdr);
// okay, i want
CInode *ref = 0;
+ // retry?
+ if (req->get_retry_attempt()) {
+ if (mds->clientmap.have_completed_request(req->get_reqid())) {
+ dout(5) << "already completed " << req->get_reqid() << endl;
+ mds->messenger->send_message(new MClientReply(req, 0),
+ req->get_source_inst());
+ delete req;
+ return;
+ }
+ }
+ // trim completed_request list
+ if (req->get_oldest_client_tid())
+ mds->clientmap.trim_completed_requests(req->get_source().num(),
+ req->get_oldest_client_tid());
+
// -----
// some ops are on ino's
<< " to " << p->second << endl;
mds->mdcache->add_recovered_purge(p->first, p->second);
}
+
+ // client requests
+ for (list<metareqid_t>::iterator p = client_reqs.begin();
+ p != client_reqs.end();
+ ++p)
+ mds->clientmap.add_completed_request(*p);
}
// -----------------------
// -----------------------
-// EUpdate
+// ESlaveUpdate
bool ESlaveUpdate::has_expired(MDS *mds)
{
class MClientRequest : public Message {
struct {
- long tid;
+ tid_t tid;
+ tid_t oldest_client_tid;
int num_fwd;
int retry_attempt;
inodeno_t mds_wants_replica_in_dirino;
// normal fields
- void set_tid(long t) { st.tid = t; }
+ void set_tid(tid_t t) { st.tid = t; }
+ void set_oldest_client_tid(tid_t t) { st.oldest_client_tid = t; }
void inc_num_fwd() { st.num_fwd++; }
void set_retry_attempt(int a) { st.retry_attempt = a; }
void set_path(string& p) { path.set_path(p); }
const entity_inst_t& get_client_inst() { return st.client_inst; }
int get_client() { return st.client_inst.name.num(); }
- long get_tid() { return st.tid; }
+ tid_t get_tid() { return st.tid; }
+ tid_t get_oldest_client_tid() { return st.oldest_client_tid; }
int get_num_fwd() { return st.num_fwd; }
int get_retry_attempt() { return st.retry_attempt; }
int get_op() { return st.op; }