]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: embed ceph_connection in ceph_osd
authorSage Weil <sage@newdream.net>
Thu, 3 Sep 2009 22:51:20 +0000 (15:51 -0700)
committerSage Weil <sage@newdream.net>
Thu, 3 Sep 2009 22:51:20 +0000 (15:51 -0700)
This avoids some memory (re)allocation, and cleans the code up
somewhat.

src/kernel/messenger.c
src/kernel/osd_client.c
src/kernel/osd_client.h

index 0370082651252a7967f4cd04faa105189b303dba..2a30c02fb4285117e807db4c81c687d68d4c151f 100644 (file)
@@ -257,7 +257,7 @@ static void reset_connection(struct ceph_connection *con)
  */
 void ceph_con_close(struct ceph_connection *con)
 {
-       dout("close %p peer %u.%u.%u.%u:%u\n", con,
+       dout("con_close %p peer %u.%u.%u.%u:%u\n", con,
             IPQUADPORT(con->peer_addr.ipaddr));
        set_bit(CLOSED, &con->state);  /* in case there's queued work */
        clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
index 478922c44af4c9968e8ba99b94ff802c3de44712..c1b8c2a5ab34e6142014931a64ddb00430dc5d1f 100644 (file)
@@ -303,55 +303,72 @@ static void osd_reset(struct ceph_connection *con)
 /*
  * Track open sessions with osds.
  */
-static int open_osd_session(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
 {
-       int o = osd->o_osd;
+       struct ceph_osd *osd;
 
-       osd->o_con = kzalloc(sizeof(*osd->o_con), GFP_NOFS);
-       if (!osd->o_con)
-               return -ENOMEM;
-       ceph_con_init(osdc->client->msgr, osd->o_con);
-       osd->o_con->private = osd;
-       osd->o_con->ops = &osd_con_ops;
-       osd->o_con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
-       osd->o_con->peer_name.num = cpu_to_le32(o);
-       ceph_con_open(osd->o_con, &osdc->osdmap->osd_addr[o]);
-       return 0;
-}
+       osd = kzalloc(sizeof(*osd), GFP_NOFS);
+       if (!osd)
+               return NULL;
 
-static void init_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd, int o)
-{
-       dout("init_osd %p osd%d\n", osd, o);
+       atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
-       osd->o_osd = o;
        INIT_LIST_HEAD(&osd->o_requests);
-       osd->o_con = NULL;
+
+       ceph_con_init(osdc->client->msgr, &osd->o_con);
+       osd->o_con.private = osd;
+       osd->o_con.ops = &osd_con_ops;
+       osd->o_con.peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
+       return osd;
+}
+
+static struct ceph_osd *get_osd(struct ceph_osd *osd)
+{
+       if (atomic_inc_not_zero(&osd->o_ref)) {
+               dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
+                    atomic_read(&osd->o_ref));
+               return osd;
+       } else {
+               dout("get_osd %p FAIL\n", osd);
+               return NULL;
+       }
+}
+
+static void put_osd(struct ceph_osd *osd)
+{
+       dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
+            atomic_read(&osd->o_ref) - 1);
+       if (atomic_dec_and_test(&osd->o_ref)) {
+               ceph_con_shutdown(&osd->o_con);
+               kfree(osd);
+       }
 }
 
-static void destroy_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+/*
+ * remove an osd from our map
+ */
+static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
-       dout("destroy_osd %p\n", osd);
+       dout("remove_osd %p\n", osd);
        BUG_ON(!list_empty(&osd->o_requests));
        rb_erase(&osd->o_node, &osdc->osds);
-       osd->o_con->private = NULL;
-       ceph_con_close(osd->o_con);
-       osd->o_con->ops->put(osd->o_con);
-       osd->o_con = NULL;
+       ceph_con_close(&osd->o_con);
+       put_osd(osd);
 }
 
+/*
+ * reset osd connect
+ */
 static int reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
        int ret = 0;
 
        dout("reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests)) {
-               destroy_osd(osdc, osd);
-               kfree(osd);
+               remove_osd(osdc, osd);
        } else {
-               ceph_con_close(osd->o_con);
-               osd->o_con->ops->put(osd->o_con);
-               osd->o_con = NULL;
-               ret = open_osd_session(osdc, osd);
+               ceph_con_close(&osd->o_con);
+               ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
        }
        return ret;
 }
@@ -395,8 +412,6 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
 }
 
 
-
-
 /*
  * Register request, assign tid.  If this is the first request, set up
  * the timeout event.
@@ -439,10 +454,8 @@ static void __unregister_request(struct ceph_osd_client *osdc,
        osdc->num_requests--;
 
        list_del_init(&req->r_osd_item);
-       if (list_empty(&req->r_osd->o_requests)) {
-               destroy_osd(osdc, req->r_osd);
-               kfree(req->r_osd);
-       }
+       if (list_empty(&req->r_osd->o_requests))
+               remove_osd(osdc, req->r_osd);
        req->r_osd = NULL;
 
        ceph_osdc_put_request(req);
@@ -503,8 +516,8 @@ static int __map_osds(struct ceph_osd_client *osdc,
                list_del_init(&req->r_osd_item);
                if (list_empty(&req->r_osd->o_requests)) {
                        /* try to re-use r_osd if possible */
-                       newosd = req->r_osd;
-                       destroy_osd(osdc, newosd);
+                       newosd = get_osd(req->r_osd);
+                       remove_osd(osdc, newosd);
                }
                req->r_osd = NULL;
        }
@@ -516,12 +529,17 @@ static int __map_osds(struct ceph_osd_client *osdc,
                        newosd = NULL;
                } else {
                        err = -ENOMEM;
-                       req->r_osd = kmalloc(sizeof(*req->r_osd), GFP_NOFS);
+                       req->r_osd = create_osd(osdc);
                        if (!req->r_osd)
                                goto out;
                }
-               init_osd(osdc, req->r_osd, o);
+
+               dout("map_osds osd %p is osd%d\n", req->r_osd, o);
+               req->r_osd->o_osd = o;
+               req->r_osd->o_con.peer_name.num = cpu_to_le32(o);
                __insert_osd(osdc, req->r_osd);
+
+               ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
        }
 
        if (req->r_osd)
@@ -529,7 +547,8 @@ static int __map_osds(struct ceph_osd_client *osdc,
        err = 1;   /* osd changed */
 
 out:
-       kfree(newosd);
+       if (newosd)
+               put_osd(newosd);
        return err;
 }
 
@@ -554,12 +573,6 @@ static int send_request(struct ceph_osd_client *osdc,
        dout("send_request %p tid %llu to osd%d flags %d\n",
             req, req->r_tid, req->r_osd->o_osd, req->r_flags);
 
-       if (req->r_osd->o_con == NULL) {
-               err = open_osd_session(osdc, req->r_osd);
-               if (err < 0)
-                       return err;
-       }
-
        reqhead = req->r_request->front.iov_base;
        reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
        reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
@@ -568,7 +581,7 @@ static int send_request(struct ceph_osd_client *osdc,
        req->r_timeout_stamp = jiffies+osdc->client->mount_args.osd_timeout*HZ;
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
-       ceph_con_send(req->r_osd->o_con, req->r_request);
+       ceph_con_send(&req->r_osd->o_con, req->r_request);
        return 0;
 }
 
@@ -624,7 +637,7 @@ static void handle_timeout(struct work_struct *work)
                dout(" tid %llu (at least) timed out on osd%d\n",
                     req->r_tid, osd->o_osd);
                req->r_timeout_stamp = next_timeout;
-               ceph_con_keepalive(osd->o_con);
+               ceph_con_keepalive(&osd->o_con);
        }
 
        if (osdc->timeout_tid)
@@ -760,7 +773,7 @@ static void kick_requests(struct ceph_osd_client *osdc,
 
                        n = rb_next(p);
                        if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
-                           !ceph_entity_addr_equal(&osd->o_con->peer_addr,
+                           !ceph_entity_addr_equal(&osd->o_con.peer_addr,
                                            ceph_osd_addr(osdc->osdmap,
                                                          osd->o_osd)))
                                reset_osd(osdc, osd);
@@ -1237,9 +1250,26 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
        ceph_msg_put(msg);
 }
 
+/*
+ * Wrappers to refcount containing ceph_osd struct
+ */
+static struct ceph_connection *get_osd_con(struct ceph_connection *con)
+{
+       struct ceph_osd *osd = con->private;
+       if (get_osd(osd))
+               return con;
+       return NULL;
+}
+
+static void put_osd_con(struct ceph_connection *con)
+{
+       struct ceph_osd *osd = con->private;
+       put_osd(osd);
+}
+
 const static struct ceph_connection_operations osd_con_ops = {
-       .get = ceph_con_get,
-       .put = ceph_con_put,
+       .get = get_osd_con,
+       .put = put_osd_con,
        .dispatch = dispatch,
        .peer_reset = osd_reset,
        .alloc_msg = ceph_alloc_msg,
index 26539858c7a884738fd94ffb4f12b79dae56e570..b3e574c557539fed1d24bf65a8a9516ecf305b6b 100644 (file)
@@ -7,12 +7,12 @@
 
 #include "types.h"
 #include "osdmap.h"
+#include "messenger.h"
 
 struct ceph_msg;
 struct ceph_snap_context;
 struct ceph_osd_request;
 struct ceph_osd_client;
-struct ceph_connection;
 
 /*
  * completion callback for async writepages
@@ -22,10 +22,11 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *,
 
 /* a given osd we're communicating with */
 struct ceph_osd {
+       atomic_t o_ref;
        struct ceph_osd_client *o_osdc;
        int o_osd;
        struct rb_node o_node;
-       struct ceph_connection *o_con;
+       struct ceph_connection o_con;
        struct list_head o_requests;
 };