}
}
-Inode * Client::add_update_inode(InodeStat *st, utime_t from, int mds)
+Inode * Client::add_update_inode(InodeStat *st, utime_t from, MetaSession *session)
{
Inode *in;
bool was_new = false;
in->dirfragtree = st->dirfragtree;
if (in->snapid == CEPH_NOSNAP)
- add_update_cap(in, mds, st->cap.cap_id, st->cap.caps, st->cap.seq, st->cap.mseq, inodeno_t(st->cap.realm), st->cap.flags);
+ add_update_cap(in, session, st->cap.cap_id, st->cap.caps, st->cap.seq, st->cap.mseq, inodeno_t(st->cap.realm), st->cap.flags);
else
in->snap_caps |= st->cap.caps;
* insert_dentry_inode - insert + link a single dentry + inode into the metadata cache.
*/
Dentry *Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dlease,
- Inode *in, utime_t from, int mds, bool set_offset,
+ Inode *in, utime_t from, MetaSession *session, bool set_offset,
Dentry *old_dentry)
{
Dentry *dn = NULL;
}
}
- update_dentry_lease(dn, dlease, from, mds);
+ update_dentry_lease(dn, dlease, from, session);
return dn;
}
-void Client::update_dentry_lease(Dentry *dn, LeaseStat *dlease, utime_t from, int mds)
+void Client::update_dentry_lease(Dentry *dn, LeaseStat *dlease, utime_t from, MetaSession *session)
{
utime_t dttl = from;
dttl += (float)dlease->duration_ms / 1000.0;
ldout(cct, 10) << "got dentry lease on " << dn->name
<< " dur " << dlease->duration_ms << "ms ttl " << dttl << dendl;
dn->lease_ttl = dttl;
- dn->lease_mds = mds;
+ dn->lease_mds = session->mds_num;
dn->lease_seq = dlease->seq;
- dn->lease_gen = mds_sessions[mds]->cap_gen;
+ dn->lease_gen = session->cap_gen;
}
}
dn->cap_shared_gen = dn->dir->parent_inode->shared_gen;
/*
* insert results from readdir or lssnap into the metadata cache.
*/
-void Client::insert_readdir_results(MetaRequest *request, int mds, Inode *diri) {
+void Client::insert_readdir_results(MetaRequest *request, MetaSession *session, Inode *diri) {
MClientReply *reply = request->reply;
Connection *con = request->reply->get_connection();
else
ldout(cct, 15) << " pd is '" << pd->first << "' dn " << pd->second << dendl;
- Inode *in = add_update_inode(&ist, request->sent_stamp, mds);
+ Inode *in = add_update_inode(&ist, request->sent_stamp, session);
Dentry *dn;
if (pd != dir->dentry_map.end() &&
pd->first == dname) {
// new dn
dn = link(dir, dname, in, NULL);
}
- update_dentry_lease(dn, &dlease, request->sent_stamp, mds);
+ update_dentry_lease(dn, &dlease, request->sent_stamp, session);
dn->offset = dir_result_t::make_fpos(request->readdir_frag, i + request->readdir_offset);
// add to cached result list
*
* insert a trace from a MDS reply into the cache.
*/
-Inode* Client::insert_trace(MetaRequest *request, int mds)
+Inode* Client::insert_trace(MetaRequest *request, MetaSession *session)
{
MClientReply *reply = request->reply;
- ldout(cct, 10) << "insert_trace from " << request->sent_stamp << " mds." << mds
+ ldout(cct, 10) << "insert_trace from " << request->sent_stamp << " mds." << session->mds_num
<< " is_target=" << (int)reply->head.is_target
<< " is_dentry=" << (int)reply->head.is_dentry
<< dendl;
if (reply->head.is_target) {
ist.decode(p, features);
- in = add_update_inode(&ist, request->sent_stamp, mds);
+ in = add_update_inode(&ist, request->sent_stamp, session);
}
if (reply->head.is_dentry) {
- Inode *diri = add_update_inode(&dirst, request->sent_stamp, mds);
+ Inode *diri = add_update_inode(&dirst, request->sent_stamp, session);
update_dir_dist(diri, &dst); // dir stat info is attached to ..
if (in) {
Dir *dir = diri->open_dir();
- insert_dentry_inode(dir, dname, &dlease, in, request->sent_stamp, mds, true,
+ insert_dentry_inode(dir, dname, &dlease, in, request->sent_stamp, session, true,
((request->head.op == CEPH_MDS_OP_RENAME) ?
request->old_dentry : NULL));
} else {
if (in) {
Dir *dir = diri->open_dir();
- insert_dentry_inode(dir, dname, &dlease, in, request->sent_stamp, mds, true);
+ insert_dentry_inode(dir, dname, &dlease, in, request->sent_stamp, session, true);
} else {
if (diri->dir && diri->dir->dentries.count(dname)) {
Dentry *dn = diri->dir->dentries[dname];
if (in && (reply->head.op == CEPH_MDS_OP_READDIR ||
reply->head.op == CEPH_MDS_OP_LSSNAP)) {
- insert_readdir_results(request, mds, in);
+ insert_readdir_results(request, session, in);
}
request->target = in;
ldout(cct, 20) << "have to return ESTALE" << dendl;
}
- int mds = reply->get_source().num();
assert(request->reply == NULL);
request->reply = reply;
- insert_trace(request, mds);
+ insert_trace(request, mds_sessions[mds_num]);
if (!request->got_unsafe) {
request->got_unsafe = true;
delayed_caps.push_back(&in->cap_item);
}
-void Client::send_cap(Inode *in, int mds, Cap *cap, int used, int want, int retain, int flush)
+void Client::send_cap(Inode *in, MetaSession *session, Cap *cap,
+ int used, int want, int retain, int flush)
{
int held = cap->issued | cap->implemented;
int revoking = cap->implemented & ~cap->issued;
int op = CEPH_CAP_OP_UPDATE;
ldout(cct, 10) << "send_cap " << *in
- << " mds." << mds << " seq " << cap->seq
+ << " mds." << session->mds_num << " seq " << cap->seq
<< " used " << ccap_string(used)
<< " want " << ccap_string(want)
<< " flush " << ccap_string(flush)
in->requested_max_size = in->wanted_max_size;
ldout(cct, 15) << "auth cap, setting max_size = " << in->requested_max_size << dendl;
}
- messenger->send_message(m, mdsmap->get_inst(mds));
+ messenger->send_message(m, mdsmap->get_inst(session->mds_num));
}
Cap *cap = it->second;
it++;
+ MetaSession *session = mds_sessions[mds];
+ assert(session);
+
int revoking = cap->implemented & ~cap->issued;
ldout(cct, 10) << " cap mds." << mds
else
flushing = 0;
- send_cap(in, mds, cap, used, wanted, retain, flushing);
+ send_cap(in, session, cap, used, wanted, retain, flushing);
}
}
}
}
-void Client::add_update_cap(Inode *in, int mds, uint64_t cap_id,
+void Client::add_update_cap(Inode *in, MetaSession *mds_session, uint64_t cap_id,
unsigned issued, unsigned seq, unsigned mseq, inodeno_t realm,
int flags)
{
Cap *cap = 0;
- MetaSession *mds_session = mds_sessions[mds];
+ int mds = mds_session->mds_num;
if (in->caps.count(mds)) {
cap = in->caps[mds];
} else {
}
}
-void Client::flush_caps(Inode *in, int mds)
+void Client::flush_caps(Inode *in, MetaSession *session)
{
- ldout(cct, 10) << "flush_caps " << in << " mds." << mds << dendl;
+ ldout(cct, 10) << "flush_caps " << in << " mds." << session->mds_num << dendl;
Cap *cap = in->auth_cap;
- assert(cap->session->mds_num == mds);
+ assert(cap->session == session);
int wanted = in->caps_wanted();
int retain = wanted | CEPH_CAP_PIN;
- send_cap(in, mds, cap, in->caps_used(), wanted, retain, in->flushing_caps);
+ send_cap(in, session, cap, in->caps_used(), wanted, retain, in->flushing_caps);
}
void Client::wait_sync_caps(uint64_t want)
Inode *in = *p;
ldout(cct, 20) << " reflushing caps on " << *in << " to mds." << mds << dendl;
if (in->flushing_caps)
- flush_caps(in, mds);
+ flush_caps(in, session);
}
}
void Client::handle_cap_import(Inode *in, MClientCaps *m)
{
int mds = m->get_source().num();
+ MetaSession *session = mds_sessions[mds];
// add/update it
update_snap_trace(m->snapbl);
- add_update_cap(in, mds, m->get_cap_id(),
+ add_update_cap(in, session, m->get_cap_id(),
m->get_caps(), m->get_seq(), m->get_mseq(), m->get_realm(),
CEPH_CAP_FLAG_AUTH);
if (in->cap_snaps.size())
flush_snaps(in, true);
if (in->flushing_caps)
- flush_caps(in, mds);
+ flush_caps(in, session);
}
if (m->get_mseq() > in->exporting_mseq) {
if (!syncdataonly && (in->dirty_caps & ~CEPH_CAP_ANY_FILE_WR)) {
for (map<int, Cap*>::iterator iter = in->caps.begin(); iter != in->caps.end(); ++iter) {
if (iter->second->implemented & ~CEPH_CAP_ANY_FILE_WR) {
- flush_caps(in, iter->first);
+ MetaSession *session = mds_sessions[iter->first];
+ assert(session);
+ flush_caps(in, session);
}
}
wait_on_flush = in->last_flush_tid;
// file caps
void check_cap_issue(Inode *in, Cap *cap, unsigned issued);
- void add_update_cap(Inode *in, int mds, uint64_t cap_id,
+ void add_update_cap(Inode *in, MetaSession *session, uint64_t cap_id,
unsigned issued, unsigned seq, unsigned mseq, inodeno_t realm,
int flags);
void remove_cap(Cap *cap);
void mark_caps_dirty(Inode *in, int caps);
int mark_caps_flushing(Inode *in);
void flush_caps();
- void flush_caps(Inode *in, int mds);
+ void flush_caps(Inode *in, MetaSession *session);
void kick_flushing_caps(int mds);
int get_caps(Inode *in, int need, int want, int *have, loff_t endoff);
void handle_cap_flushsnap_ack(Inode *in, class MClientCaps *m);
void handle_cap_grant(Inode *in, int mds, Cap *cap, class MClientCaps *m);
void cap_delay_requeue(Inode *in);
- void send_cap(Inode *in, int mds, Cap *cap, int used, int want, int retain, int flush);
+ void send_cap(Inode *in, MetaSession *session, Cap *cap,
+ int used, int want, int retain, int flush);
void check_caps(Inode *in, bool is_delayed);
void get_cap_ref(Inode *in, int cap);
void put_cap_ref(Inode *in, int cap);
// metadata cache
void update_dir_dist(Inode *in, DirStat *st);
- void insert_readdir_results(MetaRequest *request, int mds, Inode *diri);
- Inode* insert_trace(MetaRequest *request, int mds);
+ void insert_readdir_results(MetaRequest *request, MetaSession *session, Inode *diri);
+ Inode* insert_trace(MetaRequest *request, MetaSession *session);
void update_inode_file_bits(Inode *in,
uint64_t truncate_seq, uint64_t truncate_size, uint64_t size,
uint64_t time_warp_seq, utime_t ctime, utime_t mtime, utime_t atime,
int issued);
- Inode *add_update_inode(InodeStat *st, utime_t ttl, int mds);
+ Inode *add_update_inode(InodeStat *st, utime_t ttl, MetaSession *session);
Dentry *insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dlease,
- Inode *in, utime_t from, int mds, bool set_offset,
+ Inode *in, utime_t from, MetaSession *session, bool set_offset,
Dentry *old_dentry = NULL);
- void update_dentry_lease(Dentry *dn, LeaseStat *dlease, utime_t from, int mds);
+ void update_dentry_lease(Dentry *dn, LeaseStat *dlease, utime_t from, MetaSession *session);
// ----------------------