]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
client: send release anytime we get a caps message and have no inode or cap
authorSage Weil <sage@newdream.net>
Tue, 6 Jan 2009 22:40:06 +0000 (14:40 -0800)
committerSage Weil <sage@newdream.net>
Tue, 6 Jan 2009 22:40:06 +0000 (14:40 -0800)
The exceptions are IMPORT, EXPORT, and FLUSHSNAP_ACK.

src/client/Client.cc
src/kernel/caps.c

index c40f3f0c7663a62020109c0e3a3c2dd9207015e4..28a3638d6056ef36950ce3b7bb8924873eab45bd 100644 (file)
@@ -2024,23 +2024,31 @@ void Client::handle_caps(MClientCaps *m)
   if (!in) {
     dout(5) << "handle_caps don't have vino " << vino << dendl;
 
-    if (m->get_op() == CEPH_CAP_OP_IMPORT) {
-      // release.
-      m->set_op(CEPH_CAP_OP_RELEASE);
-      m->head.caps = 0;
-      m->head.dirty = 0;
-      messenger->send_message(m, m->get_source_inst());
-    } else
-      delete m;
+    // release.
+    m->set_op(CEPH_CAP_OP_RELEASE);
+    m->head.caps = 0;
+    m->head.dirty = 0;
+    messenger->send_message(m, m->get_source_inst());
     return;
   }
 
   switch (m->get_op()) {
   case CEPH_CAP_OP_IMPORT: return handle_cap_import(in, m);
   case CEPH_CAP_OP_EXPORT: return handle_cap_export(in, m);
+  case CEPH_CAP_OP_FLUSHSNAP_ACK: return handle_cap_flushsnap_ack(in, m);
+  }
+
+  if (in->caps.count(mds) == 0) {
+    m->set_op(CEPH_CAP_OP_RELEASE);
+    m->head.caps = 0;
+    m->head.dirty = 0;
+    messenger->send_message(m, m->get_source_inst());
+    return;
+  }
+
+  switch (m->get_op()) {
   case CEPH_CAP_OP_TRUNC: return handle_cap_trunc(in, m);
   case CEPH_CAP_OP_GRANT: return handle_cap_grant(in, m);
-  case CEPH_CAP_OP_FLUSHSNAP_ACK: return handle_cap_flushsnap_ack(in, m);
   case CEPH_CAP_OP_FLUSH_ACK: return handle_cap_flush_ack(in, m);
   default:
     delete m;
@@ -2179,11 +2187,7 @@ void Client::handle_cap_flushsnap_ack(Inode *in, MClientCaps *m)
 void Client::handle_cap_grant(Inode *in, MClientCaps *m)
 {
   int mds = m->get_source().num();
-  if (in->caps.count(mds) == 0) {
-    dout(5) << "handle_cap_grant on ino " << m->get_ino() << " no cap for mds" << mds << dendl;
-    delete m;
-    return;
-  }
+  assert(in->caps.count(mds) == 0);
   InodeCap *cap = in->caps[mds];
   cap->seq = m->get_seq();
   in->inode.layout = m->get_layout();
index fcdde37553db8d7b628c1381e32c7fb69a22da89..5567a576cfffc06477156f9f624c9c25884e27fb 100644 (file)
@@ -1058,9 +1058,11 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
  *  1 - send the msg back to mds
  */
 static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
-                           struct ceph_mds_session *session)
+                           struct ceph_mds_session *session,
+                           struct ceph_cap *cap)
+       __releases(inode->i_lock)
+
 {
-       struct ceph_cap *cap;
        struct ceph_inode_info *ci = ceph_inode(inode);
        int mds = session->s_mds;
        int seq = le32_to_cpu(grant->seq);
@@ -1083,7 +1085,6 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
             inode, ci, mds, seq, ceph_cap_string(newcaps));
        dout(10, " size %llu max_size %llu, i_size %llu\n", size, max_size,
                inode->i_size);
-       spin_lock(&inode->i_lock);
 start:
 
        /* do we have this cap? */
@@ -1261,20 +1262,18 @@ out:
  * MDS has been safely recorded.
  */
 static void handle_cap_flush_ack(struct inode *inode,
-                               struct ceph_mds_caps *m,
-                               struct ceph_mds_session *session)
+                                struct ceph_mds_caps *m,
+                                struct ceph_mds_session *session,
+                                struct ceph_cap *cap)
+       __releases(inode->i_lock)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_cap *cap;
        unsigned seq = le32_to_cpu(m->seq);
        int caps = le32_to_cpu(m->caps);
        int dirty = le32_to_cpu(m->dirty);
        int cleaned = dirty & ~caps;
        int removed_last = 0;
 
-       spin_lock(&inode->i_lock);
-       cap = __get_cap_for_mds(inode, session->s_mds);
-       BUG_ON(!cap);
        dout(10, "handle_cap_flush_ack inode %p mds%d seq %d cleaned %s,"
             " flushing %s -> %s\n",
             inode, session->s_mds, seq, ceph_cap_string(cleaned),
@@ -1346,6 +1345,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode,
 static void handle_cap_trunc(struct inode *inode,
                             struct ceph_mds_caps *trunc,
                             struct ceph_mds_session *session)
+       __releases(inode->i_lock)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        int mds = session->s_mds;
@@ -1365,7 +1365,6 @@ static void handle_cap_trunc(struct inode *inode,
         * if its an expansion, and there is no truncate pending, we
         * don't need to truncate.
         */
-       spin_lock(&inode->i_lock);
        if (ci->i_vmtruncate_to < 0 && size > inode->i_size) {
                dout(10, "clean fwd truncate, no vmtruncate needed\n");
        } else if (ci->i_vmtruncate_to >= 0 && size >= ci->i_vmtruncate_to) {
@@ -1494,6 +1493,7 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
        struct super_block *sb = mdsc->client->sb;
        struct ceph_mds_session *session;
        struct inode *inode;
+       struct ceph_cap *cap;
        struct ceph_mds_caps *h;
        int mds = le32_to_cpu(msg->hdr.src.name.num);
        int op;
@@ -1535,50 +1535,23 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
        dout(20, " op %s ino %llx inode %p\n", ceph_cap_op_name(op), vino.ino,
             inode);
        if (!inode) {
-               dout(10, " i don't have ino %llx\n", vino.ino);
-               if (op == CEPH_CAP_OP_IMPORT)
-                       send_cap_msg(mdsc, vino.ino, CEPH_CAP_OP_RELEASE,
-                                    0, 0, 0,
-                                    seq, 0,
-                                    size, 0, NULL, NULL, 0,
-                                    0, 0, 0,
-                                    0, mds);
-               goto no_inode;
+               dout(10, " i don't have ino %llx, sending release\n", vino.ino);
+               goto release;
        }
 
+       /* these will work even if we don't have a cap yet */
        switch (op) {
-       case CEPH_CAP_OP_GRANT:
-               up_write(&mdsc->snap_rwsem);
-               if (handle_cap_grant(inode, h, session) == 1) {
-                       dout(10, " sending reply back to mds%d\n", mds);
-                       ceph_msg_get(msg);
-                       ceph_send_msg_mds(mdsc, msg, mds);
-               }
-               break;
-
-       case CEPH_CAP_OP_TRUNC:
-               up_write(&mdsc->snap_rwsem);
-               handle_cap_trunc(inode, h, session);
-               break;
-
-       case CEPH_CAP_OP_FLUSH_ACK:
-               handle_cap_flush_ack(inode, h, session);
-               up_write(&mdsc->snap_rwsem);
-               if (list_empty(&session->s_caps))
-                       ceph_mdsc_flushed_all_caps(mdsc, session);
-               break;
-
        case CEPH_CAP_OP_FLUSHSNAP_ACK:
                handle_cap_flushsnap_ack(inode, h, session);
                up_write(&mdsc->snap_rwsem);
-               break;
+               goto done;
 
        case CEPH_CAP_OP_EXPORT:
                handle_cap_export(inode, h, session);
                up_write(&mdsc->snap_rwsem);
                if (list_empty(&session->s_caps))
                        ceph_mdsc_flushed_all_caps(mdsc, session);
-               break;
+               goto done;
 
        case CEPH_CAP_OP_IMPORT:
                handle_cap_import(mdsc, inode, h, session,
@@ -1586,14 +1559,50 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
                                  le32_to_cpu(h->snap_trace_len));
                up_write(&mdsc->snap_rwsem);
                check_caps = 1; /* we may have sent a RELEASE to the old auth */
+               goto done;
+
+       }
+
+       /* the rest require a cap */
+       spin_lock(&inode->i_lock);
+       cap = __get_cap_for_mds(inode, mds);
+       if (!cap) {
+               dout(10, "no cap on %p ino %llx.%llx from mds%d, releasing\n",
+                    inode, ceph_ino(inode), ceph_snap(inode), mds);
+               spin_unlock(&inode->i_lock);
+               goto release;
+       }
+       
+       /* note that each of these drops i_lock for us */
+       switch (op) {
+       case CEPH_CAP_OP_GRANT:
+               up_write(&mdsc->snap_rwsem);
+               if (handle_cap_grant(inode, h, session, cap) == 1) {
+                       dout(10, " sending reply back to mds%d\n", mds);
+                       ceph_msg_get(msg);
+                       ceph_send_msg_mds(mdsc, msg, mds);
+               }
+               break;
+
+       case CEPH_CAP_OP_FLUSH_ACK:
+               handle_cap_flush_ack(inode, h, session, cap);
+               up_write(&mdsc->snap_rwsem);
+               if (list_empty(&session->s_caps))
+                       ceph_mdsc_flushed_all_caps(mdsc, session);
+               break;
+
+       case CEPH_CAP_OP_TRUNC:
+               up_write(&mdsc->snap_rwsem);
+               handle_cap_trunc(inode, h, session);
                break;
 
        default:
+               spin_unlock(&inode->i_lock);
                up_write(&mdsc->snap_rwsem);
                derr(10, " unknown cap op %d %s\n", op, ceph_cap_op_name(op));
        }
 
-no_inode:
+done:
        mutex_unlock(&session->s_mutex);
        ceph_put_mds_session(session);
 
@@ -1606,6 +1615,15 @@ no_inode:
 bad:
        derr(10, "corrupt caps message\n");
        return;
+
+release:
+       send_cap_msg(mdsc, vino.ino, CEPH_CAP_OP_RELEASE,
+                    0, 0, 0,
+                    seq, 0,
+                    size, 0, NULL, NULL, 0,
+                    0, 0, 0,
+                    0, mds);
+       goto done;
 }