]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: use a per-connection ops vector
authorSage Weil <sage@newdream.net>
Thu, 27 Aug 2009 00:18:47 +0000 (17:18 -0700)
committerSage Weil <sage@newdream.net>
Thu, 27 Aug 2009 00:18:47 +0000 (17:18 -0700)
src/kernel/mds_client.c
src/kernel/mds_client.h
src/kernel/messenger.c
src/kernel/messenger.h
src/kernel/mon_client.c
src/kernel/mon_client.h
src/kernel/osd_client.c
src/kernel/super.c
src/kernel/super.h

index 3fedc0e0379b224f9fc902614fdc436226769d90..df7da75a49f4cfe710e8af15d9dd5abcfdc28b1a 100644 (file)
@@ -37,6 +37,9 @@
 static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);
 
+const static struct ceph_connection_operations mds_con_ops;
+
+
 /*
  * mds reply parsing
  */
@@ -297,32 +300,6 @@ static bool __have_session(struct ceph_mds_client *mdsc, int mds)
        return mdsc->sessions[mds];
 }
 
-static void con_get(struct ceph_connection *con)
-{
-       struct ceph_mds_session *s = con->private;
-
-       ceph_get_mds_session(s);
-}
-
-static void con_put(struct ceph_connection *con)
-{
-       struct ceph_mds_session *s = con->private;
-
-       ceph_put_mds_session(s);
-}
-
-/*
- * if the client is unresponsive for long enough, the mds will kill
- * the session entirely.
- */
-static void peer_reset(void *p, struct ceph_connection *con)
-{
-       struct ceph_mds_session *s = p;
-
-       pr_err("ceph mds%d gave us the boot.  IMPLEMENT RECONNECT.\n",
-              s->s_mds);
-}
-
 /*
  * create+register a new session for given mds.
  * called under mdsc->mutex.
@@ -333,6 +310,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        struct ceph_mds_session *s;
 
        s = kzalloc(sizeof(*s), GFP_NOFS);
+       s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        s->s_ttl = 0;
@@ -342,9 +320,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        ceph_con_init(mdsc->client->msgr, &s->s_con,
                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
        s->s_con.private = s;
-       s->s_con.peer_reset = peer_reset;
-       s->s_con.get = con_get;
-       s->s_con.put = con_put;
+       s->s_con.ops = &mds_con_ops;
        s->s_con.peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MDS);
        s->s_con.peer_name.num = cpu_to_le32(mds);
 
@@ -1652,7 +1628,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
  * This preserves the logical ordering of replies, capabilities, etc., sent
  * by the MDS as they are applied to our local cache.
  */
-void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 {
        struct ceph_mds_request *req;
        struct ceph_mds_reply_head *head = msg->front.iov_base;
@@ -1807,8 +1783,7 @@ out:
 /*
  * handle mds notification that our request has been forwarded.
  */
-void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
-                             struct ceph_msg *msg)
+static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 {
        struct ceph_mds_request *req;
        u64 tid;
@@ -1874,8 +1849,7 @@ bad:
 /*
  * handle a mds session control message
  */
-void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
-                             struct ceph_msg *msg)
+static void handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 {
        u32 op;
        u64 seq;
@@ -2315,7 +2289,7 @@ void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
        di->lease_session = NULL;
 }
 
-void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 {
        struct super_block *sb = mdsc->client->sb;
        struct inode *inode;
@@ -2882,5 +2856,78 @@ bad:
        return;
 }
 
+static void con_get(struct ceph_connection *con)
+{
+       struct ceph_mds_session *s = con->private;
+
+       ceph_get_mds_session(s);
+}
+
+static void con_put(struct ceph_connection *con)
+{
+       struct ceph_mds_session *s = con->private;
+
+       ceph_put_mds_session(s);
+}
+
+/*
+ * if the client is unresponsive for long enough, the mds will kill
+ * the session entirely.
+ */
+static void peer_reset(struct ceph_connection *con)
+{
+       struct ceph_mds_session *s = con->private;
+
+       pr_err("ceph mds%d gave us the boot.  IMPLEMENT RECONNECT.\n",
+              s->s_mds);
+}
+
+static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
+{
+       struct ceph_mds_session *s = con->private;
+       struct ceph_mds_client *mdsc = s->s_mdsc;
+       int type = le16_to_cpu(msg->hdr.type);
+
+       switch (type) {
+       case CEPH_MSG_MDS_MAP:
+               ceph_mdsc_handle_map(mdsc, msg);
+               break;
+       case CEPH_MSG_CLIENT_SESSION:
+               handle_session(mdsc, msg);
+               break;
+       case CEPH_MSG_CLIENT_REPLY:
+               handle_reply(mdsc, msg);
+               break;
+       case CEPH_MSG_CLIENT_REQUEST_FORWARD:
+               handle_forward(mdsc, msg);
+               break;
+       case CEPH_MSG_CLIENT_CAPS:
+               ceph_handle_caps(mdsc, msg);
+               break;
+       case CEPH_MSG_CLIENT_SNAP:
+               ceph_handle_snap(mdsc, msg);
+               break;
+       case CEPH_MSG_CLIENT_LEASE:
+               handle_lease(mdsc, msg);
+               break;
+
+       default:
+               pr_err("ceph received unknown message type %d %s\n", type,
+                      ceph_msg_type_name(type));
+       }
+       ceph_msg_put(msg);
+}
+
+const static struct ceph_connection_operations mds_con_ops = {
+       .get = con_get,
+       .put = con_put,
+       .dispatch = dispatch,
+       .peer_reset = peer_reset,
+       .alloc_msg = ceph_alloc_msg,
+       .alloc_middle = ceph_alloc_middle,
+};
+
+
+
 
 /* eof */
index fc23d327101bbfc6b680ef1627841984d9fd3fdf..16867fa62ce9ac24a8490af6de043618a15d883a 100644 (file)
@@ -90,6 +90,7 @@ enum {
 };
 
 struct ceph_mds_session {
+       struct ceph_mds_client *s_mdsc;
        int               s_mds;
        int               s_state;
        unsigned long     s_ttl;      /* time until mds kills us */
@@ -284,18 +285,6 @@ extern void ceph_mdsc_stop(struct ceph_mds_client *mdsc);
 
 extern void ceph_mdsc_sync(struct ceph_mds_client *mdsc);
 
-extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
-                                struct ceph_msg *msg);
-extern void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
-                                    struct ceph_msg *msg);
-extern void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc,
-                                  struct ceph_msg *msg);
-extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
-                                    struct ceph_msg *msg);
-
-extern void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc,
-                                  struct ceph_msg *msg);
-
 extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc,
                                    struct inode *inode,
                                    struct dentry *dn, int mask);
@@ -314,7 +303,6 @@ static inline void ceph_mdsc_get_request(struct ceph_mds_request *req)
 extern void ceph_mdsc_put_request(struct ceph_mds_request *req);
 
 extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
-extern void ceph_mdsc_handle_reset(struct ceph_mds_client *mdsc, int mds);
 
 extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
                                  int stop_on_nosnap);
@@ -325,4 +313,7 @@ extern void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
                                     struct dentry *dentry, char action,
                                     u32 seq);
 
+extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
+                                struct ceph_msg *msg);
+
 #endif
index 4422aa3d38934b77d75a9c431a825e5eb4b4a60d..e83c7c73872dcbb65b48465db72a622fa268aaef 100644 (file)
@@ -257,7 +257,7 @@ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr,
        return NULL;
 
 yes:
-       con->get(con);
+       con->ops->get(con);
        dout("get_connection %p nref = %d -> %d\n", con,
             atomic_read(&con->nref) - 1, atomic_read(&con->nref));
        return con;
@@ -295,16 +295,16 @@ void ceph_con_destroy(struct ceph_connection *con)
 /*
  * generic get/put
  */
-static void get_connection(struct ceph_connection *con)
+void ceph_con_get(struct ceph_connection *con)
 {
-       dout("get_connection %p nref = %d -> %d\n", con,
+       dout("con_get %p nref = %d -> %d\n", con,
             atomic_read(&con->nref), atomic_read(&con->nref) + 1);
        atomic_inc(&con->nref);
 }
 
-static void put_connection(struct ceph_connection *con)
+void ceph_con_put(struct ceph_connection *con)
 {
-       dout("put_connection %p nref = %d -> %d\n", con,
+       dout("con_put %p nref = %d -> %d\n", con,
             atomic_read(&con->nref), atomic_read(&con->nref) - 1);
        BUG_ON(atomic_read(&con->nref) == 0);
        if (atomic_dec_and_test(&con->nref)) {
@@ -333,8 +333,6 @@ void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con,
        con->peer_addr = *addr;
 
        con->private = NULL;
-       con->get = get_connection;
-       con->put = put_connection;
 }
 
 /*
@@ -351,7 +349,7 @@ static int __register_connection(struct ceph_messenger *msgr,
 
        dout("register_connection %p %d -> %d\n", con,
             atomic_read(&con->nref), atomic_read(&con->nref) + 1);
-       con->get(con);
+       con->ops->get(con);
 
        list_add(&con->list_all, &msgr->con_all);
 
@@ -367,7 +365,7 @@ static int __register_connection(struct ceph_messenger *msgr,
                rc = radix_tree_insert(&msgr->con_tree, key, &con->list_bucket);
                if (rc < 0) {
                        list_del(&con->list_all);
-                       con->put(con);
+                       con->ops->put(con);
                        return rc;
                }
        }
@@ -417,7 +415,7 @@ static void __remove_connection(struct ceph_messenger *msgr,
                        list_del_init(&con->list_bucket);
                }
        }
-       con->put(con);
+       con->ops->put(con);
 }
 
 static void remove_connection(struct ceph_messenger *msgr,
@@ -930,7 +928,7 @@ static int process_connect(struct ceph_connection *con)
 
                /* Tell ceph about it. */
                pr_info("reset on %s%d\n", ENTITY_NAME(con->peer_name));
-               con->peer_reset(con->private, con);
+               con->ops->peer_reset(con);
                break;
 
        case CEPH_MSGR_TAG_RETRY_SESSION:
@@ -1089,8 +1087,8 @@ static int read_partial_message(struct ceph_connection *con)
        if (!con->in_msg) {
                dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
                     con->in_hdr.front_len, con->in_hdr.data_len);
-               con->in_msg = con->msgr->alloc_msg(con->msgr->parent,
-                                                  &con->in_hdr);
+               con->in_msg = con->ops->alloc_msg(con->msgr->parent,
+                                                 &con->in_hdr);
                if (!con->in_msg) {
                        /* skip this message */
                        dout("alloc_msg returned NULL, skipping message\n");
@@ -1128,8 +1126,8 @@ static int read_partial_message(struct ceph_connection *con)
        while (middle_len > 0 && (!m->middle ||
                                  m->middle->vec.iov_len < middle_len)) {
                if (m->middle == NULL) {
-                       BUG_ON(!con->msgr->alloc_middle);
-                       ret = con->msgr->alloc_middle(con->msgr->parent, m);
+                       BUG_ON(!con->ops->alloc_middle);
+                       ret = con->ops->alloc_middle(con->msgr->parent, m);
                        if (ret < 0) {
                                dout("alloc_middle failed, skipping payload\n");
                                con->in_base_pos = -middle_len - data_len
@@ -1165,8 +1163,8 @@ static int read_partial_message(struct ceph_connection *con)
                /* find pages for data payload */
                want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
                ret = 0;
-               BUG_ON(!con->msgr->prepare_pages);
-               ret = con->msgr->prepare_pages(con->msgr->parent, m, want);
+               BUG_ON(!con->ops->prepare_pages);
+               ret = con->ops->prepare_pages(con->msgr->parent, m, want);
                if (ret < 0) {
                        dout("%p prepare_pages failed, skipping payload\n", m);
                        con->in_base_pos = -data_len - sizeof(m->footer);
@@ -1285,7 +1283,7 @@ static void process_message(struct ceph_connection *con)
             le32_to_cpu(con->in_msg->hdr.front_len),
             le32_to_cpu(con->in_msg->hdr.data_len),
             con->in_front_crc, con->in_middle_crc, con->in_data_crc);
-       con->msgr->dispatch(con->msgr->parent, con->in_msg);
+       con->ops->dispatch(con, con->in_msg);
        con->in_msg = NULL;
        prepare_read_tag(con);
 }
@@ -1536,13 +1534,13 @@ static void ceph_queue_con(struct ceph_connection *con)
                return;
        }
 
-       con->get(con);
+       con->ops->get(con);
 
        set_bit(QUEUED, &con->state);
        if (test_bit(BUSY, &con->state) ||
            !queue_work(ceph_msgr_wq, &con->work.work)) {
                dout("ceph_queue_con %p - already BUSY or queued\n", con);
-               con->put(con);
+               con->ops->put(con);
        } else {
                dout("ceph_queue_con %p\n", con);
        }
@@ -1595,7 +1593,7 @@ done:
        dout("con_work %p done\n", con);
 
 out:
-       con->put(con);
+       con->ops->put(con);
 }
 
 
@@ -1645,7 +1643,7 @@ static void ceph_fault(struct ceph_connection *con)
        dout("fault queueing %p %d -> %d delay %lu\n", con,
             atomic_read(&con->nref), atomic_read(&con->nref) + 1,
             con->delay);
-       con->get(con);
+       con->ops->get(con);
        queue_delayed_work(ceph_msgr_wq, &con->work,
                           round_jiffies_relative(con->delay));
 }
@@ -1707,15 +1705,15 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
                                 list_all);
                dout("destroy removing connection %p\n", con);
                set_bit(CLOSED, &con->state);
-               con->get(con);
+               con->ops->get(con);
                __remove_connection(msgr, con);
 
                /* in case there's queued work.  drop a reference if
                 * we successfully cancel work. */
                spin_unlock(&msgr->con_lock);
                if (cancel_delayed_work_sync(&con->work))
-                       con->put(con);
-               con->put(con);
+                       con->ops->put(con);
+               con->ops->put(con);
                dout("destroy removed connection %p\n", con);
 
                spin_lock(&msgr->con_lock);
@@ -1758,7 +1756,7 @@ void ceph_messenger_mark_down(struct ceph_messenger *msgr,
        }
        spin_unlock(&msgr->con_lock);
        if (con)
-               con->put(con);
+               con->ops->put(con);
 }
 
 /*
@@ -1841,7 +1839,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
                spin_lock(&msgr->con_lock);
                con = __get_connection(msgr, &msg->hdr.dst.addr);
                if (con) {
-                       con->put(newcon);
+                       con->ops->put(newcon);
                        dout("ceph_msg_send (lost race and) had connection "
                             "%p to peer %u.%u.%u.%u:%u\n", con,
                             IPQUADPORT(msg->hdr.dst.addr.ipaddr));
@@ -1895,7 +1893,7 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
        if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
                ceph_queue_con(con);
 
-       con->put(con);
+       con->ops->put(con);
        dout("ceph_msg_send done\n");
        return ret;
 }
@@ -1921,9 +1919,9 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
             le32_to_cpu(msg->hdr.front_len),
             le32_to_cpu(msg->hdr.middle_len),
             le32_to_cpu(msg->hdr.data_len));
-       dout("ceph_con_send %p %p seq %llu for %s%d on %p pgs %d\n",
-            con, msg, le64_to_cpu(msg->hdr.seq),
-            ENTITY_NAME(msg->hdr.dst.name), con, msg->nr_pages);
+       dout("ceph_con_send %p %s%d %p seq %llu pgs %d\n",
+            con, ENTITY_NAME(msg->hdr.dst.name), msg,
+            le64_to_cpu(msg->hdr.seq), msg->nr_pages);
        list_add_tail(&msg->list_head, &con->out_queue);
        spin_unlock(&con->out_queue_lock);
 
@@ -2014,6 +2012,48 @@ out:
        return ERR_PTR(-ENOMEM);
 }
 
+/*
+ * Generic message allocator, for incoming messages.
+ */
+struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
+                               struct ceph_msg_header *hdr)
+{
+       int type = le32_to_cpu(hdr->type);
+       int front_len = le32_to_cpu(hdr->front_len);
+       struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL);
+
+       if (!msg) {
+               pr_err("ceph: unable to allocate msg type %d len %d\n",
+                      type, front_len);
+               return ERR_PTR(-ENOMEM);
+       }
+       return msg;
+}
+
+/*
+ * Allocate "middle" portion of a message, if it is needed and wasn't
+ * allocated by alloc_msg.  This allows us to read a small fixed-size
+ * per-type header in the front and then gracefully fail (i.e.,
+ * propagate the error to the caller based on info in the front) when
+ * the middle is too large.
+ */
+int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
+{
+       int type = le32_to_cpu(msg->hdr.type);
+       int middle_len = le32_to_cpu(msg->hdr.middle_len);
+
+       dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
+            ceph_msg_type_name(type), middle_len);
+       BUG_ON(!middle_len);
+       BUG_ON(msg->middle);
+
+       msg->middle = ceph_buffer_new_alloc(middle_len, GFP_NOFS);
+       if (!msg->middle)
+               return -ENOMEM;
+       return 0;
+}
+
+
 /*
  * Free a generically kmalloc'd message.
  */
index 18ab875ab2d35078ca17b3c29a1497198b745fd5..96a2b12f7a4c81f840759cabd63d7e4987abd89b 100644 (file)
@@ -25,25 +25,28 @@ struct ceph_connection;
 extern struct workqueue_struct *ceph_msgr_wq;       /* receive work queue */
 
 /*
- * Ceph defines these callbacks for handling events:
+ * Ceph defines these callbacks for handling connection events.
  */
-/* handle an incoming message. */
-typedef void (*ceph_msgr_dispatch_t) (void *p, struct ceph_msg *m);
-/* an incoming message has a data payload; tell me what pages I
- * should read the data into. */
-typedef int (*ceph_msgr_prepare_pages_t) (void *p, struct ceph_msg *m,
-                                         int want);
-/* a remote host as terminated a message exchange session, and messages
- * we sent (or they tried to send us) may be lost. */
-typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_addr *addr,
-                                       struct ceph_entity_name *pn);
-
-typedef struct ceph_msg * (*ceph_msgr_alloc_msg_t) (void *p,
-                                           struct ceph_msg_header *hdr);
-typedef int (*ceph_msgr_alloc_middle_t) (void *p, struct ceph_msg *msg);
-
-typedef void (*ceph_con_get_t)(struct ceph_connection *);
-typedef void (*ceph_con_put_t)(struct ceph_connection *);
+struct ceph_connection_operations {
+       void (*get)(struct ceph_connection *);
+       void (*put)(struct ceph_connection *);
+
+       /* handle an incoming message. */
+       void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m);
+
+       /* a remote host as terminated a message exchange session, and messages
+        * we sent (or they tried to send us) may be lost. */
+       void (*peer_reset) (struct ceph_connection *con);
+
+       struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
+                                       struct ceph_msg_header *hdr);
+       int (*alloc_middle) (struct ceph_connection *con,
+                            struct ceph_msg *msg);
+       /* an incoming message has a data payload; tell me what pages I
+        * should read the data into. */
+       int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m,
+                             int want);
+};
 
 static inline const char *ceph_name_type_str(int t)
 {
@@ -64,10 +67,6 @@ static inline const char *ceph_name_type_str(int t)
 
 struct ceph_messenger {
        void *parent;                    /* normally struct ceph_client * */
-       ceph_msgr_dispatch_t dispatch;
-       ceph_msgr_prepare_pages_t prepare_pages;
-       ceph_msgr_alloc_msg_t alloc_msg;
-       ceph_msgr_alloc_middle_t alloc_middle;
 
        struct ceph_entity_inst inst;    /* my name+address */
 
@@ -149,11 +148,9 @@ struct ceph_msg_pos {
  */
 struct ceph_connection {
        void *private;
-       ceph_con_get_t get;
-       ceph_con_put_t put;
        atomic_t nref;
 
-       ceph_msgr_peer_reset_t peer_reset;
+       const struct ceph_connection_operations *ops;
 
        struct ceph_messenger *msgr;
        struct socket *sock;
@@ -232,6 +229,12 @@ void ceph_con_destroy(struct ceph_connection *con);
 extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
 extern void ceph_con_keepalive(struct ceph_connection *con);
 extern void ceph_con_close(struct ceph_connection *con);
+extern void ceph_con_get(struct ceph_connection *con);
+extern void ceph_con_put(struct ceph_connection *con);
+
+extern struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
+                                      struct ceph_msg_header *hdr);
+extern int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg);
 
 extern void ceph_messenger_mark_down(struct ceph_messenger *msgr,
                                     struct ceph_entity_addr *addr);
index 9d80fbde32d19b63bbb49345aaf88b38ee70b5b8..c2e9435e680783325357512d5d9e50de23d188a7 100644 (file)
@@ -26,6 +26,8 @@
  * the cluster to try.
  */
 
+const static struct ceph_connection_operations mon_con_ops;
+
 /*
  * Decode a monmap blob (e.g., during mount).
  */
@@ -101,7 +103,7 @@ static int open_session(struct ceph_mon_client *monc, int newmon)
 
        if (monc->con) {
                dout("open_session closing mon%d\n", monc->last_mon);
-               monc->con->put(monc->con);
+               monc->con->ops->put(monc->con);
        }
 
        get_random_bytes(&r, 1);
@@ -116,6 +118,8 @@ static int open_session(struct ceph_mon_client *monc, int newmon)
        dout("open_session mon%d opened\n", monc->last_mon);
        ceph_con_init(monc->client->msgr, monc->con,
                      &monc->monmap->mon_inst[monc->last_mon].addr);
+       monc->con->private = monc;
+       monc->con->ops = &mon_con_ops;
        monc->con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MON);
        monc->con->peer_name.num = cpu_to_le32(monc->last_mon);
        return 0;
@@ -315,10 +319,10 @@ void ceph_monc_request_mount(struct ceph_mon_client *monc)
  * The monitor responds with mount ack indicate mount success.  The
  * included client ticket allows the client to talk to MDSs and OSDs.
  */
-void ceph_handle_mount_ack(struct ceph_client *client, struct ceph_msg *msg)
+static void handle_mount_ack(struct ceph_mon_client *monc, struct ceph_msg *msg)
 {
-       struct ceph_mon_client *monc = &client->monc;
-       struct ceph_monmap *monmap = NULL, *old = client->monc.monmap;
+       struct ceph_client *client = monc->client;
+       struct ceph_monmap *monmap = NULL, *old = monc->monmap;
        void *p, *end;
        s32 result;
        u32 len;
@@ -432,8 +436,7 @@ void ceph_monc_request_umount(struct ceph_mon_client *monc)
        mutex_unlock(&monc->req_mutex);
 }
 
-void ceph_monc_handle_umount(struct ceph_mon_client *monc,
-                            struct ceph_msg *msg)
+static void handle_umount(struct ceph_mon_client *monc, struct ceph_msg *msg)
 {
        dout("handle_umount\n");
        mutex_lock(&monc->req_mutex);
@@ -447,8 +450,8 @@ void ceph_monc_handle_umount(struct ceph_mon_client *monc,
 /*
  * statfs
  */
-void ceph_monc_handle_statfs_reply(struct ceph_mon_client *monc,
-                                  struct ceph_msg *msg)
+static void handle_statfs_reply(struct ceph_mon_client *monc,
+                               struct ceph_msg *msg)
 {
        struct ceph_mon_statfs_request *req;
        struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
@@ -618,8 +621,51 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
        cancel_timeout(&monc->umountreq);
        cancel_delayed_work_sync(&monc->statfs_delayed_work);
        if (monc->con) {
-               monc->con->put(monc->con);
+               monc->con->ops->put(monc->con);
                monc->con = NULL;
        }
        kfree(monc->monmap);
 }
+
+
+/*
+ * handle incoming message
+ */
+static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
+{
+       struct ceph_mon_client *monc = con->private;
+       int type = le16_to_cpu(msg->hdr.type);
+
+       switch (type) {
+       case CEPH_MSG_CLIENT_MOUNT_ACK:
+               handle_mount_ack(monc, msg);
+               break;
+       case CEPH_MSG_STATFS_REPLY:
+               handle_statfs_reply(monc, msg);
+               break;
+       case CEPH_MSG_CLIENT_UNMOUNT:
+               handle_umount(monc, msg);
+               break;
+
+       case CEPH_MSG_MDS_MAP:
+               ceph_mdsc_handle_map(&monc->client->mdsc, msg);
+               break;
+
+       case CEPH_MSG_OSD_MAP:
+               ceph_osdc_handle_map(&monc->client->osdc, msg);
+               break;
+
+       default:
+               pr_err("ceph received unknown message type %d %s\n", type,
+                      ceph_msg_type_name(type));
+       }
+       ceph_msg_put(msg);
+}
+
+const static struct ceph_connection_operations mon_con_ops = {
+       .get = ceph_con_get,
+       .put = ceph_con_put,
+       .dispatch = dispatch,
+       .alloc_msg = ceph_alloc_msg,
+       .alloc_middle = ceph_alloc_middle,
+};
index 3685a031c8a02e10c63ce25d655a4b56a0384d94..823c5bd1964c9c8d081bf8fc8ad0b35994156863 100644 (file)
@@ -92,17 +92,12 @@ extern void ceph_monc_request_osdmap(struct ceph_mon_client *monc, u32 want);
 extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have);
 
 extern void ceph_monc_request_mount(struct ceph_mon_client *monc);
-extern void ceph_handle_mount_ack(struct ceph_client *client,
-                                 struct ceph_msg *msg);
+extern void ceph_monc_request_umount(struct ceph_mon_client *monc);
 extern void ceph_monc_request_umount(struct ceph_mon_client *monc);
 
 extern int ceph_monc_do_statfs(struct ceph_mon_client *monc,
                               struct ceph_statfs *buf);
-extern void ceph_monc_handle_statfs_reply(struct ceph_mon_client *monc,
-                                         struct ceph_msg *msg);
 
-extern void ceph_monc_request_umount(struct ceph_mon_client *monc);
-extern void ceph_monc_handle_umount(struct ceph_mon_client *monc,
-                                   struct ceph_msg *msg);
+
 
 #endif
index ef6df9f107374de129a43b40d90e9f3070a32076..fc9753d683a7cc976dd302d142a1240d1a23676d 100644 (file)
@@ -11,6 +11,7 @@
 #include "messenger.h"
 #include "decode.h"
 
+const static struct ceph_connection_operations osd_con_ops;
 
 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 
@@ -291,9 +292,9 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
  * reply (because it does not get notification when clients, mds' leave
  * the cluster).
  */
-static void peer_reset(void *p, struct ceph_connection *con)
+static void peer_reset(struct ceph_connection *con)
 {
-       struct ceph_osd *osd = p;
+       struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc = osd->o_osdc;
        
        down_read(&osdc->map_sem);
@@ -317,7 +318,7 @@ static int init_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd, int o)
        ceph_con_init(osdc->client->msgr, osd->o_con,
                      &osdc->osdmap->osd_addr[o]);
        osd->o_con->private = osd;
-       osd->o_con->peer_reset = peer_reset;
+       osd->o_con->ops = &osd_con_ops;
        osd->o_con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
        osd->o_con->peer_name.num = cpu_to_le32(o);
        return 0;
@@ -327,7 +328,7 @@ static void destroy_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
        dout("destroy_osd %p\n", osd);
        rb_erase(&osd->o_node, &osdc->osds);
-       osd->o_con->put(osd->o_con);
+       osd->o_con->ops->put(osd->o_con);
 }
 
 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -597,7 +598,7 @@ static void handle_timeout(struct work_struct *work)
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
  */
-void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 {
        struct ceph_osd_reply_head *rhead = msg->front.iov_base;
        struct ceph_osd_request *req;
@@ -1032,6 +1033,7 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        mutex_init(&osdc->request_mutex);
        osdc->timeout_tid = 0;
        osdc->last_tid = 0;
+       osdc->osds = RB_ROOT;
        osdc->requests = RB_ROOT;
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
@@ -1135,3 +1137,37 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
        return rc;
 }
 
+/*
+ * handle incoming message
+ */
+static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
+{
+       struct ceph_osd *osd = con->private;
+       struct ceph_osd_client *osdc = osd->o_osdc;
+       int type = le16_to_cpu(msg->hdr.type);
+
+       switch (type) {
+       case CEPH_MSG_OSD_MAP:
+               ceph_osdc_handle_map(osdc, msg);
+               break;
+       case CEPH_MSG_OSD_OPREPLY:
+               handle_reply(osdc, msg);
+               break;
+
+       default:
+               pr_err("ceph received unknown message type %d %s\n", type,
+                      ceph_msg_type_name(type));
+       }
+       ceph_msg_put(msg);
+}
+
+const static struct ceph_connection_operations osd_con_ops = {
+       .get = ceph_con_get,
+       .put = ceph_con_put,
+       .dispatch = dispatch,
+       .peer_reset = peer_reset,
+       .alloc_msg = ceph_alloc_msg,
+       .alloc_middle = ceph_alloc_middle,
+};
+
+
index a0d8cabaa8d8ed2ed6cb1b740e06bb630048e5e5..cfc5a4a2864a3dba6a92a165479bcafefe5e872b 100644 (file)
 /*
  * Ceph superblock operations
  *
- * Handle the basics of mounting, unmounting.  Also dispatch message
- * types to appropriate handlers and subsystems.
+ * Handle the basics of mounting, unmounting.
  */
 
-static void ceph_dispatch(void *p, struct ceph_msg *msg);
-static struct ceph_msg *ceph_alloc_msg(void *p, struct ceph_msg_header *hdr);
-static int ceph_alloc_middle(void *p, struct ceph_msg *msg);
 
 /*
  * find filename portion of a path (/foo/bar/baz -> baz)
@@ -762,10 +758,6 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
                        goto out;
                }
                client->msgr->parent = client;
-               client->msgr->dispatch = ceph_dispatch;
-               client->msgr->prepare_pages = ceph_osdc_prepare_pages;
-               client->msgr->alloc_msg = ceph_alloc_msg;
-               client->msgr->alloc_middle = ceph_alloc_middle;
        }
 
        /* send mount request, and wait for mon, mds, and osd maps */
@@ -825,6 +817,7 @@ out:
        return err;
 }
 
+#if 0
 static struct ceph_msg_pool *get_pool(struct ceph_client *client, int type)
 {
        switch (type) {
@@ -850,7 +843,8 @@ static struct ceph_msg_pool *get_pool(struct ceph_client *client, int type)
  * Allocate incoming message.  Return message, or NULL to ignore message,
  * or error to fault connection.
  */
-static struct ceph_msg *ceph_alloc_msg(void *p, struct ceph_msg_header *hdr)
+struct ceph_msg *my_ceph_alloc_msg(struct ceph_connection *con,
+                               struct ceph_msg_header *hdr)
 {
        struct ceph_client *client = p;
        int type = le32_to_cpu(hdr->type);
@@ -886,94 +880,7 @@ static struct ceph_msg *ceph_alloc_msg(void *p, struct ceph_msg_header *hdr)
        msg->front.iov_len = front_len;
        return msg;
 }
-
-/*
- * Allocate "middle" portion of a message, if it is needed and wasn't
- * allocated by alloc_msg.  This allows us to read a small fixed-size
- * per-type header in the front and then gracefully fail (i.e.,
- * propagate the error to the caller based on info in the front) when
- * the middle is too large.
- */
-static int ceph_alloc_middle(void *p, struct ceph_msg *msg)
-{
-       int type = le32_to_cpu(msg->hdr.type);
-       int middle_len = le32_to_cpu(msg->hdr.middle_len);
-
-       dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
-            ceph_msg_type_name(type), middle_len);
-       BUG_ON(!middle_len);
-       BUG_ON(msg->middle);
-
-       msg->middle = ceph_buffer_new_alloc(middle_len, GFP_NOFS);
-       if (!msg->middle)
-               return -ENOMEM;
-       return 0;
-}
-
-
-/*
- * Process an incoming message.
- *
- * This should be relatively fast and must not do any work that waits
- * on other messages to be received.
- */
-void ceph_dispatch(void *p, struct ceph_msg *msg)
-{
-       struct ceph_client *client = p;
-       int type = le16_to_cpu(msg->hdr.type);
-
-       switch (type) {
-       case CEPH_MSG_CLIENT_MOUNT_ACK:
-               ceph_handle_mount_ack(client, msg);
-               break;
-
-               /* mon client */
-       case CEPH_MSG_STATFS_REPLY:
-               ceph_monc_handle_statfs_reply(&client->monc, msg);
-               break;
-       case CEPH_MSG_CLIENT_UNMOUNT:
-               ceph_monc_handle_umount(&client->monc, msg);
-               break;
-
-               /* mds client */
-       case CEPH_MSG_MDS_MAP:
-               ceph_mdsc_handle_map(&client->mdsc, msg);
-               break;
-       case CEPH_MSG_CLIENT_SESSION:
-               ceph_mdsc_handle_session(&client->mdsc, msg);
-               break;
-       case CEPH_MSG_CLIENT_REPLY:
-               ceph_mdsc_handle_reply(&client->mdsc, msg);
-               break;
-       case CEPH_MSG_CLIENT_REQUEST_FORWARD:
-               ceph_mdsc_handle_forward(&client->mdsc, msg);
-               break;
-       case CEPH_MSG_CLIENT_CAPS:
-               ceph_handle_caps(&client->mdsc, msg);
-               break;
-       case CEPH_MSG_CLIENT_SNAP:
-               ceph_handle_snap(&client->mdsc, msg);
-               break;
-       case CEPH_MSG_CLIENT_LEASE:
-               ceph_mdsc_handle_lease(&client->mdsc, msg);
-               break;
-
-               /* osd client */
-       case CEPH_MSG_OSD_MAP:
-               ceph_osdc_handle_map(&client->osdc, msg);
-               break;
-       case CEPH_MSG_OSD_OPREPLY:
-               ceph_osdc_handle_reply(&client->osdc, msg);
-               break;
-
-       default:
-               pr_err("ceph received unknown message type %d %s\n", type,
-                      ceph_msg_type_name(type));
-       }
-
-       ceph_msg_put(msg);
-}
-
+#endif
 
 static int ceph_set_super(struct super_block *s, void *data)
 {
index 5d5c433fd268631f24d977f39155096a222bd6dc..ddde2b0f75bf8f84b18b962f64d1206271c7070b 100644 (file)
@@ -794,6 +794,12 @@ static inline void __ceph_fsid_set_major(ceph_fsid_t *fsid, __le64 val)
        put_unaligned_le64(val, &fsid->fsid[0]);
 }
 
+/*
+extern int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg);
+extern struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
+                                      struct ceph_msg_header *hdr);
+*/
+
 /* inode.c */
 extern const struct inode_operations ceph_file_iops;