From ca723a29c717cb5d0851f1c6056e5d3a725f1aa1 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 7 Apr 2009 16:34:22 -0700 Subject: [PATCH] uclient: fix up caps some. make dentry leases work --- src/client/Client.cc | 245 ++++++++++++++++++++++++++++--------------- src/client/Client.h | 33 +++--- 2 files changed, 180 insertions(+), 98 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index 77e7d11f2c5be..ca8b0838739e7 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -47,6 +47,7 @@ using namespace std; #include "messages/MClientRequestForward.h" #include "messages/MClientReply.h" #include "messages/MClientCaps.h" +#include "messages/MClientCapRelease.h" #include "messages/MClientLease.h" #include "messages/MClientSnap.h" @@ -382,9 +383,6 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from, int mds) } } - //utime_t ttl = from; - //ttl += (float)lease->duration_ms * 1000.0; - //dout(12) << "update_inode mask " << lease->mask << " ttl " << ttl << dendl; dout(12) << "add_update_inode " << *in << " caps " << ccap_string(st->cap.caps) << dendl; @@ -392,7 +390,7 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from, int mds) if (st->cap.caps) { if (in->snapid == CEPH_NOSNAP) - add_update_cap(in, mds, st->cap.cap_id, st->cap.caps, st->cap.seq, st->cap.mseq, inodeno_t(st->cap.realm)); + add_update_cap(in, mds, st->cap.cap_id, st->cap.caps, st->cap.seq, st->cap.mseq, inodeno_t(st->cap.realm), st->cap.flags); else { in->snap_caps |= st->cap.caps; } @@ -450,7 +448,7 @@ void Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dleas Inode *in, utime_t from, int mds) { utime_t dttl = from; - dttl += (float)dlease->duration_ms * 1000.0; + dttl += (float)dlease->duration_ms / 1000.0; Dentry *dn = NULL; if (dir->dentries.count(dname)) @@ -493,12 +491,11 @@ void Client::insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dleas assert(dn && dn->inode); if (dlease->mask & CEPH_LOCK_DN) { - utime_t ttl = from; - ttl += (float)dlease->duration_ms * 1000.0; - if (ttl > dn->lease_ttl) { - dout(10) << "got dentry lease on " << dname << " dur " << dlease->duration_ms << "ms ttl " << ttl << dendl; - dn->lease_ttl = ttl; - dn->lease_mds = from; + if (dttl > dn->lease_ttl) { + dout(10) << "got dentry lease on " << dname + << " dur " << dlease->duration_ms << "ms ttl " << dttl << dendl; + dn->lease_ttl = dttl; + dn->lease_mds = mds; dn->lease_seq = dlease->seq; } } @@ -547,6 +544,8 @@ Inode* Client::insert_trace(MetaRequest *request, utime_t from, int mds) MClientRequest *req = request->request; MClientReply *reply = request->reply; + dout(10) << "insert_trace from " << from << " mds" << mds << dendl; + bufferlist::iterator p = reply->get_trace_bl().begin(); if (p.end()) { dout(10) << "insert_trace -- no trace" << dendl; @@ -618,10 +617,43 @@ Inode* Client::insert_trace(MetaRequest *request, utime_t from, int mds) } } + // insert readdir results too + + // the rest? + p = reply->get_dir_bl().begin(); + if (!p.end()) { + // only open dir if we're actually adding stuff to it! + Dir *dir = in->open_dir(); + assert(dir); + + // dirstat + DirStat dst(p); + __u32 numdn; + __u8 complete, end; + ::decode(numdn, p); + ::decode(end, p); + ::decode(complete, p); + + string dname; + LeaseStat dlease; + while (numdn) { + ::decode(dname, p); + ::decode(dlease, p); + InodeStat ist(p); + + Inode *in = add_update_inode(&ist, from, mds); + insert_dentry_inode(dir, dname, &dlease, in, from, mds); + + numdn--; + } + + if (dir->is_empty()) + close_dir(dir); + } + return in; } - // ------- int Client::choose_target_mds(MClientRequest *req) @@ -698,7 +730,6 @@ int Client::choose_target_mds(MClientRequest *req) MClientReply *Client::make_request(MClientRequest *req, int uid, int gid, - utime_t *pfrom, Inode **ptarget, int use_mds) { @@ -827,8 +858,6 @@ MClientReply *Client::make_request(MClientRequest *req, // insert trace utime_t from = request.sent_stamp; - if (pfrom) - *pfrom = from; Inode *target = insert_trace(&request, from, mds); if (ptarget) *ptarget = target; @@ -1225,8 +1254,6 @@ void Client::handle_lease(MClientLease *m) } Dentry *dn = in->dir->dentries[m->dname]; dout(10) << " revoked DN lease on " << dn << dendl; - if (dn->lease_mds == mds) - seq = dn->lease_seq; dn->lease_mds = -1; } @@ -1263,6 +1290,9 @@ void Client::put_inode(Inode *in, int n) //cout << "put_inode on " << in << " " << in->inode.ino << endl; in->put(n); if (in->ref == 0) { + // release any caps + remove_all_caps(in); + //cout << "put_inode deleting " << in << " " << in->inode.ino << std::endl; objectcacher->release_set(in->ino()); if (in->snapdir_parent) @@ -1344,7 +1374,7 @@ void Client::cap_delay_requeue(Inode *in) delayed_caps.push_back(&in->cap_item); } -void Client::send_cap(Inode *in, int mds, InodeCap *cap, int used, int want, int retain) +void Client::send_cap(Inode *in, int mds, InodeCap *cap, int used, int want, int retain, int flush) { int held = cap->issued | cap->implemented; int revoking = cap->implemented & ~cap->issued; @@ -1355,20 +1385,13 @@ void Client::send_cap(Inode *in, int mds, InodeCap *cap, int used, int want, int << " mds" << mds << " seq " << cap->seq << " used " << ccap_string(used) << " want " << ccap_string(want) + << " flush " << ccap_string(flush) << " retain " << ccap_string(retain) << " held "<< ccap_string(held) << " revoking " << ccap_string(revoking) << " dropping " << ccap_string(dropping) << dendl; - - int dirty = in->caps_dirty(); - cap->flushing |= dirty & held; - if (cap->flushing) { - dout(10) << " flushing " << ccap_string(cap->flushing) << dendl; - in->dirty_caps &= ~cap->flushing; - } - cap->issued &= retain; if (revoking && (revoking & used) == 0) { @@ -1381,7 +1404,7 @@ void Client::send_cap(Inode *in, int mds, InodeCap *cap, int used, int want, int cap->cap_id, cap->seq, cap->issued, want, - cap->flushing, + flush, cap->mseq); m->head.uid = in->inode.uid; @@ -1401,16 +1424,12 @@ void Client::send_cap(Inode *in, int mds, InodeCap *cap, int used, int want, int in->inode.atime.encode_timeval(&m->head.atime); in->inode.ctime.encode_timeval(&m->head.ctime); m->head.time_warp_seq = in->inode.time_warp_seq; - - + in->reported_size = in->inode.size; m->set_max_size(in->wanted_max_size); in->requested_max_size = in->wanted_max_size; m->set_snap_follows(in->snaprealm->get_snap_context().seq); messenger->send_message(m, mdsmap->get_inst(mds)); - - if (cap->flushing == 0 && cap->issued == 0) - remove_cap(in, mds); } @@ -1418,6 +1437,7 @@ void Client::check_caps(Inode *in, bool is_delayed) { unsigned wanted = in->caps_wanted(); unsigned used = in->caps_used(); + int flush = 0; int retain = wanted; if (!unmounting) { @@ -1491,9 +1511,16 @@ void Client::check_caps(Inode *in, bool is_delayed) dout(10) << "delaying cap release" << dendl; continue; } - + ack: - send_cap(in, mds, cap, used, wanted, retain); + if (cap == in->auth_cap) { + flush = in->dirty_caps; + in->flushing_caps |= flush; + in->dirty_caps = 0; + dout(10) << " flushing " << ccap_string(flush) << dendl; + } + + send_cap(in, mds, cap, used, wanted, retain, flush); } } @@ -1670,7 +1697,8 @@ void Client::_flushed(Inode *in) * do not block. */ void Client::add_update_cap(Inode *in, int mds, __u64 cap_id, - unsigned issued, unsigned seq, unsigned mseq, inodeno_t realm) + unsigned issued, unsigned seq, unsigned mseq, inodeno_t realm, + int flags) { InodeCap *cap = 0; if (in->caps.count(mds)) { @@ -1681,7 +1709,6 @@ void Client::add_update_cap(Inode *in, int mds, __u64 cap_id, assert(in->snaprealm == 0); in->snaprealm = get_snap_realm(realm); in->snaprealm->inodes_with_caps.push_back(&in->snaprealm_item); - in->get(); dout(15) << "add_update_cap first one, opened snaprealm " << in->snaprealm << dendl; } if (in->exporting_mds == mds) { @@ -1694,6 +1721,9 @@ void Client::add_update_cap(Inode *in, int mds, __u64 cap_id, cap_list.push_back(&in->cap_item); } + if (flags & CEPH_CAP_FLAG_AUTH) + in->auth_cap = cap; + unsigned old_caps = cap->issued; cap->cap_id = cap_id; cap->issued |= issued; @@ -1712,28 +1742,44 @@ void Client::add_update_cap(Inode *in, int mds, __u64 cap_id, void Client::remove_cap(Inode *in, int mds) { dout(10) << "remove_cap mds" << mds << " on " << *in << dendl; + InodeCap *cap = in->caps[mds]; + MDSSession *session = &mds_sessions[mds]; + + if (!session->release) + session->release = new MClientCapRelease; + ceph_mds_cap_item i; + i.ino = in->ino(); + i.cap_id = cap->cap_id; + i.seq = cap->seq; + i.migrate_seq = cap->mseq; + session->release->caps.push_back(i); + + if (in->auth_cap == cap) + in->auth_cap = NULL; assert(in->caps.count(mds)); in->caps.erase(mds); + if (!in->is_any_caps()) { dout(15) << "remove_cap last one, closing snaprealm " << in->snaprealm << dendl; put_snap_realm(in->snaprealm); in->snaprealm = 0; in->snaprealm_item.remove_myself(); - put_inode(in); } } void Client::remove_all_caps(Inode *in) { - bool wasempty = in->caps.empty(); - in->caps.clear(); - if (!wasempty) { - dout(15) << "remove_all_caps closing snaprealm " << in->snaprealm << dendl; - put_snap_realm(in->snaprealm); - in->snaprealm = 0; - in->snaprealm_item.remove_myself(); - put_inode(in); - } + while (in->caps.size()) + remove_cap(in, in->caps.begin()->first); +} + +void Client::mark_caps_dirty(Inode *in, int caps) +{ + dout(10) << "mark_caps_dirty " << *in << " " << ccap_string(in->dirty_caps) << " -> " + << ccap_string(in->dirty_caps | caps) << dendl; + if (caps & !in->dirty_caps) + in->get(); + in->dirty_caps |= caps; } void SnapRealm::build_snap_context() @@ -2005,7 +2051,8 @@ void Client::handle_cap_import(Inode *in, MClientCaps *m) // add/update it update_snap_trace(m->snapbl); add_update_cap(in, mds, m->get_cap_id(), - m->get_caps(), m->get_seq(), m->get_mseq(), m->get_realm()); + m->get_caps(), m->get_seq(), m->get_mseq(), m->get_realm(), + CEPH_CAP_FLAG_AUTH); if (m->get_mseq() > in->exporting_mseq) { dout(5) << "handle_cap_import ino " << m->get_ino() << " mseq " << m->get_mseq() @@ -2086,17 +2133,18 @@ void Client::handle_cap_trunc(Inode *in, MClientCaps *m) void Client::handle_cap_flush_ack(Inode *in, int mds, InodeCap *cap, MClientCaps *m) { - int cleaned = m->get_dirty() & ~m->get_caps(); + int cleaned = m->get_dirty(); dout(5) << "handle_cap_flush_ack mds" << mds << " cleaned " << ccap_string(cleaned) << " on " << *in << dendl; - cap->flushing &= ~cleaned; - dout(5) << " cap->flushing now " << ccap_string(cap->flushing) - << ", in->caps_dirty() now " << ccap_string(in->caps_dirty()) << dendl; - if (m->get_caps() == 0 && m->get_seq() == cap->seq) { - assert(in->caps_dirty() == 0); - remove_cap(in, mds); + if (in->flushing_caps) { + dout(5) << " flushing_caps " << ccap_string(in->flushing_caps) + << " -> " << ccap_string(in->flushing_caps & ~cleaned) << dendl; + in->flushing_caps &= ~cleaned; + if (!in->caps_dirty()) + put_inode(in); } + delete m; } @@ -2485,9 +2533,23 @@ void Client::tick() timer.add_event_after(g_conf.client_tick_interval, tick_event); utime_t now = g_clock.now(); - utime_t el = now - last_cap_renew; - if (mdsmap && el > mdsmap->get_session_timeout() / 3.0) - renew_caps(); + + if (mdsmap) { + // renew caps? + utime_t el = now - last_cap_renew; + if (el > mdsmap->get_session_timeout() / 3.0) + renew_caps(); + + // send any cap releases + for (map::iterator p = mds_sessions.begin(); + p != mds_sessions.end(); + p++) { + if (p->second.release) { + messenger->send_message(p->second.release, mdsmap->get_inst(p->first)); + p->second.release = 0; + } + } + } // delayed caps xlist::iterator p = delayed_caps.begin(); @@ -2500,6 +2562,7 @@ void Client::tick() cap_list.push_back(&in->cap_item); check_caps(in, true); } + } void Client::renew_caps() @@ -2532,7 +2595,7 @@ int Client::_do_lookup(Inode *dir, const char *name, Inode **target) req->head.args.stat.mask = 0; dout(10) << "_lookup on " << path << dendl; - MClientReply *reply = make_request(req, 0, 0, 0, target); + MClientReply *reply = make_request(req, 0, 0, target); int r = reply->get_result(); dout(10) << "_lookup res is " << r << dendl; delete reply; @@ -2565,8 +2628,17 @@ int Client::_lookup(Inode *dir, const string& dname, Inode **target) if (dir->dir && dir->dir->dentries.count(dname)) { Dentry *dn = dir->dir->dentries[dname]; - *target = dn->inode; - goto done; + + dout(20) << " have dn " << dname << " mds" << dn->lease_mds << " ttl " << dn->lease_ttl + << " seq " << dn->lease_seq + << dendl; + + // is lease valid? + if (dn->lease_mds >= 0 && + dn->lease_ttl > g_clock.now()) { + *target = dn->inode; + goto done; + } } r = _do_lookup(dir, dname.c_str(), target); @@ -3134,10 +3206,8 @@ int Client::_readdir_get_frag(DirResult *dirp) req->set_filepath(path); req->head.args.readdir.frag = fg; - utime_t from; - MClientReply *reply = make_request(req, -1, -1, &from); + MClientReply *reply = make_request(req, -1, -1); int res = reply->get_result(); - int mds = reply->get_source().num(); if (res == -EAGAIN) { dout(10) << "_readdir_get_frag got EAGAIN, retrying" << dendl; @@ -3166,10 +3236,6 @@ int Client::_readdir_get_frag(DirResult *dirp) // the rest? bufferlist::iterator p = reply->get_dir_bl().begin(); if (!p.end()) { - // only open dir if we're actually adding stuff to it! - Dir *dir = diri->open_dir(); - assert(dir); - // dirstat DirStat dst(p); __u32 numdn; @@ -3185,22 +3251,13 @@ int Client::_readdir_get_frag(DirResult *dirp) ::decode(dlease, p); InodeStat ist(p); - Inode *in = add_update_inode(&ist, from, mds); - insert_dentry_inode(dir, dname, &dlease, in, from, mds); - - // caller + Inode *in = _ll_get_inode(ist.vino); dout(15) << "_readdir_get_frag got " << dname << " to " << in->inode.ino << dendl; _readdir_add_dirent(dirp, dname, in); numdn--; } - - if (dir->is_empty()) - close_dir(dir); } - - // FIXME: remove items in cache that weren't in my readdir? - // *** } else { dout(10) << "_readdir_get_frag got error " << res << ", setting end flag" << dendl; dirp->set_end(); @@ -3344,14 +3401,23 @@ int Client::_open(Inode *in, int flags, mode_t mode, Fh **fhp, int uid, int gid) filepath path; in->make_path(path); req->set_filepath(path); - req->head.args.open.flags = flags; + req->head.args.open.flags = flags & ~O_CREAT; req->head.args.open.mode = mode; int cmode = ceph_flags_to_mode(flags); in->get_open_ref(cmode); // make note of pending open, since it effects _wanted_ caps. - - MClientReply *reply = make_request(req, uid, gid); - int result = reply->get_result(); + + int want = ceph_caps_for_mode(cmode); + + int result = 0; + if ((in->caps_issued() & want) == want) { + // update wanted? + check_caps(in, true); + } else { + MClientReply *reply = make_request(req, uid, gid); + result = reply->get_result(); + delete reply; + } // success? if (result >= 0) { @@ -3378,8 +3444,6 @@ int Client::_open(Inode *in, int flags, mode_t mode, Fh **fhp, int uid, int gid) in->put_open_ref(cmode); } - delete reply; - trim_cache(); return result; @@ -4043,6 +4107,19 @@ void Client::handle_statfs_reply(MStatfsReply *reply) } +int Client::_sync_fs() +{ + dout(10) << "_sync_fs" << dendl; + return 0; +} + +int Client::sync_fs() +{ + Mutex::Locker l(client_lock); + return _sync_fs(); +} + + int Client::lazyio_propogate(int fd, loff_t offset, size_t count) { client_lock.Lock(); @@ -4808,7 +4885,7 @@ int Client::ll_create(vinodeno_t parent, const char *name, mode_t mode, int flag tout << flags << std::endl; Inode *dir = _ll_get_inode(parent); - int r = _mknod(dir, name, 0, 0); + int r = _mknod(dir, name, mode, 0); if (r < 0) return r; Dentry *dn = dir->dir->dentries[name]; diff --git a/src/client/Client.h b/src/client/Client.h index 16b09663bec3d..2930cbbd937d6 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -66,6 +66,8 @@ class MClientRequest; class MClientMountAck; class MClientRequestForward; class MClientLease; +class MClientCaps; +class MClientCapRelease; class MMonMap; class Filer; @@ -174,11 +176,10 @@ struct InodeCap { unsigned issued; unsigned implemented; unsigned wanted; // as known to mds. - unsigned flushing; __u64 seq; __u32 mseq; // migration seq - InodeCap() : issued(0), implemented(0), wanted(0), flushing(0), seq(0), mseq(0) {} + InodeCap() : issued(0), implemented(0), wanted(0), seq(0), mseq(0) {} }; struct CapSnap { @@ -205,7 +206,8 @@ class Inode { // per-mds caps map caps; // mds -> InodeCap - unsigned dirty_caps; + InodeCap *auth_cap; + unsigned dirty_caps, flushing_caps; int snap_caps, snap_cap_refs; unsigned exporting_issued; int exporting_mds; @@ -290,7 +292,7 @@ class Inode { //inode(_inode), snapid(vino.snapid), dir_auth(-1), dir_hashed(false), dir_replicated(false), - dirty_caps(0), + dirty_caps(0), flushing_caps(0), snap_caps(0), snap_cap_refs(0), exporting_issued(0), exporting_mds(-1), exporting_mseq(0), cap_item(this), @@ -365,12 +367,7 @@ class Inode { return want; } int caps_dirty() { - int flushing = dirty_caps; - for (map::iterator it = caps.begin(); - it != caps.end(); - it++) - flushing |= it->second->flushing; - return flushing; + return dirty_caps | flushing_caps; } bool have_valid_size() { @@ -579,7 +576,10 @@ public: __u64 cap_gen; utime_t cap_ttl, last_cap_renew_request; int num_caps; - MDSSession() : seq(0), cap_gen(0), num_caps(0) {} + + MClientCapRelease *release; + + MDSSession() : seq(0), cap_gen(0), num_caps(0), release(NULL) {} }; map mds_sessions; // mds -> push seq map > waiting_for_session; @@ -626,7 +626,7 @@ public: map statfs_requests; MClientReply *make_request(MClientRequest *req, int uid, int gid, - utime_t *pfrom=0, Inode **ptarget = 0, + Inode **ptarget = 0, int use_mds=-1); int choose_target_mds(MClientRequest *req); void send_request(MetaRequest *request, int mds); @@ -844,9 +844,11 @@ protected: // file caps void add_update_cap(Inode *in, int mds, __u64 cap_id, - unsigned issued, unsigned seq, unsigned mseq, inodeno_t realm); + unsigned issued, unsigned seq, unsigned mseq, inodeno_t realm, + int flags); void remove_cap(Inode *in, int mds); void remove_all_caps(Inode *in); + void mark_caps_dirty(Inode *in, int caps); void maybe_update_snaprealm(SnapRealm *realm, snapid_t snap_created, snapid_t snap_highwater, vector& snaps); @@ -860,7 +862,7 @@ protected: void handle_cap_flushsnap_ack(Inode *in, class MClientCaps *m); void handle_cap_grant(Inode *in, int mds, InodeCap *cap, class MClientCaps *m); void cap_delay_requeue(Inode *in); - void send_cap(Inode *in, int mds, InodeCap *cap, int used, int want, int retain); + void send_cap(Inode *in, int mds, InodeCap *cap, int used, int want, int retain, int flush); void check_caps(Inode *in, bool is_delayed); void put_cap_ref(Inode *in, int cap); void flush_snaps(Inode *in); @@ -945,6 +947,7 @@ private: int _write(Fh *fh, __s64 offset, __u64 size, const char *buf); int _flush(Fh *fh); int _fsync(Fh *fh, bool syncdataonly); + int _sync_fs(); int _statfs(struct statvfs *stbuf); @@ -1009,6 +1012,8 @@ public: int fsync(int fd, bool syncdataonly); int fstat(int fd, struct stat *stbuf); + int sync_fs(); + // hpc lazyio int lazyio_propogate(int fd, loff_t offset, size_t count); int lazyio_synchronize(int fd, loff_t offset, size_t count); -- 2.39.5