#include "messages/MClientRequestForward.h"
#include "messages/MClientReply.h"
#include "messages/MClientCaps.h"
+#include "messages/MClientCapRelease.h"
#include "messages/MClientLease.h"
#include "messages/MClientSnap.h"
}
}
- //utime_t ttl = from;
- //ttl += (float)lease->duration_ms * 1000.0;
-
//dout(12) << "update_inode mask " << lease->mask << " ttl " << ttl << dendl;
dout(12) << "add_update_inode " << *in << " caps " << ccap_string(st->cap.caps) << dendl;
if (st->cap.caps) {
if (in->snapid == CEPH_NOSNAP)
- add_update_cap(in, mds, st->cap.cap_id, st->cap.caps, st->cap.seq, st->cap.mseq, inodeno_t(st->cap.realm));
+ add_update_cap(in, mds, st->cap.cap_id, st->cap.caps, st->cap.seq, st->cap.mseq, inodeno_t(st->cap.realm), st->cap.flags);
else {
in->snap_caps |= st->cap.caps;
}
Inode *in, utime_t from, int mds)
{
utime_t dttl = from;
- dttl += (float)dlease->duration_ms * 1000.0;
+ dttl += (float)dlease->duration_ms / 1000.0;
Dentry *dn = NULL;
if (dir->dentries.count(dname))
assert(dn && dn->inode);
if (dlease->mask & CEPH_LOCK_DN) {
- utime_t ttl = from;
- ttl += (float)dlease->duration_ms * 1000.0;
- if (ttl > dn->lease_ttl) {
- dout(10) << "got dentry lease on " << dname << " dur " << dlease->duration_ms << "ms ttl " << ttl << dendl;
- dn->lease_ttl = ttl;
- dn->lease_mds = from;
+ if (dttl > dn->lease_ttl) {
+ dout(10) << "got dentry lease on " << dname
+ << " dur " << dlease->duration_ms << "ms ttl " << dttl << dendl;
+ dn->lease_ttl = dttl;
+ dn->lease_mds = mds;
dn->lease_seq = dlease->seq;
}
}
MClientRequest *req = request->request;
MClientReply *reply = request->reply;
+ dout(10) << "insert_trace from " << from << " mds" << mds << dendl;
+
bufferlist::iterator p = reply->get_trace_bl().begin();
if (p.end()) {
dout(10) << "insert_trace -- no trace" << dendl;
}
}
+ // insert readdir results too
+
+ // the rest?
+ p = reply->get_dir_bl().begin();
+ if (!p.end()) {
+ // only open dir if we're actually adding stuff to it!
+ Dir *dir = in->open_dir();
+ assert(dir);
+
+ // dirstat
+ DirStat dst(p);
+ __u32 numdn;
+ __u8 complete, end;
+ ::decode(numdn, p);
+ ::decode(end, p);
+ ::decode(complete, p);
+
+ string dname;
+ LeaseStat dlease;
+ while (numdn) {
+ ::decode(dname, p);
+ ::decode(dlease, p);
+ InodeStat ist(p);
+
+ Inode *in = add_update_inode(&ist, from, mds);
+ insert_dentry_inode(dir, dname, &dlease, in, from, mds);
+
+ numdn--;
+ }
+
+ if (dir->is_empty())
+ close_dir(dir);
+ }
+
return in;
}
-
// -------
int Client::choose_target_mds(MClientRequest *req)
MClientReply *Client::make_request(MClientRequest *req,
int uid, int gid,
- utime_t *pfrom,
Inode **ptarget,
int use_mds)
{
// insert trace
utime_t from = request.sent_stamp;
- if (pfrom)
- *pfrom = from;
Inode *target = insert_trace(&request, from, mds);
if (ptarget)
*ptarget = target;
}
Dentry *dn = in->dir->dentries[m->dname];
dout(10) << " revoked DN lease on " << dn << dendl;
- if (dn->lease_mds == mds)
- seq = dn->lease_seq;
dn->lease_mds = -1;
}
//cout << "put_inode on " << in << " " << in->inode.ino << endl;
in->put(n);
if (in->ref == 0) {
+ // release any caps
+ remove_all_caps(in);
+
//cout << "put_inode deleting " << in << " " << in->inode.ino << std::endl;
objectcacher->release_set(in->ino());
if (in->snapdir_parent)
delayed_caps.push_back(&in->cap_item);
}
-void Client::send_cap(Inode *in, int mds, InodeCap *cap, int used, int want, int retain)
+void Client::send_cap(Inode *in, int mds, InodeCap *cap, int used, int want, int retain, int flush)
{
int held = cap->issued | cap->implemented;
int revoking = cap->implemented & ~cap->issued;
<< " mds" << mds << " seq " << cap->seq
<< " used " << ccap_string(used)
<< " want " << ccap_string(want)
+ << " flush " << ccap_string(flush)
<< " retain " << ccap_string(retain)
<< " held "<< ccap_string(held)
<< " revoking " << ccap_string(revoking)
<< " dropping " << ccap_string(dropping)
<< dendl;
-
- int dirty = in->caps_dirty();
- cap->flushing |= dirty & held;
- if (cap->flushing) {
- dout(10) << " flushing " << ccap_string(cap->flushing) << dendl;
- in->dirty_caps &= ~cap->flushing;
- }
-
cap->issued &= retain;
if (revoking && (revoking & used) == 0) {
cap->cap_id, cap->seq,
cap->issued,
want,
- cap->flushing,
+ flush,
cap->mseq);
m->head.uid = in->inode.uid;
in->inode.atime.encode_timeval(&m->head.atime);
in->inode.ctime.encode_timeval(&m->head.ctime);
m->head.time_warp_seq = in->inode.time_warp_seq;
-
-
+
in->reported_size = in->inode.size;
m->set_max_size(in->wanted_max_size);
in->requested_max_size = in->wanted_max_size;
m->set_snap_follows(in->snaprealm->get_snap_context().seq);
messenger->send_message(m, mdsmap->get_inst(mds));
-
- if (cap->flushing == 0 && cap->issued == 0)
- remove_cap(in, mds);
}
{
unsigned wanted = in->caps_wanted();
unsigned used = in->caps_used();
+ int flush = 0;
int retain = wanted;
if (!unmounting) {
dout(10) << "delaying cap release" << dendl;
continue;
}
-
+
ack:
- send_cap(in, mds, cap, used, wanted, retain);
+ if (cap == in->auth_cap) {
+ flush = in->dirty_caps;
+ in->flushing_caps |= flush;
+ in->dirty_caps = 0;
+ dout(10) << " flushing " << ccap_string(flush) << dendl;
+ }
+
+ send_cap(in, mds, cap, used, wanted, retain, flush);
}
}
* do not block.
*/
void Client::add_update_cap(Inode *in, int mds, __u64 cap_id,
- unsigned issued, unsigned seq, unsigned mseq, inodeno_t realm)
+ unsigned issued, unsigned seq, unsigned mseq, inodeno_t realm,
+ int flags)
{
InodeCap *cap = 0;
if (in->caps.count(mds)) {
assert(in->snaprealm == 0);
in->snaprealm = get_snap_realm(realm);
in->snaprealm->inodes_with_caps.push_back(&in->snaprealm_item);
- in->get();
dout(15) << "add_update_cap first one, opened snaprealm " << in->snaprealm << dendl;
}
if (in->exporting_mds == mds) {
cap_list.push_back(&in->cap_item);
}
+ if (flags & CEPH_CAP_FLAG_AUTH)
+ in->auth_cap = cap;
+
unsigned old_caps = cap->issued;
cap->cap_id = cap_id;
cap->issued |= issued;
void Client::remove_cap(Inode *in, int mds)
{
dout(10) << "remove_cap mds" << mds << " on " << *in << dendl;
+ InodeCap *cap = in->caps[mds];
+ MDSSession *session = &mds_sessions[mds];
+
+ if (!session->release)
+ session->release = new MClientCapRelease;
+ ceph_mds_cap_item i;
+ i.ino = in->ino();
+ i.cap_id = cap->cap_id;
+ i.seq = cap->seq;
+ i.migrate_seq = cap->mseq;
+ session->release->caps.push_back(i);
+
+ if (in->auth_cap == cap)
+ in->auth_cap = NULL;
assert(in->caps.count(mds));
in->caps.erase(mds);
+
if (!in->is_any_caps()) {
dout(15) << "remove_cap last one, closing snaprealm " << in->snaprealm << dendl;
put_snap_realm(in->snaprealm);
in->snaprealm = 0;
in->snaprealm_item.remove_myself();
- put_inode(in);
}
}
void Client::remove_all_caps(Inode *in)
{
- bool wasempty = in->caps.empty();
- in->caps.clear();
- if (!wasempty) {
- dout(15) << "remove_all_caps closing snaprealm " << in->snaprealm << dendl;
- put_snap_realm(in->snaprealm);
- in->snaprealm = 0;
- in->snaprealm_item.remove_myself();
- put_inode(in);
- }
+ while (in->caps.size())
+ remove_cap(in, in->caps.begin()->first);
+}
+
+void Client::mark_caps_dirty(Inode *in, int caps)
+{
+ dout(10) << "mark_caps_dirty " << *in << " " << ccap_string(in->dirty_caps) << " -> "
+ << ccap_string(in->dirty_caps | caps) << dendl;
+ if (caps & !in->dirty_caps)
+ in->get();
+ in->dirty_caps |= caps;
}
void SnapRealm::build_snap_context()
// add/update it
update_snap_trace(m->snapbl);
add_update_cap(in, mds, m->get_cap_id(),
- m->get_caps(), m->get_seq(), m->get_mseq(), m->get_realm());
+ m->get_caps(), m->get_seq(), m->get_mseq(), m->get_realm(),
+ CEPH_CAP_FLAG_AUTH);
if (m->get_mseq() > in->exporting_mseq) {
dout(5) << "handle_cap_import ino " << m->get_ino() << " mseq " << m->get_mseq()
void Client::handle_cap_flush_ack(Inode *in, int mds, InodeCap *cap, MClientCaps *m)
{
- int cleaned = m->get_dirty() & ~m->get_caps();
+ int cleaned = m->get_dirty();
dout(5) << "handle_cap_flush_ack mds" << mds
<< " cleaned " << ccap_string(cleaned) << " on " << *in << dendl;
- cap->flushing &= ~cleaned;
- dout(5) << " cap->flushing now " << ccap_string(cap->flushing)
- << ", in->caps_dirty() now " << ccap_string(in->caps_dirty()) << dendl;
- if (m->get_caps() == 0 && m->get_seq() == cap->seq) {
- assert(in->caps_dirty() == 0);
- remove_cap(in, mds);
+ if (in->flushing_caps) {
+ dout(5) << " flushing_caps " << ccap_string(in->flushing_caps)
+ << " -> " << ccap_string(in->flushing_caps & ~cleaned) << dendl;
+ in->flushing_caps &= ~cleaned;
+ if (!in->caps_dirty())
+ put_inode(in);
}
+
delete m;
}
timer.add_event_after(g_conf.client_tick_interval, tick_event);
utime_t now = g_clock.now();
- utime_t el = now - last_cap_renew;
- if (mdsmap && el > mdsmap->get_session_timeout() / 3.0)
- renew_caps();
+
+ if (mdsmap) {
+ // renew caps?
+ utime_t el = now - last_cap_renew;
+ if (el > mdsmap->get_session_timeout() / 3.0)
+ renew_caps();
+
+ // send any cap releases
+ for (map<int,MDSSession>::iterator p = mds_sessions.begin();
+ p != mds_sessions.end();
+ p++) {
+ if (p->second.release) {
+ messenger->send_message(p->second.release, mdsmap->get_inst(p->first));
+ p->second.release = 0;
+ }
+ }
+ }
// delayed caps
xlist<Inode*>::iterator p = delayed_caps.begin();
cap_list.push_back(&in->cap_item);
check_caps(in, true);
}
+
}
void Client::renew_caps()
req->head.args.stat.mask = 0;
dout(10) << "_lookup on " << path << dendl;
- MClientReply *reply = make_request(req, 0, 0, 0, target);
+ MClientReply *reply = make_request(req, 0, 0, target);
int r = reply->get_result();
dout(10) << "_lookup res is " << r << dendl;
delete reply;
if (dir->dir &&
dir->dir->dentries.count(dname)) {
Dentry *dn = dir->dir->dentries[dname];
- *target = dn->inode;
- goto done;
+
+ dout(20) << " have dn " << dname << " mds" << dn->lease_mds << " ttl " << dn->lease_ttl
+ << " seq " << dn->lease_seq
+ << dendl;
+
+ // is lease valid?
+ if (dn->lease_mds >= 0 &&
+ dn->lease_ttl > g_clock.now()) {
+ *target = dn->inode;
+ goto done;
+ }
}
r = _do_lookup(dir, dname.c_str(), target);
req->set_filepath(path);
req->head.args.readdir.frag = fg;
- utime_t from;
- MClientReply *reply = make_request(req, -1, -1, &from);
+ MClientReply *reply = make_request(req, -1, -1);
int res = reply->get_result();
- int mds = reply->get_source().num();
if (res == -EAGAIN) {
dout(10) << "_readdir_get_frag got EAGAIN, retrying" << dendl;
// the rest?
bufferlist::iterator p = reply->get_dir_bl().begin();
if (!p.end()) {
- // only open dir if we're actually adding stuff to it!
- Dir *dir = diri->open_dir();
- assert(dir);
-
// dirstat
DirStat dst(p);
__u32 numdn;
::decode(dlease, p);
InodeStat ist(p);
- Inode *in = add_update_inode(&ist, from, mds);
- insert_dentry_inode(dir, dname, &dlease, in, from, mds);
-
- // caller
+ Inode *in = _ll_get_inode(ist.vino);
dout(15) << "_readdir_get_frag got " << dname << " to " << in->inode.ino << dendl;
_readdir_add_dirent(dirp, dname, in);
numdn--;
}
-
- if (dir->is_empty())
- close_dir(dir);
}
-
- // FIXME: remove items in cache that weren't in my readdir?
- // ***
} else {
dout(10) << "_readdir_get_frag got error " << res << ", setting end flag" << dendl;
dirp->set_end();
filepath path;
in->make_path(path);
req->set_filepath(path);
- req->head.args.open.flags = flags;
+ req->head.args.open.flags = flags & ~O_CREAT;
req->head.args.open.mode = mode;
int cmode = ceph_flags_to_mode(flags);
in->get_open_ref(cmode); // make note of pending open, since it effects _wanted_ caps.
-
- MClientReply *reply = make_request(req, uid, gid);
- int result = reply->get_result();
+
+ int want = ceph_caps_for_mode(cmode);
+
+ int result = 0;
+ if ((in->caps_issued() & want) == want) {
+ // update wanted?
+ check_caps(in, true);
+ } else {
+ MClientReply *reply = make_request(req, uid, gid);
+ result = reply->get_result();
+ delete reply;
+ }
// success?
if (result >= 0) {
in->put_open_ref(cmode);
}
- delete reply;
-
trim_cache();
return result;
}
+int Client::_sync_fs()
+{
+ dout(10) << "_sync_fs" << dendl;
+ return 0;
+}
+
+int Client::sync_fs()
+{
+ Mutex::Locker l(client_lock);
+ return _sync_fs();
+}
+
+
int Client::lazyio_propogate(int fd, loff_t offset, size_t count)
{
client_lock.Lock();
tout << flags << std::endl;
Inode *dir = _ll_get_inode(parent);
- int r = _mknod(dir, name, 0, 0);
+ int r = _mknod(dir, name, mode, 0);
if (r < 0)
return r;
Dentry *dn = dir->dir->dentries[name];
class MClientMountAck;
class MClientRequestForward;
class MClientLease;
+class MClientCaps;
+class MClientCapRelease;
class MMonMap;
class Filer;
unsigned issued;
unsigned implemented;
unsigned wanted; // as known to mds.
- unsigned flushing;
__u64 seq;
__u32 mseq; // migration seq
- InodeCap() : issued(0), implemented(0), wanted(0), flushing(0), seq(0), mseq(0) {}
+ InodeCap() : issued(0), implemented(0), wanted(0), seq(0), mseq(0) {}
};
struct CapSnap {
// per-mds caps
map<int,InodeCap*> caps; // mds -> InodeCap
- unsigned dirty_caps;
+ InodeCap *auth_cap;
+ unsigned dirty_caps, flushing_caps;
int snap_caps, snap_cap_refs;
unsigned exporting_issued;
int exporting_mds;
//inode(_inode),
snapid(vino.snapid),
dir_auth(-1), dir_hashed(false), dir_replicated(false),
- dirty_caps(0),
+ dirty_caps(0), flushing_caps(0),
snap_caps(0), snap_cap_refs(0),
exporting_issued(0), exporting_mds(-1), exporting_mseq(0),
cap_item(this),
return want;
}
int caps_dirty() {
- int flushing = dirty_caps;
- for (map<int,InodeCap*>::iterator it = caps.begin();
- it != caps.end();
- it++)
- flushing |= it->second->flushing;
- return flushing;
+ return dirty_caps | flushing_caps;
}
bool have_valid_size() {
__u64 cap_gen;
utime_t cap_ttl, last_cap_renew_request;
int num_caps;
- MDSSession() : seq(0), cap_gen(0), num_caps(0) {}
+
+ MClientCapRelease *release;
+
+ MDSSession() : seq(0), cap_gen(0), num_caps(0), release(NULL) {}
};
map<int, MDSSession> mds_sessions; // mds -> push seq
map<int, list<Cond*> > waiting_for_session;
map<tid_t,StatfsRequest*> statfs_requests;
MClientReply *make_request(MClientRequest *req, int uid, int gid,
- utime_t *pfrom=0, Inode **ptarget = 0,
+ Inode **ptarget = 0,
int use_mds=-1);
int choose_target_mds(MClientRequest *req);
void send_request(MetaRequest *request, int mds);
// file caps
void add_update_cap(Inode *in, int mds, __u64 cap_id,
- unsigned issued, unsigned seq, unsigned mseq, inodeno_t realm);
+ unsigned issued, unsigned seq, unsigned mseq, inodeno_t realm,
+ int flags);
void remove_cap(Inode *in, int mds);
void remove_all_caps(Inode *in);
+ void mark_caps_dirty(Inode *in, int caps);
void maybe_update_snaprealm(SnapRealm *realm, snapid_t snap_created, snapid_t snap_highwater,
vector<snapid_t>& snaps);
void handle_cap_flushsnap_ack(Inode *in, class MClientCaps *m);
void handle_cap_grant(Inode *in, int mds, InodeCap *cap, class MClientCaps *m);
void cap_delay_requeue(Inode *in);
- void send_cap(Inode *in, int mds, InodeCap *cap, int used, int want, int retain);
+ void send_cap(Inode *in, int mds, InodeCap *cap, int used, int want, int retain, int flush);
void check_caps(Inode *in, bool is_delayed);
void put_cap_ref(Inode *in, int cap);
void flush_snaps(Inode *in);
int _write(Fh *fh, __s64 offset, __u64 size, const char *buf);
int _flush(Fh *fh);
int _fsync(Fh *fh, bool syncdataonly);
+ int _sync_fs();
int _statfs(struct statvfs *stbuf);
int fsync(int fd, bool syncdataonly);
int fstat(int fd, struct stat *stbuf);
+ int sync_fs();
+
// hpc lazyio
int lazyio_propogate(int fd, loff_t offset, size_t count);
int lazyio_synchronize(int fd, loff_t offset, size_t count);