git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: replace osdc request_tree with rbtree
author: Sage Weil <sage@newdream.net>
Mon, 27 Jul 2009 22:05:56 +0000 (15:05 -0700)
committer: Sage Weil <sage@newdream.net>
Mon, 27 Jul 2009 22:05:56 +0000 (15:05 -0700)
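This avoids a possible memory allocation in the writeout path: radix_tree_insert() could fail when registering a request, whereas the rb_node is embedded in the request itself.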

src/TODO
src/kernel/debugfs.c
src/kernel/osd_client.c
src/kernel/osd_client.h

index e9933e1d42b8197da0d6d8ed73fc541cfcc42046..99587724bbd6ccbf045c7726142dc485b19b6a50 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -47,6 +47,8 @@ v0.11
 
 v0.12
 - osdmap: allow explicit pg 'override' mappings
+- http gw
+
 
 bugs
 - premature filejournal trimming?
@@ -98,8 +100,8 @@ repair
 - mds scrubbing
 
 kclient
+- mempool for osd_request (if caller requests)
 - ensure cap_snaps reflush after client reconnect 
-- return EBADF on files without caps
 - fix up mds selection, and ESTALE handling
 - make cap import/export efficient
 - simplify mds auth tracking?
index a41238dddce1b145990228818d237e8035d77ee1..b7de0bed552f6382ee090c44a0564d3f1b288bce 100644 (file)
@@ -220,27 +220,22 @@ static int mdsc_show(struct seq_file *s, void *p)
        return 0;
 }
 
-static int osdc_show(struct seq_file *s, void *p)
+static int osdc_show(struct seq_file *s, void *pp)
 {
        struct ceph_client *client = s->private;
        struct ceph_osd_client *osdc = &client->osdc;
-       u64 nexttid = 0;
+       struct rb_node *p;
 
        mutex_lock(&osdc->request_mutex);
-       while (nexttid < osdc->last_tid) {
+       for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
                struct ceph_osd_request *req;
                struct ceph_osd_request_head *head;
                struct ceph_osd_op *op;
                int num_ops;
                int opcode, olen;
-               int got, i;
-
-               got = radix_tree_gang_lookup(&osdc->request_tree,
-                                            (void **)&req, nexttid, 1);
-               if (got == 0)
-                       break;
+               int i;
 
-               nexttid = req->r_tid + 1;
+               req = rb_entry(p, struct ceph_osd_request, r_node);
 
                seq_printf(s, "%lld\t%u.%u.%u.%u:%u (%s%d)\t",
                           req->r_tid,
index 5767ecb5b24fdf66136a9acf1707bfa6c0d5c9e3..8514552fbce309840b78e9e03ee5081013d6f7e9 100644 (file)
@@ -206,25 +206,86 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        return req;
 }
 
+/*
+ * Link @new into the rbtree of pending osd requests, ordered by ->r_tid.
+ */
+static void __insert_request(struct ceph_osd_client *osdc,
+                            struct ceph_osd_request *new)
+{
+       struct rb_node **link = &osdc->requests.rb_node;
+       struct rb_node *parent = NULL;
+
+       while (*link) {
+               struct ceph_osd_request *cur;
+
+               parent = *link;
+               cur = rb_entry(parent, struct ceph_osd_request, r_node);
+               if (new->r_tid < cur->r_tid)
+                       link = &parent->rb_left;
+               else if (new->r_tid > cur->r_tid)
+                       link = &parent->rb_right;
+               else
+                       BUG();  /* a tid must never be inserted twice */
+       }
+
+       rb_link_node(&new->r_node, parent, link);
+       rb_insert_color(&new->r_node, &osdc->requests);
+}
+
+/* Find the pending request whose tid is exactly @tid; NULL if absent. */
+static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
+                                                u64 tid)
+{
+       struct rb_node *node = osdc->requests.rb_node;
+
+       while (node) {
+               struct ceph_osd_request *cur =
+                       rb_entry(node, struct ceph_osd_request, r_node);
+
+               if (tid == cur->r_tid)
+                       return cur;
+               node = tid < cur->r_tid ? node->rb_left : node->rb_right;
+       }
+
+       return NULL;
+}
+
+/* Find the request with the smallest tid >= @tid; NULL if there is none. */
+static struct ceph_osd_request *
+__lookup_request_ge(struct ceph_osd_client *osdc, u64 tid)
+{
+       struct ceph_osd_request *req;
+       struct ceph_osd_request *best = NULL;   /* closest match so far */
+       struct rb_node *n = osdc->requests.rb_node;
+
+       while (n) {
+               req = rb_entry(n, struct ceph_osd_request, r_node);
+               if (tid < req->r_tid) {
+                       best = req;     /* candidate; keep looking left */
+                       n = n->rb_left;
+               } else if (tid > req->r_tid) {
+                       n = n->rb_right;
+               } else {
+                       return req;
+               }
+       }
+       return best;
+}
+
 /*
  * Register request, assign tid.  If this is the first request, set up
  * the timeout event.
  */
-static int register_request(struct ceph_osd_client *osdc,
-                           struct ceph_osd_request *req)
+static void register_request(struct ceph_osd_client *osdc,
+                            struct ceph_osd_request *req)
 {
        struct ceph_osd_request_head *head = req->r_request->front.iov_base;
-       int rc;
 
        mutex_lock(&osdc->request_mutex);
        req->r_tid = ++osdc->last_tid;
        head->tid = cpu_to_le64(req->r_tid);
 
        dout("register_request %p tid %lld\n", req, req->r_tid);
-       rc = radix_tree_insert(&osdc->request_tree, req->r_tid, (void *)req);
-       if (rc < 0)
-               goto out;
-
+       __insert_request(osdc, req);
        ceph_osdc_get_request(req);
        osdc->num_requests++;
 
@@ -238,9 +299,7 @@ static int register_request(struct ceph_osd_client *osdc,
                schedule_delayed_work(&osdc->timeout_work,
                      round_jiffies_relative(req->r_timeout_stamp - jiffies));
        }
-out:
        mutex_unlock(&osdc->request_mutex);
-       return rc;
 }
 
 /*
@@ -260,8 +319,7 @@ static void handle_timeout(struct work_struct *work)
        unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ;
        unsigned long next_timeout = timeout + jiffies;
        RADIX_TREE(pings, GFP_NOFS);  /* only send 1 ping per osd */
-       u64 next_tid = 0;
-       int got;
+       struct rb_node *p;
 
        dout("timeout\n");
        down_read(&osdc->map_sem);
@@ -269,14 +327,11 @@ static void handle_timeout(struct work_struct *work)
        ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1);
 
        mutex_lock(&osdc->request_mutex);
-       while (1) {
-               got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
-                                            next_tid, 1);
-               if (got == 0)
-                       break;
-               next_tid = req->r_tid + 1;
+       for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
+               req = rb_entry(p, struct ceph_osd_request, r_node);
+
                if (time_before(jiffies, req->r_timeout_stamp))
-                       goto next;
+                       continue;
 
                req->r_timeout_stamp = next_timeout;
                if (req->r_last_osd >= 0 &&
@@ -290,10 +345,6 @@ static void handle_timeout(struct work_struct *work)
                        radix_tree_insert(&pings, req->r_last_osd, req);
                        ceph_ping(osdc->client->msgr, n, &req->r_last_osd_addr);
                }
-
-       next:
-               got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
-                                            next_tid, 1);
        }
 
        while (radix_tree_gang_lookup(&pings, (void **)&req, 0, 1))
@@ -315,8 +366,7 @@ static void __unregister_request(struct ceph_osd_client *osdc,
                                 struct ceph_osd_request *req)
 {
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
-       radix_tree_delete(&osdc->request_tree, req->r_tid);
-
+       rb_erase(&req->r_node, &osdc->requests);
        osdc->num_requests--;
        ceph_osdc_put_request(req);
 
@@ -326,11 +376,8 @@ static void __unregister_request(struct ceph_osd_client *osdc,
                        osdc->timeout_tid = 0;
                        cancel_delayed_work(&osdc->timeout_work);
                } else {
-                       int ret;
-
-                       ret = radix_tree_gang_lookup(&osdc->request_tree,
-                                                    (void **)&req, 0, 1);
-                       BUG_ON(ret != 1);
+                       req = rb_entry(rb_first(&osdc->requests),
+                                      struct ceph_osd_request, r_node);
                        osdc->timeout_tid = req->r_tid;
                        dout("rescheduled timeout on tid %llu at %lu\n",
                             req->r_tid, req->r_timeout_stamp);
@@ -469,7 +516,7 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 
        /* lookup */
        mutex_lock(&osdc->request_mutex);
-       req = radix_tree_lookup(&osdc->request_tree, tid);
+       req = __lookup_request(osdc, tid);
        if (req == NULL) {
                dout("handle_reply tid %llu dne\n", tid);
                mutex_unlock(&osdc->request_mutex);
@@ -546,17 +593,12 @@ static void kick_requests(struct ceph_osd_client *osdc,
                          struct ceph_entity_addr *who)
 {
        struct ceph_osd_request *req;
-       u64 next_tid = 0;
-       int got;
+       struct rb_node *p;
        int needmap = 0;
 
        mutex_lock(&osdc->request_mutex);
-       while (1) {
-               got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
-                                            next_tid, 1);
-               if (got == 0)
-                       break;
-               next_tid = req->r_tid + 1;
+       for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
+               req = rb_entry(p, struct ceph_osd_request, r_node);
 
                if (who && ceph_entity_addr_equal(who, &req->r_last_osd_addr))
                        goto kick;
@@ -739,7 +781,7 @@ int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want)
 
        tid = le64_to_cpu(rhead->tid);
        mutex_lock(&osdc->request_mutex);
-       req = radix_tree_lookup(&osdc->request_tree, tid);
+       req = __lookup_request(osdc, tid);
        if (!req) {
                dout("prepare_pages unknown tid %llu\n", tid);
                goto out;
@@ -770,9 +812,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
        req->r_request->pages = req->r_pages;
        req->r_request->nr_pages = req->r_num_pages;
 
-       rc = register_request(osdc, req);
-       if (rc < 0)
-               return rc;
+       register_request(osdc, req);
 
        down_read(&osdc->map_sem);
        rc = send_request(osdc, req);
@@ -843,14 +883,12 @@ void ceph_osdc_sync(struct ceph_osd_client *osdc)
 {
        struct ceph_osd_request *req;
        u64 last_tid, next_tid = 0;
-       int got;
 
        mutex_lock(&osdc->request_mutex);
        last_tid = osdc->last_tid;
        while (1) {
-               got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
-                                            next_tid, 1);
-               if (!got)
+               req = __lookup_request_ge(osdc, next_tid);
+               if (!req)
                        break;
                if (req->r_tid > last_tid)
                        break;
@@ -885,7 +923,7 @@ void ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        mutex_init(&osdc->request_mutex);
        osdc->timeout_tid = 0;
        osdc->last_tid = 0;
-       INIT_RADIX_TREE(&osdc->request_tree, GFP_NOFS);
+       osdc->requests = RB_ROOT;
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
 }
index 7634b1f8497b187d9dee1ea713932f14fa7c016c..5aa2b6abc407ce01154cd13b4e277c969d7a2380 100644 (file)
@@ -19,6 +19,7 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
 /* an in-flight request */
 struct ceph_osd_request {
        u64             r_tid;              /* unique for this client */
+       struct rb_node  r_node;
 
        struct ceph_msg  *r_request;
        struct ceph_msg  *r_reply;
@@ -60,7 +61,7 @@ struct ceph_osd_client {
        struct mutex           request_mutex;
        u64                    timeout_tid;   /* tid of timeout triggering rq */
        u64                    last_tid;      /* tid of last request */
-       struct radix_tree_root request_tree;  /* pending requests, by tid */
+       struct rb_root         requests;      /* pending requests */
        int                    num_requests;
        struct delayed_work    timeout_work;
        struct dentry          *debugfs_file;