From c48c720bb94c6544419cf10955f6c94c9d006bc2 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 9 Apr 2009 15:28:11 -0700 Subject: [PATCH] mds: include cap, dentry lease release in request messages Embed dentry and cap releases inside mds request messages. This avoids the overhead of sending additional messages, although it does limit the release to the mds the request is going to. That is normally fine, since updates go to the auth MDS, and that is usually who we're dealing with. --- src/include/ceph_fs.h | 5 +-- src/kernel/caps.c | 75 ++++++++++++++++++++++++++++++++++++++--- src/kernel/dir.c | 50 ++++++++++++--------------- src/kernel/file.c | 9 +++-- src/kernel/inode.c | 8 ++--- src/kernel/ioctl.c | 3 +- src/kernel/mds_client.c | 43 +++++++++++++++++++---- src/kernel/mds_client.h | 6 ++++ src/kernel/super.h | 4 +++ src/mds/Locker.cc | 38 +++++++++++++++++++++ src/mds/Locker.h | 19 +++++++---- src/mds/Server.cc | 11 ++++++ 12 files changed, 214 insertions(+), 57 deletions(-) diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index c0efce17d2eb9..2aded4ea51dc5 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -808,9 +808,10 @@ struct ceph_mds_request_head { } __attribute__ ((packed)); struct ceph_mds_request_release { - __le64 ino; + __le64 ino, cap_id; __le32 caps; - __le32 seq; + __le32 seq, mseq; + __le32 dname_seq; __le32 dname_len; /* if releasing a dentry lease, too. string follows. */ } __attribute__ ((packed)); diff --git a/src/kernel/caps.c b/src/kernel/caps.c index 7b1ffb1945778..cbb47fd25623b 100644 --- a/src/kernel/caps.c +++ b/src/kernel/caps.c @@ -258,9 +258,8 @@ void ceph_reservation_status(int *total, int *avail, int *used, int *reserved) * * Called with i_lock held. */ -static struct ceph_cap *__get_cap_for_mds(struct inode *inode, int mds) +static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds) { - struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap *cap; struct rb_node *n = ci->i_caps.rb_node; @@ -433,7 +432,7 @@ int ceph_add_cap(struct inode *inode, retry: spin_lock(&inode->i_lock); - cap = __get_cap_for_mds(inode, mds); + cap = __get_cap_for_mds(ci, mds); if (!cap) { if (new_cap) { cap = new_cap; @@ -2093,7 +2092,7 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc, /* the rest require a cap */ spin_lock(&inode->i_lock); - cap = __get_cap_for_mds(inode, mds); + cap = __get_cap_for_mds(ceph_inode(inode), mds); if (!cap) { dout(10, "no cap on %p ino %llx.%llx from mds%d, releasing\n", inode, ceph_ino(inode), ceph_snap(inode), mds); @@ -2192,3 +2191,71 @@ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode) ceph_check_caps(ci, 0, 0, NULL); } +/* + * Helpers for embedding cap and dentry lease releases into mds + * requests. + */ +int ceph_encode_inode_release(void **p, struct inode *inode, + int mds, int drop, int unless) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_cap *cap; + struct ceph_mds_request_release *rel = *p; + int ret = 0; + + dout(10, "encode_inode_release %p mds%d drop %s unless %s\n", inode, + mds, ceph_cap_string(drop), ceph_cap_string(unless)); + + spin_lock(&inode->i_lock); + cap = __get_cap_for_mds(ci, mds); + if (cap && __cap_is_valid(cap)) { + if ((cap->issued & drop) && + (cap->issued & unless) == 0) { + dout(10, "encode_inode_release %p cap %p %s -> %s\n", + inode, cap, ceph_cap_string(cap->issued), + ceph_cap_string(cap->issued & ~drop)); + cap->issued &= ~drop; + cap->implemented &= ~drop; + + rel->ino = cpu_to_le64(ceph_ino(inode)); + rel->cap_id = cpu_to_le64(cap->cap_id); + rel->seq = cpu_to_le32(cap->seq); + rel->mseq = cpu_to_le32(cap->mseq); + rel->caps = cpu_to_le32(cap->issued); + rel->dname_len = 0; + rel->dname_seq = 0; + *p += sizeof(*rel); + ret = 1; + } else { + dout(10, "encode_inode_release %p cap %p %s\n", + inode, cap, ceph_cap_string(cap->issued)); + } + } + spin_unlock(&inode->i_lock); + return ret; +} + +int ceph_encode_dentry_release(void **p, struct dentry *dentry, + int mds, int drop, int unless) +{ + struct inode *dir = dentry->d_parent->d_inode; + struct ceph_mds_request_release *rel = *p; + struct ceph_dentry_info *di = ceph_dentry(dentry); + int ret; + + ret = ceph_encode_inode_release(p, dir, mds, drop, unless); + + /* drop dentry lease too? */ + spin_lock(&dentry->d_lock); + if (ret && di->lease_session && di->lease_session->s_mds == mds) { + dout(10, "encode_dentry_release %p mds%d seq %d\n", + dentry, mds, (int)di->lease_seq); + rel->dname_len = cpu_to_le32(dentry->d_name.len); + memcpy(*p, dentry->d_name.name, dentry->d_name.len); + *p += dentry->d_name.len; + rel->dname_seq = cpu_to_le32(di->lease_seq); + } + spin_unlock(&dentry->d_lock); + + return ret; +} diff --git a/src/kernel/dir.c b/src/kernel/dir.c index 3351479f21a83..efa2ef8b5e127 100644 --- a/src/kernel/dir.c +++ b/src/kernel/dir.c @@ -396,7 +396,7 @@ int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry) } static int ceph_mknod(struct inode *dir, struct dentry *dentry, - int mode, dev_t rdev) + int mode, dev_t rdev) { struct ceph_client *client = ceph_sb_to_client(dir->i_sb); struct ceph_mds_client *mdsc = &client->mdsc; @@ -409,7 +409,6 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, dout(5, "mknod in dir %p dentry %p mode 0%o rdev %d\n", dir, dentry, mode, rdev); req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS); - if (IS_ERR(req)) { d_drop(dentry); return PTR_ERR(req); @@ -419,8 +418,8 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, req->r_locked_dir = dir; req->r_args.mknod.mode = cpu_to_le32(mode); req->r_args.mknod.rdev = cpu_to_le32(rdev); - if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL)) - ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE); + req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE; + req->r_dentry_unless = CEPH_CAP_FILE_EXCL; err = ceph_mdsc_do_request(mdsc, dir, req); if (!err && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); @@ -469,13 +468,12 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, d_drop(dentry); return PTR_ERR(req); } - req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_path2 = dest; req->r_locked_dir = dir; - if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL)) - ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE); + req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE; + req->r_dentry_unless = CEPH_CAP_FILE_EXCL; err = ceph_mdsc_do_request(mdsc, dir, req); if (!err && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); @@ -514,9 +512,8 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, int mode) req->r_num_caps = 2; req->r_locked_dir = dir; req->r_args.mkdir.mode = cpu_to_le32(mode); - - if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL)) - ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE); + req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE; + req->r_dentry_unless = CEPH_CAP_FILE_EXCL; err = ceph_mdsc_do_request(mdsc, dir, req); if (!err && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); @@ -549,9 +546,8 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, req->r_num_caps = 2; req->r_old_dentry = dget(old_dentry); /* or inode? hrm. */ req->r_locked_dir = dir; - - if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL)) - ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE); + req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE; + req->r_dentry_unless = CEPH_CAP_FILE_EXCL; err = ceph_mdsc_do_request(mdsc, dir, req); if (err) { d_drop(dentry); @@ -567,15 +563,15 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, * than PIN) we don't specifically want (due to the file still being * open). */ -static void drop_caps_for_unlink(struct inode *inode) +static int drop_caps_for_unlink(struct inode *inode) { int drop = CEPH_CAP_LINK_RDCACHE | CEPH_CAP_LINK_EXCL; spin_lock(&inode->i_lock); - if (inode->i_nlink == 1) + if (inode->i_nlink == 1) drop |= ~(__ceph_caps_wanted(ceph_inode(inode)) | CEPH_CAP_PIN); spin_unlock(&inode->i_lock); - ceph_release_caps(inode, drop); + return drop; } /* @@ -610,11 +606,9 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry) req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_locked_dir = dir; - - if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL)) - ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE); - ceph_mdsc_lease_release(mdsc, dir, dentry, CEPH_LOCK_DN); - drop_caps_for_unlink(inode); + req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE; + req->r_dentry_unless = CEPH_CAP_FILE_EXCL; + req->r_inode_drop = drop_caps_for_unlink(inode); err = ceph_mdsc_do_request(mdsc, dir, req); if (!err && !req->r_reply_info.head->is_dentry) d_delete(dentry); @@ -645,16 +639,14 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, req->r_num_caps = 2; req->r_old_dentry = dget(old_dentry); req->r_locked_dir = new_dir; - if (!ceph_caps_issued_mask(ceph_inode(old_dir), CEPH_CAP_FILE_EXCL)) - ceph_release_caps(old_dir, CEPH_CAP_FILE_RDCACHE); - ceph_mdsc_lease_release(mdsc, old_dir, old_dentry, CEPH_LOCK_DN); - if (!ceph_caps_issued_mask(ceph_inode(new_dir), CEPH_CAP_FILE_EXCL)) - ceph_release_caps(new_dir, CEPH_CAP_FILE_RDCACHE); - ceph_mdsc_lease_release(mdsc, new_dir, new_dentry, CEPH_LOCK_DN); + req->r_old_dentry_drop = CEPH_CAP_FILE_RDCACHE; + req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL; + req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE; + req->r_dentry_unless = CEPH_CAP_FILE_EXCL; /* release LINK_RDCACHE on source inode (mds will lock it) */ - ceph_release_caps(old_dentry->d_inode, CEPH_CAP_LINK_RDCACHE); + req->r_old_inode_drop = CEPH_CAP_LINK_RDCACHE; if (new_dentry->d_inode) - drop_caps_for_unlink(new_dentry->d_inode); + req->r_inode_drop = drop_caps_for_unlink(new_dentry->d_inode); err = ceph_mdsc_do_request(mdsc, old_dir, req); if (!err && !req->r_reply_info.head->is_dentry) { /* diff --git a/src/kernel/file.c b/src/kernel/file.c index b770149c62786..919d89a8c1b06 100644 --- a/src/kernel/file.c +++ b/src/kernel/file.c @@ -165,8 +165,6 @@ int ceph_open(struct inode *inode, struct file *file) spin_unlock(&inode->i_lock); dout(10, "open fmode %d wants %s\n", fmode, ceph_cap_string(wanted)); - if (!ceph_caps_issued_mask(ceph_inode(inode), CEPH_CAP_FILE_EXCL)) - ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE); req = prepare_open_request(inode->i_sb, flags, 0); if (IS_ERR(req)) { err = PTR_ERR(req); @@ -217,9 +215,10 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry, return ERR_PTR(PTR_ERR(req)); req->r_dentry = dget(dentry); req->r_num_caps = 2; - if ((flags & O_CREAT) && - (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL))) - ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE); + if (flags & O_CREAT) { + req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE; + req->r_dentry_unless = CEPH_CAP_FILE_EXCL; + } req->r_locked_dir = dir; /* caller holds dir->i_mutex */ err = ceph_mdsc_do_request(mdsc, parent_inode, req); dentry = ceph_finish_lookup(req, dentry, err); diff --git a/src/kernel/inode.c b/src/kernel/inode.c index 1c7c49b3c39bd..50e06508fa9b6 100644 --- a/src/kernel/inode.c +++ b/src/kernel/inode.c @@ -1437,9 +1437,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) spin_unlock(&inode->i_lock); if (mask) { - if (release) - ceph_release_caps(inode, release); req->r_inode = igrab(inode); + req->r_inode_drop = release; req->r_args.setattr.mask = mask; req->r_num_caps = 1; err = ceph_mdsc_do_request(mdsc, parent_inode, req); @@ -1795,6 +1794,7 @@ int ceph_setxattr(struct dentry *dentry, const char *name, if (IS_ERR(req)) return PTR_ERR(req); req->r_inode = igrab(inode); + req->r_inode_drop = CEPH_CAP_XATTR_RDCACHE; req->r_num_caps = 1; req->r_args.setxattr.flags = cpu_to_le32(flags); @@ -1803,7 +1803,6 @@ int ceph_setxattr(struct dentry *dentry, const char *name, req->r_request->hdr.data_len = cpu_to_le32(size); req->r_request->hdr.data_off = cpu_to_le16(0); - ceph_release_caps(inode, CEPH_CAP_XATTR_RDCACHE); err = ceph_mdsc_do_request(mdsc, parent_inode, req); ceph_mdsc_put_request(req); @@ -1840,8 +1839,9 @@ int ceph_removexattr(struct dentry *dentry, const char *name) if (IS_ERR(req)) return PTR_ERR(req); req->r_inode = igrab(inode); + req->r_inode_drop = CEPH_CAP_XATTR_RDCACHE; req->r_num_caps = 1; - ceph_release_caps(inode, CEPH_CAP_XATTR_RDCACHE); + err = ceph_mdsc_do_request(mdsc, parent_inode, req); ceph_mdsc_put_request(req); return err; diff --git a/src/kernel/ioctl.c b/src/kernel/ioctl.c index 55e0991c07582..0dd70722d06e2 100644 --- a/src/kernel/ioctl.c +++ b/src/kernel/ioctl.c @@ -43,8 +43,9 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) if (IS_ERR(req)) return PTR_ERR(req); req->r_inode = igrab(inode); + req->r_inode_drop = CEPH_CAP_FILE_RDCACHE | CEPH_CAP_FILE_EXCL; req->r_args.setlayout.layout = layout; - ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE); + err = ceph_mdsc_do_request(mdsc, parent_inode, req); ceph_mdsc_put_request(req); return err; diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index c3dcefaa6e14d..64b3b3fc705aa 100644 --- a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -895,6 +895,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) if (!req) return ERR_PTR(-ENOMEM); + req->r_started = jiffies; req->r_resend_mds = -1; INIT_LIST_HEAD(&req->r_unsafe_dir_item); @@ -1092,8 +1093,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, const char *path2 = req->r_path2; u64 ino1, ino2; int pathlen1, pathlen2; - int pathlen; + int len; int freepath1, freepath2; + u32 releases; void *p, *end; int ret; @@ -1113,9 +1115,19 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, goto out_free1; } - pathlen = pathlen1 + pathlen2 + 2*(sizeof(u32) + sizeof(u64)); - msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, sizeof(*head) + pathlen, - 0, 0, NULL); + len = sizeof(*head) + + pathlen1 + pathlen2 + 2*(sizeof(u32) + sizeof(u64)); + + /* calculate (max) length for cap releases */ + len += sizeof(struct ceph_mds_request_release) * + (!!req->r_inode_drop + !!req->r_dentry_drop + + !!req->r_old_inode_drop + !!req->r_old_dentry_drop); + if (req->r_dentry_drop) + len += req->r_dentry->d_name.len; + if (req->r_old_dentry_drop) + len += req->r_old_dentry->d_name.len; + + msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL); if (IS_ERR(msg)) goto out_free2; @@ -1134,11 +1146,30 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, #endif head->args = req->r_args; - head->num_releases = 0; ceph_encode_filepath(&p, end, ino1, path1); ceph_encode_filepath(&p, end, ino2, path2); - BUG_ON(p != end); + /* cap releases */ + releases = 0; + if (req->r_inode_drop) + releases += ceph_encode_inode_release(&p, + req->r_inode ? req->r_inode : req->r_dentry->d_inode, + mds, req->r_inode_drop, req->r_inode_unless); + if (req->r_dentry_drop) + releases += ceph_encode_dentry_release(&p, req->r_dentry, + mds, req->r_dentry_drop, req->r_dentry_unless); + if (req->r_old_dentry_drop) + releases += ceph_encode_dentry_release(&p, req->r_old_dentry, + mds, req->r_old_dentry_drop, req->r_old_dentry_unless); + if (req->r_old_inode_drop) + releases += ceph_encode_inode_release(&p, + req->r_old_dentry->d_inode, + mds, req->r_old_inode_drop, req->r_old_inode_unless); + head->num_releases = cpu_to_le32(releases); + + BUG_ON(p > end); + msg->front.iov_len = p - msg->front.iov_base; + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); out_free2: if (freepath2) diff --git a/src/kernel/mds_client.h b/src/kernel/mds_client.h index e1a936d826bec..3da02d88b4d1d 100644 --- a/src/kernel/mds_client.h +++ b/src/kernel/mds_client.h @@ -167,6 +167,12 @@ struct ceph_mds_request { struct ceph_vino r_ino1, r_ino2; union ceph_mds_request_args r_args; + int r_inode_drop, r_inode_unless; + int r_dentry_drop, r_dentry_unless; + int r_old_dentry_drop, r_old_dentry_unless; + struct inode *r_old_inode; + int r_old_inode_drop, r_old_inode_unless; + struct inode *r_target_inode; struct ceph_msg *r_request; /* original request */ diff --git a/src/kernel/super.h b/src/kernel/super.h index ed67f91ba8df6..3691cb38b76c8 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -790,6 +790,10 @@ static inline void ceph_release_caps(struct inode *inode, int mask) { ceph_check_caps(ceph_inode(inode), 1, mask, NULL); } +extern int ceph_encode_inode_release(void **p, struct inode *inode, + int mds, int drop, int unless); +extern int ceph_encode_dentry_release(void **p, struct dentry *dn, + int mds, int drop, int unless); extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got, loff_t endoff); diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index b31e9efbb7707..bc157429db2bc 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -1588,6 +1588,44 @@ void Locker::handle_client_caps(MClientCaps *m) delete m; } +void Locker::process_cap_update(int client, inodeno_t ino, __u64 cap_id, int caps, int seq, int mseq, + const nstring& dname) +{ + CInode *in = mdcache->get_inode(ino); + if (!in) + return; + Capability *cap = in->get_client_cap(client); + if (cap) { + dout(10) << "process_cap_update client" << client << " " << ccap_string(caps) << " on " << *in << dendl; + + if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) { + dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl; + return; + } + + cap->confirm_receipt(seq, caps); + + eval_cap_gather(in); + if (in->filelock.is_stable()) + file_eval(&in->filelock); + if (in->authlock.is_stable()) + eval(&in->authlock); + } + + if (dname.length()) { + frag_t fg = in->pick_dirfrag(dname); + CDir *dir = in->get_dirfrag(fg); + if (dir) { + CDentry *dn = dir->lookup(dname); + MDSCacheObject *p = dn; + ClientLease *l = p->get_client_lease(client); + if (l) + p->remove_client_lease(l, l->mask, this); + } + } + +} + /* * update inode based on cap flush|flushsnap|wanted. * adjust max_size, if needed. diff --git a/src/mds/Locker.h b/src/mds/Locker.h index 6475e37c37200..49dfee5605a36 100644 --- a/src/mds/Locker.h +++ b/src/mds/Locker.h @@ -152,7 +152,20 @@ protected: public: void mark_updated_scatterlock(ScatterLock *lock); + + // caps + void process_cap_update(int client, inodeno_t ino, __u64 cap_id, int caps, int seq, int mseq, + const nstring& dname); + + protected: + void handle_client_caps(class MClientCaps *m); + bool _do_cap_update(CInode *in, Capability *cap, int had, int wanted, snapid_t follows, MClientCaps *m, + MClientCaps *ack=0); + void handle_client_cap_release(class MClientCapRelease *m); + + // local +public: void local_wrlock_grab(LocalLock *lock, Mutation *mut); protected: bool local_wrlock_start(LocalLock *lock, MDRequest *mut); @@ -184,12 +197,6 @@ public: void resume_stale_caps(Session *session); void remove_stale_leases(Session *session); - protected: - void handle_client_caps(class MClientCaps *m); - bool _do_cap_update(CInode *in, Capability *cap, int had, int wanted, snapid_t follows, MClientCaps *m, - MClientCaps *ack=0); - void handle_client_cap_release(class MClientCapRelease *m); - public: void request_inode_file_caps(CInode *in); protected: diff --git a/src/mds/Server.cc b/src/mds/Server.cc index c90646ccf8d7c..5f2db12aa01bf 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -834,6 +834,17 @@ void Server::handle_client_request(MClientRequest *req) session->trim_completed_requests(req->get_oldest_client_tid()); } + // process embedded cap releases? + if (req->get_source().is_client()) { + int client = req->get_source().num(); + for (vector::iterator p = req->releases.begin(); + p != req->releases.end(); + p++) { + mds->locker->process_cap_update(client, inodeno_t((__u64)p->item.ino), p->item.cap_id, p->item.caps, + p->item.seq, p->item.mseq, p->dname); + } + } + // register + dispatch MDRequest *mdr = mdcache->request_start(req); if (!mdr) return; -- 2.39.5