req->set_caller_uid(uid);
req->set_caller_gid(gid);
- // encode payload now, in case we have to resend (in case of mds failure)
+ // encode payload now, in case we have to resend(in case of mds failure)
req->encode_payload();
request->request_payload = req->get_payload();
// insert trace
utime_t from = request->sent_stamp;
- Inode *target = insert_trace(request, from, mds);
+ Inode *target = insert_trace(request->get(), from, mds);
if (ptarget)
*ptarget = target;
int mds_num = reply->get_source().num();
MetaRequest *request = mds_requests[tid]->get();
assert(request);
-
+
// store reply
- if (!request->reply && !reply->is_safe()) //safe replies have no useful info
- request->reply = reply;
-
+ // if (!request->reply && !reply->is_safe()) //safe replies have no useful info
+ request->reply = reply;
+
if ((request->got_unsafe && !reply->is_safe())
|| (request->got_safe && reply->is_safe())) {
//duplicate response
request->put();
return;
}
-
-
+
if(reply->is_safe()) {
//the filesystem change is committed to disk
request->got_safe = true;
- if (request->got_unsafe)
+ if (request->got_unsafe) {
//we're done, clean up
+ request->remove_from_unsafe_list();
goto cleanup;
+ }
}
if(!reply->is_safe()) {
request->got_unsafe = true;
if(request->got_safe)
- //we already kicked, so don't do that, just clean up
+ //we already kicked, so just clean up
goto cleanup;
+ mds_sessions[mds_num].unsafe_requests.push_back(request->get_meta_item());
}
if(request->got_safe ^ request->got_unsafe) {
Cond cond;
cond.Wait(client_lock);
}
}
+
cleanup:
request->put();
}
}
}
-
// reset my cap seq number
mds_sessions[mds].seq = 0;
+
+ resend_unsafe_requests(mds);
} else {
dout(10) << " i had no session with this mds" << dendl;
m->closed = true;
p->second->caller_cond->Signal();
}
else {
- send_request(p->second, mds);
+ send_request(p->second->get(), mds);
}
}
}
-
+void Client::resend_unsafe_requests(int mds_num) {
+ MDSSession& mds = mds_sessions[mds_num];
+ MetaRequest* current = mds.unsafe_requests.front()->get();
+ MClientRequest *m;
+ while (current) {
+ current->remove_from_unsafe_list();
+ m = new MClientRequest;
+ m->copy_payload(current->request_payload);
+ m->decode_payload();
+ m->set_retry_attempt(current->retry_attempt);
+ m->set_dentry_wanted();
+ m->set_replayed_op();
+ current->request = m;
+ current->got_unsafe = false;
+ current->got_safe = false;
+ send_request(current->get(), mds_num);
+ current->put();
+ current = mds.unsafe_requests.front();
+ }
+}
/************
* leases
*/
struct InodeCap;
+
+struct MetaRequest {
+ tid_t tid;
+ MClientRequest *request;
+ bufferlist request_payload; // in case i have to retry
+
+ int uid, gid;
+
+ utime_t sent_stamp;
+ set<int> mds; // who i am asking
+ int resend_mds; // someone wants you to (re)send the request here
+ int num_fwd; // # of times i've been forwarded
+ int retry_attempt;
+ int ref;
+
+ MClientReply *reply; // the reply
+
+ //possible responses
+ bool got_safe;
+ bool got_unsafe;
+
+private:
+ xlist<MetaRequest*>::item unsafe_item;
+public:
+ Cond *caller_cond; // who to take up
+ Cond *dispatch_cond; // who to kick back
+
+ MetaRequest(MClientRequest *req, tid_t t) :
+ tid(t), request(req),
+ resend_mds(-1), num_fwd(0), retry_attempt(0),
+ ref(1), reply(0),
+ got_safe(false), got_unsafe(false), unsafe_item(this),
+ caller_cond(0), dispatch_cond(0) { }
+
+ MetaRequest* get() {
+ ++ref;
+ cerr << "Get called on MetaRequest tid " << tid
+ << "Refcount is " << ref
+ << " Type is " << request->head.op << std::endl;
+ return this; }
+
+ void put() {
+ cerr << "Put called on MetaRequest tid " << tid;
+ if (--ref == 0) {
+ cerr << "MetaRequest tid" << tid << " deleting." << std::endl;
+ delete this;
+ }
+ cerr << "Refcount is " << ref
+ << " Type is " << request->head.op << std::endl;
+ }
+
+ xlist<MetaRequest*>::item * get_meta_item() {
+ get();
+ return &unsafe_item;
+ }
+
+ void remove_from_unsafe_list() {
+ unsafe_item.remove_myself();
+ put();
+ }
+};
+
+
struct MDSSession {
version_t seq;
__u64 cap_gen;
bool was_stale;
xlist<InodeCap*> caps;
+ xlist<MetaRequest*> unsafe_requests;
MClientCapRelease *release;
void got_mds_push(int mds);
void handle_client_session(MClientSession *m);
void send_reconnect(int mds);
+ void resend_unsafe_requests(int mds);
// mds requests
- struct MetaRequest {
- tid_t tid;
- MClientRequest *request;
- bufferlist request_payload; // in case i have to retry
-
- int uid, gid;
-
- utime_t sent_stamp;
- set<int> mds; // who i am asking
- int resend_mds; // someone wants you to (re)send the request here
- int num_fwd; // # of times i've been forwarded
- int retry_attempt;
- int ref;
-
- MClientReply *reply; // the reply
-
- //possible responses
- bool got_safe;
- bool got_unsafe;
-
- Cond *caller_cond; // who to take up
- Cond *dispatch_cond; // who to kick back
-
- MetaRequest(MClientRequest *req, tid_t t) :
- tid(t), request(req),
- resend_mds(-1), num_fwd(0), retry_attempt(0),
- ref(1), reply(0),
- got_safe(false), got_unsafe(false),
- caller_cond(0), dispatch_cond(0) { }
-
- MetaRequest* get() {
- ++ref;
- dout(20) << "Get called on MetaRequest " << this << std::endl
- << "Refcount is " << ref << dendl;
- return this; }
- void put() {
- dout(20) << "Put called on MetaRequest " << this << dendl;
- if (--ref == 0) {
- dout(20) << "MetaRequest " << this << " deleting." << dendl;
- delete this;
- }
- dout(20) << "Refcount is " << ref << dendl;
- }
- };
+
tid_t last_tid;
map<tid_t, MetaRequest*> mds_requests;
set<int> failed_mds;
void set_dentry_wanted() {
head.flags = head.flags | CEPH_MDS_FLAG_WANT_DENTRY;
}
+ void set_replayed_op() {
+ head.flags = head.flags | CEPH_MDS_FLAG_REPLAY;
+ }
tid_t get_tid() { return head.tid; }
tid_t get_oldest_client_tid() { return head.oldest_client_tid; }