git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
make kernel client request redirection work
author Sage Weil <sage@newdream.net>
Mon, 12 May 2008 21:51:21 +0000 (14:51 -0700)
committer Sage Weil <sage@newdream.net>
Mon, 12 May 2008 21:51:21 +0000 (14:51 -0700)
13 files changed:
src/TODO
src/client/Client.cc
src/kernel/dir.c
src/kernel/export.c
src/kernel/file.c
src/kernel/inode.c
src/kernel/mds_client.c
src/kernel/mds_client.h
src/kernel/super.c
src/kernel/super.h
src/mds/MDS.cc
src/messages/MClientRequestForward.h
src/msg/SimpleMessenger.cc

index e17d35414814eebd195af8721634cacdac942401..a3c8d024bdabaa08c385c732cda4a9033b280f06 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -68,6 +68,7 @@ mon
 
 mds mustfix
 - replay of dir fragmentation  (dont want dir frozen, pins, etc.?)
+- fix rm -r vs mds exports
 - rename slave in-memory rollback on failure
 - proper handling of cache expire messages during rejoin phase?
   -> i think cache expires are fine; the rejoin_ack handler just has to behave if rejoining items go missing
index e2ca91029b0046e9a69dad8bc77a17e21e54f85d..4cee7310b05dd9c87b073be7597a972a9137372b 100644 (file)
@@ -942,7 +942,7 @@ void Client::handle_client_request_forward(MClientRequestForward *fwd)
   // reset retry counter
   request->retry_attempt = 0;
 
-  if (request->idempotent && 
+  if (!fwd->must_resend() && 
       mds_sessions.count(fwd->get_dest_mds())) {
     // dest mds has a session, and request was forwarded for us.
 
index 0497ed62e43d960a5cddbedf7be6ab8f9fa37d5a..8fc1d7aa3ab80f178ebff660c53c2229af503015 100644 (file)
@@ -113,16 +113,18 @@ nextfrag:
                struct ceph_mds_request *req;
                struct ceph_mds_request_head *rhead;
 
-               frag = ceph_choose_frag(ceph_inode(inode), frag);
+               frag = ceph_choose_frag(ceph_inode(inode), frag, 0);
 
                /* query mds */
                dout(10, "dir_readdir querying mds for ino %llx frag %x\n",
                     ceph_ino(inode), frag);
                req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_READDIR,
                                               ceph_ino(inode), "", 0, 0,
-                                              filp->f_dentry, 1, frag);
+                                              filp->f_dentry, USE_AUTH_MDS);
                if (IS_ERR(req))
                        return PTR_ERR(req);
+               req->r_direct_hash = frag_value(frag);
+               req->r_direct_is_hash = true;
                rhead = req->r_request->front.iov_base;
                rhead->args.readdir.frag = cpu_to_le32(frag);
                err = ceph_mdsc_do_request(mdsc, req);
@@ -252,7 +254,7 @@ struct dentry *ceph_do_lookup(struct super_block *sb, struct dentry *dentry,
                req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LSTAT,
                                               ceph_ino(dentry->d_inode), 0,
                                               0, 0,
-                                              dentry, 0, -1);
+                                              dentry, USE_CAP_MDS);
        } else {
                /* build path */
                path = ceph_build_dentry_path(dentry, &pathlen);
@@ -261,7 +263,7 @@ struct dentry *ceph_do_lookup(struct super_block *sb, struct dentry *dentry,
                req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LSTAT,
                                               ceph_ino(sb->s_root->d_inode),
                                               path, 0, 0,
-                                              dentry, 0, -1);
+                                              dentry, USE_ANY_MDS);
                kfree(path);
        }
        if (IS_ERR(req))
@@ -334,7 +336,7 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD,
                                       ceph_ino(dir->i_sb->s_root->d_inode),
                                       path, 0, 0,
-                                      dentry, 1, -1);
+                                      dentry, USE_AUTH_MDS);
        kfree(path);
        if (IS_ERR(req)) {
                d_drop(dentry);
@@ -385,7 +387,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK,
                                       ceph_ino(dir->i_sb->s_root->d_inode),
                                       path, 0, dest,
-                                      dentry, 1, -1);
+                                      dentry, USE_AUTH_MDS);
        kfree(path);
        if (IS_ERR(req)) {
                d_drop(dentry);
@@ -416,7 +418,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, int mode)
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKDIR,
                                       ceph_ino(dir->i_sb->s_root->d_inode),
                                       path, 0, 0,
-                                      dentry, 1, -1);
+                                      dentry, USE_AUTH_MDS);
        kfree(path);
        if (IS_ERR(req)) {
                d_drop(dentry);
@@ -457,7 +459,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
                                       path,
                                       ceph_ino(dir->i_sb->s_root->d_inode),
                                       oldpath,
-                                      dentry, 1, -1);
+                                      dentry, USE_AUTH_MDS);
        kfree(oldpath);
        kfree(path);
        if (IS_ERR(req)) {
@@ -501,7 +503,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
        req = ceph_mdsc_create_request(mdsc, op,
                                       ceph_ino(dir->i_sb->s_root->d_inode),
                                       path, 0, 0,
-                                      dentry, 1, -1);
+                                      dentry, USE_AUTH_MDS);
        kfree(path);
        if (IS_ERR(req))
                return PTR_ERR(req);
@@ -541,7 +543,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RENAME,
                                       ceph_ino(root->d_inode), oldpath,
                                       ceph_ino(root->d_inode), newpath,
-                                      new_dentry, 1, -1);
+                                      new_dentry, USE_AUTH_MDS);
        kfree(oldpath);
        kfree(newpath);
        if (IS_ERR(req))
index bb0f5cc0bac35a9d0f95ec92383d104084c8b6b1..d14afc18c6c9b30b1d8a45b834bd5911515d0d5f 100644 (file)
@@ -70,7 +70,7 @@ struct dentry *__fh_to_dentry(struct super_block *sb,
                req = ceph_mdsc_create_request(mdsc,
                                               CEPH_MDS_OP_FINDINODE,
                                               len, (char *)fh, 0, 0,
-                                              NULL, 0, -1);
+                                              NULL, USE_ANY_MDS);
                if (IS_ERR(req))
                        return ERR_PTR(PTR_ERR(req));
                err = ceph_mdsc_do_request(mdsc, req);
index b1522058197ce8f45ab5f51aa54ef7809a07304c..bac8d29c8947987d0bfcc620bcd494a77251bf20 100644 (file)
@@ -26,10 +26,10 @@ prepare_open_request(struct super_block *sb, struct dentry *dentry,
        int pathlen;
        struct ceph_mds_request *req;
        struct ceph_mds_request_head *rhead;
-       int want_auth = 0;
+       int want_auth = USE_ANY_MDS;
 
        if (flags & (O_WRONLY|O_RDWR|O_CREAT|O_TRUNC))
-               want_auth = 1;
+               want_auth = USE_AUTH_MDS;
 
        dout(5, "prepare_open_request dentry %p name '%s' flags %d\n", dentry,
             dentry->d_name.name, flags);
@@ -39,7 +39,7 @@ prepare_open_request(struct super_block *sb, struct dentry *dentry,
                return ERR_PTR(PTR_ERR(path));
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_OPEN, pathbase, path,
                                       0, 0,
-                                      dentry, want_auth, -1);
+                                      dentry, want_auth);
        req->r_expects_cap = 1;
        req->r_fmode = ceph_flags_to_mode(flags);
        kfree(path);
index f2fbd97da117e4f8be6f6aa10a5ec56425ef2570..cd72691be26916624bd8133539b9dd2304c03837 100644 (file)
@@ -99,7 +99,8 @@ struct ceph_inode_frag *ceph_get_frag(struct ceph_inode_info *ci, u32 f)
        return frag;
 }
 
-__u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v)
+__u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+                      struct ceph_inode_frag **pfrag)
 {
        u32 t = frag_make(0, 0);
        struct ceph_inode_frag *frag;
@@ -110,8 +111,13 @@ __u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v)
        while (1) {
                WARN_ON(!frag_contains_value(t, v));
                frag = ceph_find_frag(ci, t);
-               if (!frag || frag->split_by == 0)
+               if (!frag)
                        break; /* t is a leaf */
+               if (frag->split_by == 0) {
+                       if (pfrag)
+                               *pfrag = frag;
+                       break;
+               }
 
                /* choose child */
                nway = 1 << frag->split_by;
@@ -133,6 +139,11 @@ __u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v)
        return t;
 }
 
+/*
+ * process dirfrag (delegation) info.  include leaf fragment in tree
+ * ONLY if mds >= 0 || ndist > 0.  (otherwise, only branches/splits
+ * are included in i_fragtree)
+ */
 static int ceph_fill_dirfrag(struct inode *inode,
                             struct ceph_mds_reply_dirfrag *dirinfo)
 {
@@ -150,13 +161,13 @@ static int ceph_fill_dirfrag(struct inode *inode,
                        return 0;
                if (frag->split_by == 0) {
                        /* tree leaf, remove */
-                       dout(20, "removed %llx frag %x (no referral)\n",
+                       dout(20, "fill_dirfrag removed %llx frag %x (no ref)\n",
                             ceph_ino(inode), id);
                        rb_erase(&frag->node, &ci->i_fragtree);
                        kfree(frag);
                } else {
                        /* tree branch, keep */
-                       dout(20, "cleared %llx frag %x referral\n",
+                       dout(20, "fill_dirfrag cleared %llx frag %x referral\n",
                             ceph_ino(inode), id);
                        frag->mds = -1;
                        frag->ndist = 0;
@@ -168,7 +179,7 @@ static int ceph_fill_dirfrag(struct inode *inode,
        /* find/add this frag to store mds delegation info */
        frag = ceph_get_frag(ci, id);
        if (!frag) {
-               derr(0, "ENOMEM on mds referral ino %llx frag %x\n",
+               derr(0, "fill_dirfrag ENOMEM on mds ref ino %llx frag %x\n",
                     ceph_ino(inode), le32_to_cpu(dirinfo->frag));
                return -ENOMEM;
        } else {
@@ -176,7 +187,7 @@ static int ceph_fill_dirfrag(struct inode *inode,
                frag->ndist = min_t(u32, ndist, MAX_DIRFRAG_REP);
                for (i = 0; i < frag->ndist; i++)
                        frag->dist[i] = le32_to_cpu(dirinfo->dist[i]);
-               dout(20, "set %llx frag %x referral mds %d ndist=%d\n",
+               dout(20, "fill_dirfrag %llx frag %x referral mds %d ndist=%d\n",
                     ceph_ino(inode), frag->frag, frag->mds, frag->ndist);
        }
        return 0;
@@ -810,6 +821,22 @@ static struct ceph_inode_cap *__get_cap_for_mds(struct inode *inode, int mds)
        return 0;
 }
 
+/* mds of the first cap on inode's i_caps list, or -1 if the list is empty */
+int ceph_get_cap_mds(struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       int mds;
+
+       spin_lock(&inode->i_lock);
+       if (list_empty(&ci->i_caps))
+               mds = -1;
+       else
+               mds = list_first_entry(&ci->i_caps, struct ceph_inode_cap,
+                                      ci_caps)->mds;
+       spin_unlock(&inode->i_lock);
+       return mds;
+}
+
 /*
  * caller should hold session s_mutex.
  */
@@ -1494,14 +1521,14 @@ static struct ceph_mds_request *prepare_setattr(struct ceph_mds_client *mdsc,
                req = ceph_mdsc_create_request(mdsc, op,
                                               ceph_ino(dentry->d_inode), "",
                                               0, 0,
-                                              dentry, 1, -1);
+                                              dentry, USE_CAP_MDS);
        } else {
                dout(5, "prepare_setattr dentry %p (full path)\n", dentry);
                path = ceph_build_dentry_path(dentry, &pathlen);
                if (IS_ERR(path))
                        return ERR_PTR(PTR_ERR(path));
                req = ceph_mdsc_create_request(mdsc, op, baseino, path, 0, 0,
-                                              dentry, 1, -1);
+                                              dentry, USE_ANY_MDS);
                kfree(path);
        }
        return req;
index 17772f5b5834fd9bcce4ff6edef1e6da50f96078..77055bb4aa1b9bca5a7bb26c72b453071919fcbd 100644 (file)
@@ -417,8 +417,9 @@ static struct ceph_mds_request *new_request(struct ceph_msg *msg)
        req->r_request = msg;
        req->r_reply = 0;
        req->r_direct_dentry = 0;
-       req->r_direct_auth = 1;
-       req->r_direct_frag = -1;
+       req->r_direct_mode = USE_ANY_MDS;
+       req->r_direct_hash = 0;
+       req->r_direct_is_hash = false;
        req->r_last_inode = 0;
        req->r_last_dentry = 0;
        req->r_old_dentry = 0;
@@ -458,6 +459,13 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
        ceph_mdsc_put_request(req);
 }
 
+/* Is there a session slot for mds?  Bounds both sides: callers may pass
+ * a wire-decoded u32 that turned negative on conversion to int. */
+static bool have_session(struct ceph_mds_client *mdsc, int mds)
+{
+       return mds >= 0 && mds < mdsc->max_sessions && mdsc->sessions[mds];
+}
+
 
 /*
  * choose mds to send request to next
@@ -465,16 +473,83 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
 static int choose_mds(struct ceph_mds_client *mdsc,
                      struct ceph_mds_request *req)
 {
-       int mds;
+       int mds = -1;
+       u32 hash = req->r_direct_hash;
+       bool is_hash = req->r_direct_is_hash;
+       struct dentry *dentry = req->r_direct_dentry;
+       struct ceph_inode_info *ci;
+       struct ceph_inode_frag *frag = 0;
+       int mode = req->r_direct_mode;
 
        /* is there a specific mds we should try? */
        if (req->r_resend_mds >= 0 &&
-           ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0) {
-               dout(20, "using resend_mds mds%d\n", req->r_resend_mds);
+           (!have_session(mdsc, req->r_resend_mds) ||
+            ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
+               dout(20, "choose_mds using resend_mds mds%d\n",
+                    req->r_resend_mds);
                return req->r_resend_mds;
        }
 
-       /* pick one at random */
+       if (mode == USE_CAP_MDS) {
+               mds = ceph_get_cap_mds(dentry->d_inode);
+               if (mds >= 0) {
+                       dout(20, "choose_mds %p %llx mds%d (cap)\n", 
+                            dentry->d_inode, ceph_ino(dentry->d_inode), mds);
+                       return mds;
+               }
+               derr(0, "choose_mds %p %llx has NO CAPS, using auth\n",
+                    dentry->d_inode, ceph_ino(dentry->d_inode));
+               WARN_ON(1);
+               mode = USE_AUTH_MDS;
+       }
+
+       if (mode == USE_RANDOM_MDS)
+               goto random;
+       
+       while (dentry) {
+               if (is_hash &&
+                   dentry->d_inode &&
+                   S_ISDIR(dentry->d_inode->i_mode)) {
+                       ci = ceph_inode(dentry->d_inode);
+                       ceph_choose_frag(ci, hash, &frag);
+                       if (frag) {
+                               /* avoid hitting dir replicas on dir
+                                * auth delegation point.. mds will
+                                * likely forward anyway to avoid
+                                * twiddling scatterlock */
+                               if (mode == USE_ANY_MDS && frag->ndist > 0 &&
+                                   dentry != req->r_direct_dentry) {
+                                       u8 r;
+                                       get_random_bytes(&r, 1);
+                                       r %= frag->ndist;
+                                       mds = frag->dist[r];
+                                       dout(20, "choose_mds %p %llx frag %u "
+                                            "mds%d (%d/%d)\n", dentry->d_inode,
+                                            ceph_ino(&ci->vfs_inode),
+                                            frag->frag, frag->mds,
+                                            (int)r, frag->ndist);
+                                       return mds;
+                               }
+                               mode = USE_AUTH_MDS;
+                               if (frag->mds >= 0) {
+                                       mds = frag->mds;
+                                       dout(20, "choose_mds %p %llx frag %u "
+                                            "mds%d (auth)\n", dentry->d_inode,
+                                            ceph_ino(&ci->vfs_inode),
+                                            frag->frag, mds);
+                                       return mds;
+                               }
+                       }
+               }
+               if (IS_ROOT(dentry))
+                       break;
+               hash = dentry->d_name.hash;
+               is_hash = true;
+               dentry = dentry->d_parent;
+       }
+
+       /* ok, just pick one at random */
+random:
        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
        dout(20, "choose_mds chose random mds%d\n", mds);
        return mds;
@@ -869,7 +944,7 @@ struct ceph_mds_request *
 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op,
                         ceph_ino_t ino1, const char *path1,
                         ceph_ino_t ino2, const char *path2,
-                        struct dentry *ref, int want_auth, int want_frag)
+                        struct dentry *ref, int mode)
 {
        struct ceph_msg *msg;
        struct ceph_mds_request *req;
@@ -905,8 +980,8 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op,
        if (ref)
                dget(ref);
        req->r_direct_dentry = ref;
-       req->r_direct_auth = want_auth;
-       req->r_direct_frag = want_frag;
+       req->r_direct_mode = mode;
+       req->r_direct_hash = -1;
 
        /* encode head */
        head->client_inst = mdsc->client->msgr->inst;
@@ -1148,6 +1223,7 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
        __u64 tid;
        __u32 next_mds;
        __u32 fwd_seq;
+       __u8 must_resend;
        int err = -EINVAL;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
@@ -1158,6 +1234,7 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
        ceph_decode_64(&p, tid);
        ceph_decode_32(&p, next_mds);
        ceph_decode_32(&p, fwd_seq);
+       ceph_decode_8(&p, must_resend);
 
        /* handle */
        req = find_request_and_lock(mdsc, tid);
@@ -1165,25 +1242,25 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
                return;  /* dup reply? */
 
        /* do we have a session with the dest mds? */
-       if (next_mds < mdsc->max_sessions &&
-           mdsc->sessions[next_mds] &&
-           mdsc->sessions[next_mds]->s_state == CEPH_MDS_SESSION_OPEN) {
-               /* yes.  adjust mds set */
-               if (fwd_seq > req->r_num_fwd) {
-                       dout(10, "forward %llu to mds%d\n", tid, next_mds);
-                       req->r_num_fwd = fwd_seq;
-                       req->r_resend_mds = next_mds;
-                       put_request_sessions(req);
-                       req->r_session = __get_session(mdsc, next_mds);
-                       req->r_fwd_session = __get_session(mdsc, from_mds);
-               } else
-                       dout(10, "forward %llu to mds%d - old seq %d <= %d\n",
-                            tid, next_mds, req->r_num_fwd, fwd_seq);
+       /* yes.  adjust mds set, but mds will do the forward. */
+       if (fwd_seq <= req->r_num_fwd) {
+               dout(10, "forward %llu to mds%d - old seq %d <= %d\n",
+                    tid, next_mds, req->r_num_fwd, fwd_seq);
+               spin_unlock(&mdsc->lock);
+       } else if (!must_resend && 
+                  have_session(mdsc, next_mds) &&
+                  mdsc->sessions[next_mds]->s_state == CEPH_MDS_SESSION_OPEN) {
+               dout(10, "forward %llu to mds%d (mds fwded)\n", tid, next_mds);
+               req->r_num_fwd = fwd_seq;
+               req->r_resend_mds = next_mds;
+               put_request_sessions(req);
+               req->r_session = __get_session(mdsc, next_mds);
+               req->r_fwd_session = __get_session(mdsc, from_mds);
                spin_unlock(&mdsc->lock);
        } else {
                /* no, resend. */
                /* forward race not possible; mds would drop */
-               dout(10, "forward %llu to mds%d (no session)\n", tid, next_mds);
+               dout(10, "forward %llu to mds%d (we resend)\n", tid, next_mds);
                BUG_ON(fwd_seq <= req->r_num_fwd);
                put_request_sessions(req);
                req->r_resend_mds = next_mds;
index f7aa6c7dec640aaa70288b03eb74d3947c758fbd..ff15ee9cadbf8df48db4ed3512726a0910dedfff 100644 (file)
@@ -70,6 +70,13 @@ struct ceph_mds_session {
 /*
  * an in-flight request
  */
+enum {                 /* r_direct_mode: how choose_mds() targets a request */
+       USE_CAP_MDS,    /* mds of a cap on the dentry's inode, else as AUTH */
+       USE_ANY_MDS,    /* a random frag->dist[] entry of an ancestor is ok */
+       USE_AUTH_MDS,   /* only frag->mds of a matching dirfrag, else random */
+       USE_RANDOM_MDS  /* no dentry walk: ceph_mdsmap_get_random_mds() */
+};
+
 struct ceph_mds_request {
        __u64             r_tid;
        struct ceph_msg  *r_request;  /* original request */
@@ -78,8 +85,9 @@ struct ceph_mds_request {
 
        /* to direct request */
        struct dentry *r_direct_dentry;
-       int r_direct_auth;
-       int r_direct_frag;
+       int r_direct_mode;
+       u32 r_direct_hash;
+       bool r_direct_is_hash;
 
        struct inode     *r_last_inode;
        struct dentry    *r_last_dentry;
@@ -146,7 +154,7 @@ extern struct ceph_mds_request *
 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op,
                         ceph_ino_t ino1, const char *path1,
                         ceph_ino_t ino2, const char *path2,
-                        struct dentry *ref, int want_auth, int want_frag);
+                        struct dentry *ref, int want_auth);
 extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
                                struct ceph_mds_request *req);
 extern void ceph_mdsc_put_request(struct ceph_mds_request *req);
index be01057dc166debb930ffb8cfcba0a09c25a12b1..38636b22dbac09d2e801c3979b44d5453a9e7deb 100644 (file)
@@ -640,7 +640,7 @@ static struct dentry *open_root_dentry(struct ceph_client *client,
        dout(30, "open_root_inode opening '%s'\n", args->path);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_OPEN,
                                       1, args->path, 0, 0,
-                                      NULL, 1, -1);
+                                      NULL, USE_ANY_MDS);
        if (IS_ERR(req))
                return ERR_PTR(PTR_ERR(req));
        req->r_expects_cap = 1;
index c06ea22ea13a65253ebfd657ac928616fcdd8652..89046320dacf7e0256649b4a88d54bc00d12c86a 100644 (file)
@@ -156,6 +156,10 @@ struct ceph_inode_cap {
 
 #define MAX_DIRFRAG_REP 4
 
+/*
+ * a _leaf_ frag will be present in the i_fragtree IFF there is
+ * delegation info.  that is, if mds >= 0 || ndist > 0.
+ */
 struct ceph_inode_frag {
        struct rb_node node;
 
@@ -238,7 +242,8 @@ static inline struct ceph_inode_frag *ceph_find_frag(struct ceph_inode_info *ci,
        return NULL;
 }
 
-extern __u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v);
+extern __u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+                             struct ceph_inode_frag **pfrag);
 
 struct ceph_dentry_info {
        struct dentry *dentry;
@@ -418,6 +423,7 @@ extern struct ceph_inode_cap *ceph_add_cap(struct inode *inode,
 extern void __ceph_remove_cap(struct ceph_inode_cap *cap);
 extern void ceph_remove_cap(struct ceph_inode_cap *cap);
 extern void ceph_remove_all_caps(struct ceph_inode_info *ci);
+extern int ceph_get_cap_mds(struct inode *inode);
 extern int ceph_handle_cap_grant(struct inode *inode,
                                 struct ceph_mds_file_caps *grant,
                                 struct ceph_mds_session *session);
index 4a5b7c3af5769c967d7a3eef27353bb71e79edc5..7091fdf467156a18698997e49fd1430fec577ad8 100644 (file)
@@ -254,16 +254,20 @@ void MDS::forward_message_mds(Message *req, int mds)
     MClientRequest *creq = (MClientRequest*)req;
     creq->inc_num_fwd();    // inc forward counter
 
+    /*
+     * don't actually forward if non-idempotent!
+     * client has to do it.  although the MDS will ignore duplicate requests,
+     * the affected metadata may migrate, in which case the new authority
+     * won't have the metareq_id in the completed request map.
+     */
+    bool client_must_resend = !creq->is_idempotent();
+
     // tell the client where it should go
-    messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd()),
+    messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd(),
+                                                     client_must_resend),
                            creq->get_client_inst());
     
-    if (!creq->is_idempotent()) {
-      /* don't actually forward if non-idempotent!
-       * client has to do it.  although the MDS will ignore duplicate requests,
-       * the affected metadata may migrate, in which case the new authority
-       * won't have the metareq_id in the completed request map.
-       */
+    if (client_must_resend) {
       delete req;
       return; 
     }
index 5bc257d42b79ea8a6353f098e5abf64b94481849..761a2739dcc4c6388195134a9d527e6a87310d8e 100644 (file)
@@ -20,22 +20,25 @@ class MClientRequestForward : public Message {
   tid_t tid;
   int32_t dest_mds;
   int32_t num_fwd;
+  bool client_must_resend;
 
  public:
   MClientRequestForward() : Message(CEPH_MSG_CLIENT_REQUEST_FORWARD) {}
-  MClientRequestForward(tid_t t, int dm, int nf) : 
+  MClientRequestForward(tid_t t, int dm, int nf, bool cmr) : 
     Message(CEPH_MSG_CLIENT_REQUEST_FORWARD),
-    tid(t), dest_mds(dm), num_fwd(nf) { }
+    tid(t), dest_mds(dm), num_fwd(nf), client_must_resend(cmr) { }
 
   tid_t get_tid() { return tid; }
   int get_dest_mds() { return dest_mds; }
   int get_num_fwd() { return num_fwd; }
+  bool must_resend() { return client_must_resend; }
 
   const char *get_type_name() { return "cfwd"; }
   void print(ostream& o) {
     o << "client_request_forward(" << tid
       << " to " << dest_mds
       << " num_fwd=" << num_fwd
+      << (client_must_resend ? " client_must_resend":"")
       << ")";
   }
 
@@ -43,6 +46,7 @@ class MClientRequestForward : public Message {
     ::encode(tid, payload);
     ::encode(dest_mds, payload);
     ::encode(num_fwd, payload);
+    ::encode(client_must_resend, payload);
   }
 
   void decode_payload() {
@@ -50,6 +54,7 @@ class MClientRequestForward : public Message {
     ::decode(tid, p);
     ::decode(dest_mds, p);
     ::decode(num_fwd, p);
+    ::decode(client_must_resend, p);
   }
 };
 
index 72ea9c2562eb06611b71e577881a91e0dd45d2bc..8dd93626479cc4f3219771ed1130ae78a0ebb239 100644 (file)
@@ -1360,7 +1360,7 @@ void Rank::Pipe::reader()
          // first message?
          if (rank.need_addr) {
            entity->_myinst.addr = rank.rank_addr = m->get_dest_inst().addr;
-           dout(0) << "reader my rank addr is " << rank.rank_addr << dendl;
+           dout(2) << "reader my rank addr is " << rank.rank_addr << dendl;
            rank.need_addr = false;
          }