]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: include cap, dentry lease release in request messages
authorSage Weil <sage@newdream.net>
Thu, 9 Apr 2009 22:28:11 +0000 (15:28 -0700)
committerSage Weil <sage@newdream.net>
Thu, 9 Apr 2009 22:28:11 +0000 (15:28 -0700)
Embed dentry and cap releases inside mds request messages.  This
avoids the overhead of sending additional messages, although it
does limit the release to the mds the request is going to.  That
is normally fine, since updates go to the auth MDS, and that is
usually who we're dealing with.

12 files changed:
src/include/ceph_fs.h
src/kernel/caps.c
src/kernel/dir.c
src/kernel/file.c
src/kernel/inode.c
src/kernel/ioctl.c
src/kernel/mds_client.c
src/kernel/mds_client.h
src/kernel/super.h
src/mds/Locker.cc
src/mds/Locker.h
src/mds/Server.cc

index c0efce17d2eb9f5d438d5760e716a24774d31f66..2aded4ea51dc5b3475dfaecc821ce0f2cc1ffa50 100644 (file)
@@ -808,9 +808,10 @@ struct ceph_mds_request_head {
 } __attribute__ ((packed));
 
 struct ceph_mds_request_release {
-       __le64 ino;
+       __le64 ino, cap_id;
        __le32 caps;
-       __le32 seq;
+       __le32 seq, mseq;
+       __le32 dname_seq;
        __le32 dname_len;   /* if releasing a dentry lease, too.  string follows. */
 } __attribute__ ((packed));
 
index 7b1ffb1945778c7050488b1eecd064133740acb5..cbb47fd25623bf23a379fadd808f88f4659f2989 100644 (file)
@@ -258,9 +258,8 @@ void ceph_reservation_status(int *total, int *avail, int *used, int *reserved)
  *
  * Called with i_lock held.
  */
-static struct ceph_cap *__get_cap_for_mds(struct inode *inode, int mds)
+static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
 {
-       struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_cap *cap;
        struct rb_node *n = ci->i_caps.rb_node;
 
@@ -433,7 +432,7 @@ int ceph_add_cap(struct inode *inode,
 
 retry:
        spin_lock(&inode->i_lock);
-       cap = __get_cap_for_mds(inode, mds);
+       cap = __get_cap_for_mds(ci, mds);
        if (!cap) {
                if (new_cap) {
                        cap = new_cap;
@@ -2093,7 +2092,7 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
 
        /* the rest require a cap */
        spin_lock(&inode->i_lock);
-       cap = __get_cap_for_mds(inode, mds);
+       cap = __get_cap_for_mds(ceph_inode(inode), mds);
        if (!cap) {
                dout(10, "no cap on %p ino %llx.%llx from mds%d, releasing\n",
                     inode, ceph_ino(inode), ceph_snap(inode), mds);
@@ -2192,3 +2191,71 @@ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
                ceph_check_caps(ci, 0, 0, NULL);
 }
 
+/*
+ * Helpers for embedding cap and dentry lease releases into mds
+ * requests.
+ */
+int ceph_encode_inode_release(void **p, struct inode *inode,
+                             int mds, int drop, int unless)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_cap *cap;
+       struct ceph_mds_request_release *rel = *p;
+       int ret = 0;
+
+       dout(10, "encode_inode_release %p mds%d drop %s unless %s\n", inode,
+            mds, ceph_cap_string(drop), ceph_cap_string(unless));
+
+       spin_lock(&inode->i_lock);
+       cap = __get_cap_for_mds(ci, mds);
+       if (cap && __cap_is_valid(cap)) {
+               if ((cap->issued & drop) &&
+                   (cap->issued & unless) == 0) {
+                       dout(10, "encode_inode_release %p cap %p %s -> %s\n",
+                            inode, cap, ceph_cap_string(cap->issued),
+                            ceph_cap_string(cap->issued & ~drop));
+                       cap->issued &= ~drop;
+                       cap->implemented &= ~drop;
+
+                       rel->ino = cpu_to_le64(ceph_ino(inode));
+                       rel->cap_id = cpu_to_le64(cap->cap_id);
+                       rel->seq = cpu_to_le32(cap->seq);
+                       rel->mseq = cpu_to_le32(cap->mseq);
+                       rel->caps = cpu_to_le32(cap->issued);
+                       rel->dname_len = 0;
+                       rel->dname_seq = 0;
+                       *p += sizeof(*rel);
+                       ret = 1;
+               } else {
+                       dout(10, "encode_inode_release %p cap %p %s\n",
+                            inode, cap, ceph_cap_string(cap->issued));
+               }
+       }
+       spin_unlock(&inode->i_lock);
+       return ret;
+}
+
+int ceph_encode_dentry_release(void **p, struct dentry *dentry,
+                              int mds, int drop, int unless)
+{
+       struct inode *dir = dentry->d_parent->d_inode;
+       struct ceph_mds_request_release *rel = *p;
+       struct ceph_dentry_info *di = ceph_dentry(dentry);
+       int ret;
+
+       ret = ceph_encode_inode_release(p, dir, mds, drop, unless);
+
+       /* drop dentry lease too? */
+       spin_lock(&dentry->d_lock);
+       if (ret && di->lease_session && di->lease_session->s_mds == mds) {
+               dout(10, "encode_dentry_release %p mds%d seq %d\n",
+                    dentry, mds, (int)di->lease_seq);
+               rel->dname_len = cpu_to_le32(dentry->d_name.len);
+               memcpy(*p, dentry->d_name.name, dentry->d_name.len);
+               *p += dentry->d_name.len;
+               rel->dname_seq = cpu_to_le32(di->lease_seq);
+       }
+       spin_unlock(&dentry->d_lock);
+
+       return ret;
+}
index 3351479f21a8312ffe659471d89ac28bdcc3a0b3..efa2ef8b5e1277081bd58a460d0ecf2b71f0c017 100644 (file)
@@ -396,7 +396,7 @@ int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
 }
 
 static int ceph_mknod(struct inode *dir, struct dentry *dentry,
-                         int mode, dev_t rdev)
+                     int mode, dev_t rdev)
 {
        struct ceph_client *client = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = &client->mdsc;
@@ -409,7 +409,6 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
        dout(5, "mknod in dir %p dentry %p mode 0%o rdev %d\n",
             dir, dentry, mode, rdev);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
-
        if (IS_ERR(req)) {
                d_drop(dentry);
                return PTR_ERR(req);
@@ -419,8 +418,8 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
        req->r_locked_dir = dir;
        req->r_args.mknod.mode = cpu_to_le32(mode);
        req->r_args.mknod.rdev = cpu_to_le32(rdev);
-       if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL))
-               ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
+       req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+       req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
@@ -469,13 +468,12 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
                d_drop(dentry);
                return PTR_ERR(req);
        }
-
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        req->r_path2 = dest;
        req->r_locked_dir = dir;
-       if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL))
-               ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
+       req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+       req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
@@ -514,9 +512,8 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, int mode)
        req->r_num_caps = 2;
        req->r_locked_dir = dir;
        req->r_args.mkdir.mode = cpu_to_le32(mode);
-
-       if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL))
-               ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
+       req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+       req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
@@ -549,9 +546,8 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
        req->r_num_caps = 2;
        req->r_old_dentry = dget(old_dentry); /* or inode? hrm. */
        req->r_locked_dir = dir;
-
-       if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL))
-               ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
+       req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+       req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (err) {
                d_drop(dentry);
@@ -567,15 +563,15 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
  * than PIN) we don't specifically want (due to the file still being
  * open).
  */
-static void drop_caps_for_unlink(struct inode *inode)
+static int drop_caps_for_unlink(struct inode *inode)
 {
        int drop = CEPH_CAP_LINK_RDCACHE | CEPH_CAP_LINK_EXCL;
 
        spin_lock(&inode->i_lock);
-       if (inode->i_nlink == 1) 
+       if (inode->i_nlink == 1)
                drop |= ~(__ceph_caps_wanted(ceph_inode(inode)) | CEPH_CAP_PIN);
        spin_unlock(&inode->i_lock);
-       ceph_release_caps(inode, drop);
+       return drop;
 }
 
 /*
@@ -610,11 +606,9 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        req->r_locked_dir = dir;
-
-       if (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL))
-               ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
-       ceph_mdsc_lease_release(mdsc, dir, dentry, CEPH_LOCK_DN);
-       drop_caps_for_unlink(inode);
+       req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+       req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+       req->r_inode_drop = drop_caps_for_unlink(inode);
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
                d_delete(dentry);
@@ -645,16 +639,14 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
        req->r_num_caps = 2;
        req->r_old_dentry = dget(old_dentry);
        req->r_locked_dir = new_dir;
-       if (!ceph_caps_issued_mask(ceph_inode(old_dir), CEPH_CAP_FILE_EXCL))
-               ceph_release_caps(old_dir, CEPH_CAP_FILE_RDCACHE);
-       ceph_mdsc_lease_release(mdsc, old_dir, old_dentry, CEPH_LOCK_DN);
-       if (!ceph_caps_issued_mask(ceph_inode(new_dir), CEPH_CAP_FILE_EXCL))
-               ceph_release_caps(new_dir, CEPH_CAP_FILE_RDCACHE);
-       ceph_mdsc_lease_release(mdsc, new_dir, new_dentry, CEPH_LOCK_DN);
+       req->r_old_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+       req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
+       req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+       req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        /* release LINK_RDCACHE on source inode (mds will lock it) */
-       ceph_release_caps(old_dentry->d_inode, CEPH_CAP_LINK_RDCACHE);
+       req->r_old_inode_drop = CEPH_CAP_LINK_RDCACHE;
        if (new_dentry->d_inode)
-               drop_caps_for_unlink(new_dentry->d_inode);
+               req->r_inode_drop = drop_caps_for_unlink(new_dentry->d_inode);
        err = ceph_mdsc_do_request(mdsc, old_dir, req);
        if (!err && !req->r_reply_info.head->is_dentry) {
                /*
index b770149c627860d2fc68e0431705445ddbeeeab9..919d89a8c1b0685793d8e6eb0d369093f6b04b08 100644 (file)
@@ -165,8 +165,6 @@ int ceph_open(struct inode *inode, struct file *file)
        spin_unlock(&inode->i_lock);
 
        dout(10, "open fmode %d wants %s\n", fmode, ceph_cap_string(wanted));
-       if (!ceph_caps_issued_mask(ceph_inode(inode), CEPH_CAP_FILE_EXCL))
-               ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE);
        req = prepare_open_request(inode->i_sb, flags, 0);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
@@ -217,9 +215,10 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry,
                return ERR_PTR(PTR_ERR(req));
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
-       if ((flags & O_CREAT) &&
-           (!ceph_caps_issued_mask(ceph_inode(dir), CEPH_CAP_FILE_EXCL)))
-               ceph_release_caps(dir, CEPH_CAP_FILE_RDCACHE);
+       if (flags & O_CREAT) {
+               req->r_dentry_drop = CEPH_CAP_FILE_RDCACHE;
+               req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+       }
        req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
        err = ceph_mdsc_do_request(mdsc, parent_inode, req);
        dentry = ceph_finish_lookup(req, dentry, err);
index 1c7c49b3c39bd4a2f8f69e4197ec5ce6f341c0b1..50e06508fa9b68e03c07b5857f1dea76406dcc3e 100644 (file)
@@ -1437,9 +1437,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        spin_unlock(&inode->i_lock);
 
        if (mask) {
-               if (release)
-                       ceph_release_caps(inode, release);
                req->r_inode = igrab(inode);
+               req->r_inode_drop = release;
                req->r_args.setattr.mask = mask;
                req->r_num_caps = 1;
                err = ceph_mdsc_do_request(mdsc, parent_inode, req);
@@ -1795,6 +1794,7 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
        if (IS_ERR(req))
                return PTR_ERR(req);
        req->r_inode = igrab(inode);
+       req->r_inode_drop = CEPH_CAP_XATTR_RDCACHE;
        req->r_num_caps = 1;
        req->r_args.setxattr.flags = cpu_to_le32(flags);
 
@@ -1803,7 +1803,6 @@ int ceph_setxattr(struct dentry *dentry, const char *name,
        req->r_request->hdr.data_len = cpu_to_le32(size);
        req->r_request->hdr.data_off = cpu_to_le16(0);
 
-       ceph_release_caps(inode, CEPH_CAP_XATTR_RDCACHE);
        err = ceph_mdsc_do_request(mdsc, parent_inode, req);
        ceph_mdsc_put_request(req);
 
@@ -1840,8 +1839,9 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
        if (IS_ERR(req))
                return PTR_ERR(req);
        req->r_inode = igrab(inode);
+       req->r_inode_drop = CEPH_CAP_XATTR_RDCACHE;
        req->r_num_caps = 1;
-       ceph_release_caps(inode, CEPH_CAP_XATTR_RDCACHE);
+
        err = ceph_mdsc_do_request(mdsc, parent_inode, req);
        ceph_mdsc_put_request(req);
        return err;
index 55e0991c0758224c7cea69595e4f6ee2ab1de57f..0dd70722d06e2924cea2508622e1508e2fe62450 100644 (file)
@@ -43,8 +43,9 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
        if (IS_ERR(req))
                return PTR_ERR(req);
        req->r_inode = igrab(inode);
+       req->r_inode_drop = CEPH_CAP_FILE_RDCACHE | CEPH_CAP_FILE_EXCL;
        req->r_args.setlayout.layout = layout;
-       ceph_release_caps(inode, CEPH_CAP_FILE_RDCACHE);
+
        err = ceph_mdsc_do_request(mdsc, parent_inode, req);
        ceph_mdsc_put_request(req);
        return err;
index c3dcefaa6e14d35d49be724b0fa75091c82bd9a1..64b3b3fc705aa19fbcbd5aaad106195c754117ba 100644 (file)
@@ -895,6 +895,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
 
        if (!req)
                return ERR_PTR(-ENOMEM);
+
        req->r_started = jiffies;
        req->r_resend_mds = -1;
        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1092,8 +1093,9 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
        const char *path2 = req->r_path2;
        u64 ino1, ino2;
        int pathlen1, pathlen2;
-       int pathlen;
+       int len;
        int freepath1, freepath2;
+       u32 releases;
        void *p, *end;
        int ret;
 
@@ -1113,9 +1115,19 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
                goto out_free1;
        }
 
-       pathlen = pathlen1 + pathlen2 + 2*(sizeof(u32) + sizeof(u64));
-       msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, sizeof(*head) + pathlen,
-                          0, 0, NULL);
+       len = sizeof(*head) +
+               pathlen1 + pathlen2 + 2*(sizeof(u32) + sizeof(u64));
+
+       /* calculate (max) length for cap releases */
+       len += sizeof(struct ceph_mds_request_release) *
+               (!!req->r_inode_drop + !!req->r_dentry_drop +
+                !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
+       if (req->r_dentry_drop)
+               len += req->r_dentry->d_name.len;
+       if (req->r_old_dentry_drop)
+               len += req->r_old_dentry->d_name.len;
+
+       msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL);
        if (IS_ERR(msg))
                goto out_free2;
 
@@ -1134,11 +1146,30 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 #endif
        head->args = req->r_args;
 
-       head->num_releases = 0;
        ceph_encode_filepath(&p, end, ino1, path1);
        ceph_encode_filepath(&p, end, ino2, path2);
 
-       BUG_ON(p != end);
+       /* cap releases */
+       releases = 0;
+       if (req->r_inode_drop)
+               releases += ceph_encode_inode_release(&p, 
+                     req->r_inode ? req->r_inode : req->r_dentry->d_inode,
+                     mds, req->r_inode_drop, req->r_inode_unless);
+       if (req->r_dentry_drop)
+               releases += ceph_encode_dentry_release(&p, req->r_dentry,
+                      mds, req->r_dentry_drop, req->r_dentry_unless);
+       if (req->r_old_dentry_drop)
+               releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
+                      mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
+       if (req->r_old_inode_drop)
+               releases += ceph_encode_inode_release(&p,
+                     req->r_old_dentry->d_inode,
+                     mds, req->r_old_inode_drop, req->r_old_inode_unless);
+       head->num_releases = cpu_to_le32(releases);
+
+       BUG_ON(p > end);
+       msg->front.iov_len = p - msg->front.iov_base;
+       msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 
 out_free2:
        if (freepath2)
index e1a936d826bec16c4981e727a7b9ba0350a99d4c..3da02d88b4d1dc1809a2cff696078cc45d661752 100644 (file)
@@ -167,6 +167,12 @@ struct ceph_mds_request {
        struct ceph_vino r_ino1, r_ino2;
        union ceph_mds_request_args r_args;
 
+       int r_inode_drop, r_inode_unless;
+       int r_dentry_drop, r_dentry_unless;
+       int r_old_dentry_drop, r_old_dentry_unless;
+       struct inode *r_old_inode;
+       int r_old_inode_drop, r_old_inode_unless;
+       
        struct inode *r_target_inode;
 
        struct ceph_msg  *r_request;  /* original request */
index ed67f91ba8df6022fc0137b0ab8541d989a55de3..3691cb38b76c8c6b6d0ab5081c57fa2a225ab2b1 100644 (file)
@@ -790,6 +790,10 @@ static inline void ceph_release_caps(struct inode *inode, int mask)
 {
        ceph_check_caps(ceph_inode(inode), 1, mask, NULL);
 }
+extern int ceph_encode_inode_release(void **p, struct inode *inode,
+                                    int mds, int drop, int unless);
+extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
+                                     int mds, int drop, int unless);
 
 extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got,
                      loff_t endoff);
index b31e9efbb7707d10d27164044c5a114723ffb14e..bc157429db2bc9c297de67d8ef51f6701c0da539 100644 (file)
@@ -1588,6 +1588,44 @@ void Locker::handle_client_caps(MClientCaps *m)
   delete m;
 }
 
+void Locker::process_cap_update(int client, inodeno_t ino, __u64 cap_id, int caps, int seq, int mseq,
+                               const nstring& dname)
+{
+  CInode *in = mdcache->get_inode(ino);
+  if (!in)
+    return;
+  Capability *cap = in->get_client_cap(client);
+  if (cap) {
+    dout(10) << "process_cap_update client" << client << " " << ccap_string(caps) << " on " << *in << dendl;
+    
+    if (ceph_seq_cmp(mseq, cap->get_mseq()) < 0) {
+      dout(7) << " mseq " << mseq << " < " << cap->get_mseq() << ", dropping" << dendl;
+      return;
+    }
+    
+    cap->confirm_receipt(seq, caps);
+    
+    eval_cap_gather(in);
+    if (in->filelock.is_stable())
+      file_eval(&in->filelock);
+    if (in->authlock.is_stable())
+      eval(&in->authlock);
+  }
+  
+  if (dname.length()) {
+    frag_t fg = in->pick_dirfrag(dname);
+    CDir *dir = in->get_dirfrag(fg);
+    if (dir) {
+      CDentry *dn = dir->lookup(dname);
+      MDSCacheObject *p = dn;
+      ClientLease *l = p->get_client_lease(client);
+      if (l)
+       p->remove_client_lease(l, l->mask, this);
+    }
+  }
+
+}
+
 /*
  * update inode based on cap flush|flushsnap|wanted.
  *  adjust max_size, if needed.
index 6475e37c372003d06667e1ff4aa6f2afe23d0232..49dfee5605a3645699537dea7a6bf954ce1e43fd 100644 (file)
@@ -152,7 +152,20 @@ protected:
 public:
   void mark_updated_scatterlock(ScatterLock *lock);
 
+
+  // caps
+  void process_cap_update(int client, inodeno_t ino, __u64 cap_id, int caps, int seq, int mseq,
+                         const nstring& dname);
+
+ protected:
+  void handle_client_caps(class MClientCaps *m);
+  bool _do_cap_update(CInode *in, Capability *cap, int had, int wanted, snapid_t follows, MClientCaps *m,
+                     MClientCaps *ack=0);
+  void handle_client_cap_release(class MClientCapRelease *m);
+
+
   // local
+public:
   void local_wrlock_grab(LocalLock *lock, Mutation *mut);
 protected:
   bool local_wrlock_start(LocalLock *lock, MDRequest *mut);
@@ -184,12 +197,6 @@ public:
   void resume_stale_caps(Session *session);
   void remove_stale_leases(Session *session);
 
- protected:
-  void handle_client_caps(class MClientCaps *m);
-  bool _do_cap_update(CInode *in, Capability *cap, int had, int wanted, snapid_t follows, MClientCaps *m,
-                     MClientCaps *ack=0);
-  void handle_client_cap_release(class MClientCapRelease *m);
-
 public:
   void request_inode_file_caps(CInode *in);
 protected:
index c90646ccf8d7c2c6b3763fc2072cafd295818cc9..5f2db12aa01bf0fdb1849e53a9fac63a8f17382d 100644 (file)
@@ -834,6 +834,17 @@ void Server::handle_client_request(MClientRequest *req)
     session->trim_completed_requests(req->get_oldest_client_tid());
   }
 
+  // process embedded cap releases?
+  if (req->get_source().is_client()) {
+    int client = req->get_source().num();
+    for (vector<MClientRequest::Release>::iterator p = req->releases.begin();
+        p != req->releases.end();
+        p++) {
+      mds->locker->process_cap_update(client, inodeno_t((__u64)p->item.ino), p->item.cap_id, p->item.caps,
+                                     p->item.seq, p->item.mseq, p->dname);
+    }
+  }
+
   // register + dispatch
   MDRequest *mdr = mdcache->request_start(req);
   if (!mdr) return;