]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: use connection api for osdc (almost, no kick or timeout)
authorSage Weil <sage@newdream.net>
Wed, 26 Aug 2009 22:39:20 +0000 (15:39 -0700)
committerSage Weil <sage@newdream.net>
Wed, 26 Aug 2009 22:39:20 +0000 (15:39 -0700)
src/kernel/osd_client.c
src/kernel/osd_client.h

index 51d207dd2eb973055916a73a6dd8610c224914fa..232f00078ac6321507ddfa8073c39515edb2c3a4 100644 (file)
@@ -142,7 +142,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        init_completion(&req->r_safe_completion);
        INIT_LIST_HEAD(&req->r_unsafe_item);
        req->r_flags = flags;
-       req->r_last_osd = -1;
 
        WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
 
@@ -281,6 +280,74 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
        return NULL;
 }
 
+
+/*
+ * Track open sessions with osds.
+ */
+static int init_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd, int o)
+{
+       dout("init_osd %p osd%d\n", osd, o);
+       osd->o_osd = o;
+       INIT_LIST_HEAD(&osd->o_requests);
+
+       osd->o_con = kzalloc(sizeof(*osd->o_con), GFP_NOFS);
+       if (!osd->o_con)
+               return -ENOMEM;
+       ceph_con_init(osdc->client->msgr, osd->o_con,
+                     &osdc->osdmap->osd_addr[o]);
+       osd->o_con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
+       osd->o_con->peer_name.num = cpu_to_le32(o);
+       return 0;
+}
+
+static void destroy_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+{
+       dout("destroy_osd %p\n", osd);
+       rb_erase(&osd->o_node, &osdc->osds);
+       osd->o_con->put(osd->o_con);
+}
+
+static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
+{
+       struct rb_node **p = &osdc->osds.rb_node;
+       struct rb_node *parent = NULL;
+       struct ceph_osd *osd = NULL;
+
+       while (*p) {
+               parent = *p;
+               osd = rb_entry(parent, struct ceph_osd, o_node);
+               if (new->o_osd < osd->o_osd)
+                       p = &(*p)->rb_left;
+               else if (new->o_osd > osd->o_osd)
+                       p = &(*p)->rb_right;
+               else
+                       BUG();
+       }
+
+       rb_link_node(&new->o_node, parent, p);
+       rb_insert_color(&new->o_node, &osdc->osds);
+}
+
+static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
+{
+       struct ceph_osd *osd;
+       struct rb_node *n = osdc->osds.rb_node;
+
+       while (n) {
+               osd = rb_entry(n, struct ceph_osd, o_node);
+               if (o < osd->o_osd)
+                       n = n->rb_left;
+               else if (o > osd->o_osd)
+                       n = n->rb_right;
+               else
+                       return osd;
+       }
+       return NULL;
+}
+
+
+
+
 /*
  * Register request, assign tid.  If this is the first request, set up
  * the timeout event.
@@ -321,6 +388,12 @@ static void __unregister_request(struct ceph_osd_client *osdc,
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
        rb_erase(&req->r_node, &osdc->requests);
        osdc->num_requests--;
+
+       list_del_init(&req->r_osd_item);
+       if (list_empty(&req->r_osd->o_requests))
+               destroy_osd(osdc, req->r_osd);
+       req->r_osd = NULL;
+
        ceph_osdc_put_request(req);
 
        if (req->r_tid == osdc->timeout_tid) {
@@ -341,6 +414,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
        }
 }
 
+
 /*
  * Pick an osd (the first 'up' osd in the pg), and put result in
  * req->r_last_osd[_addr].  If none, set to -1.
@@ -354,7 +428,7 @@ static int map_osds(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        union ceph_pg pgid;
-       int osd = -1;
+       int o = -1;
        int err;
 
        err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
@@ -362,16 +436,32 @@ static int map_osds(struct ceph_osd_client *osdc,
        if (err)
                return err;
        pgid.pg64 = le64_to_cpu(reqhead->layout.ol_pgid);
-       osd = ceph_calc_pg_primary(osdc->osdmap, pgid);
-       dout("map_osds tid %llu pgid %llx pool %d osd%d (was osd%d)\n",
-            req->r_tid, pgid.pg64, pgid.pg.pool, osd, req->r_last_osd);
-       if (req->r_last_osd == osd &&
-           (osd < 0 || ceph_entity_addr_equal(&osdc->osdmap->osd_addr[osd],
-                                              &req->r_last_osd_addr)))
+       o = ceph_calc_pg_primary(osdc->osdmap, pgid);
+
+       if (req->r_osd && req->r_osd->o_osd == o)
                return 0;
-       req->r_last_osd = osd;
-       if (osd >= 0)
-               req->r_last_osd_addr = osdc->osdmap->osd_addr[osd];
+
+       dout("map_osds tid %llu pgid %llx pool %d osd%d (was osd%d)\n",
+            req->r_tid, pgid.pg64, pgid.pg.pool, o,
+            req->r_osd ? req->r_osd->o_osd : -1);
+
+       /* XXX FIXME: try to reuse r_osd where possible */
+       if (req->r_osd) {
+               list_del_init(&req->r_osd_item);
+               if (list_empty(&req->r_osd->o_requests))
+                       destroy_osd(osdc, req->r_osd);
+               req->r_osd = NULL;
+       }
+
+       req->r_osd = __lookup_osd(osdc, o);
+       if (!req->r_osd) {
+               req->r_osd = kmalloc(sizeof(*req->r_osd), GFP_NOFS);
+               if (!req->r_osd)
+                       return -ENOMEM;
+               init_osd(osdc, req->r_osd, o);
+               __insert_osd(osdc, req->r_osd);
+       }
+       list_add(&req->r_osd_item, &req->r_osd->o_requests);
        return 1;
 }
 
@@ -382,35 +472,31 @@ static int send_request(struct ceph_osd_client *osdc,
                        struct ceph_osd_request *req)
 {
        struct ceph_osd_request_head *reqhead;
-       int osd;
+       int err;
 
-       map_osds(osdc, req);
-       if (req->r_last_osd < 0) {
+       err = map_osds(osdc, req);
+       if (err == -ENOMEM)
+               return err;
+       if (req->r_osd == NULL) {
                dout("send_request %p no up osds in pg\n", req);
                ceph_monc_request_osdmap(&osdc->client->monc,
                                         osdc->osdmap->epoch+1);
                return 0;
        }
-       osd = req->r_last_osd;
 
        dout("send_request %p tid %llu to osd%d flags %d\n",
-            req, req->r_tid, osd, req->r_flags);
+            req, req->r_tid, req->r_osd->o_osd, req->r_flags);
 
        reqhead = req->r_request->front.iov_base;
        reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
        reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
        reqhead->reassert_version = req->r_reassert_version;
 
-       req->r_request->hdr.dst.name.type =
-               cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
-       req->r_request->hdr.dst.name.num = cpu_to_le32(osd);
-       req->r_request->hdr.dst.addr = req->r_last_osd_addr;
-
        req->r_timeout_stamp = jiffies+osdc->client->mount_args.osd_timeout*HZ;
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
-       return ceph_msg_send(osdc->client->msgr, req->r_request,
-                            BASE_DELAY_INTERVAL);
+       ceph_con_send(req->r_osd->o_con, req->r_request);
+       return 0;
 }
 
 /*
@@ -424,6 +510,7 @@ static int send_request(struct ceph_osd_client *osdc,
  */
 static void handle_timeout(struct work_struct *work)
 {
+#if 0
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
        struct ceph_osd_request *req;
@@ -480,6 +567,7 @@ static void handle_timeout(struct work_struct *work)
        mutex_unlock(&osdc->request_mutex);
 
        up_read(&osdc->map_sem);
+#endif
 }
 
 /*
@@ -585,11 +673,12 @@ bad:
  *
  * If @who is specified, resubmit requests for that specific osd.
  *
- * Caller should hold map_sem for read.
+ * Caller should hold map_sem for read and request_mutex.
  */
 static void kick_requests(struct ceph_osd_client *osdc,
                          struct ceph_entity_addr *who)
 {
+#if 0
        struct ceph_osd_request *req;
        struct rb_node *p;
        int needmap = 0;
@@ -637,6 +726,7 @@ static void kick_requests(struct ceph_osd_client *osdc,
                ceph_monc_request_osdmap(&osdc->client->monc,
                                         osdc->osdmap->epoch+1);
        }
+#endif
 }
 
 /*
index 026638bc9a4b51870e83ff53c034708eba607b1a..621024c40f5130744660dc64d8881f020bbf0a9e 100644 (file)
@@ -1,9 +1,9 @@
 #ifndef _FS_CEPH_OSD_CLIENT_H
 #define _FS_CEPH_OSD_CLIENT_H
 
-#include <linux/radix-tree.h>
 #include <linux/completion.h>
 #include <linux/mempool.h>
+#include <linux/rbtree.h>
 
 #include "types.h"
 #include "osdmap.h"
@@ -12,6 +12,7 @@ struct ceph_msg;
 struct ceph_snap_context;
 struct ceph_osd_request;
 struct ceph_osd_client;
+struct ceph_connection;
 
 /*
  * completion callback for async writepages
@@ -19,10 +20,20 @@ struct ceph_osd_client;
 typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *,
                                     struct ceph_msg *);
 
+/* a given osd we're communicating with */
+struct ceph_osd {
+       int o_osd;
+       struct rb_node o_node;
+       struct ceph_connection *o_con;
+       struct list_head o_requests;
+};
+
 /* an in-flight request */
 struct ceph_osd_request {
        u64             r_tid;              /* unique for this client */
        struct rb_node  r_node;
+       struct list_head r_osd_item;
+       struct ceph_osd *r_osd;
 
        struct ceph_msg  *r_request, *r_reply;
        int               r_result;
@@ -43,8 +54,6 @@ struct ceph_osd_request {
 
        char              r_oid[40];          /* object name */
        int               r_oid_len;
-       int               r_last_osd;         /* pg osds */
-       struct ceph_entity_addr r_last_osd_addr;
        unsigned long     r_timeout_stamp;
        bool              r_resend;           /* msg send failed, needs retry */
 
@@ -65,6 +74,7 @@ struct ceph_osd_client {
        u64                    last_requested_map;
 
        struct mutex           request_mutex;
+       struct rb_root         osds;          /* osds */
        u64                    timeout_tid;   /* tid of timeout triggering rq */
        u64                    last_tid;      /* tid of last request */
        struct rb_root         requests;      /* pending requests */