]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: resend osd ops when pg membership changes
authorSage Weil <sage@newdream.net>
Fri, 23 Jan 2009 20:16:02 +0000 (12:16 -0800)
committerSage Weil <sage@newdream.net>
Fri, 23 Jan 2009 20:43:14 +0000 (12:43 -0800)
Previously we resent only when the primary OSD changed.  See
f49f78d03ba9786f4e8423c530f0e016a8d814fa.

src/kernel/osd_client.c
src/kernel/osd_client.h

index 748c965e1eda2c2dd9600cee4fcbeaf544da9508..248b29147409369db9b4710c7c691b238e75ab26 100644 (file)
@@ -238,18 +238,18 @@ static void __unregister_request(struct ceph_osd_client *osdc,
  * pick an osd.  the first up osd in the pg.  or -1.
  * caller should hold map_sem for read.
  */
-static int pick_osd(struct ceph_osd_client *osdc,
+static int map_osds(struct ceph_osd_client *osdc,
                    struct ceph_osd_request *req)
 {
        int ruleno;
        unsigned pps; /* placement ps */
-       int osds[10];
-       int i, num;
+       int osds[MAX_PG_SIZE];
+       int i, j, num;
 
        ruleno = crush_find_rule(osdc->osdmap->crush, req->r_pgid.pg.pool,
                                 req->r_pgid.pg.type, req->r_pgid.pg.size);
        if (ruleno < 0) {
-               derr(0, "pick_osd no crush rule for pool %d type %d size %d\n",
+               derr(0, "map_osds no crush rule for pool %d type %d size %d\n",
                     req->r_pgid.pg.pool, req->r_pgid.pg.type,
                     req->r_pgid.pg.size);
                return -1;
@@ -267,10 +267,17 @@ static int pick_osd(struct ceph_osd_client *osdc,
                            min_t(int, req->r_pgid.pg.size, ARRAY_SIZE(osds)),
                            req->r_pgid.pg.preferred, osdc->osdmap->osd_weight);
 
-       for (i = 0; i < num; i++)
+       /* filter out down osds */
+       for (i = 0, j = 0; i < num; i++)
                if (ceph_osd_is_up(osdc->osdmap, osds[i]))
-                       return osds[i];
-       return -1;
+                       osds[j++] = osds[i];
+       num = j;
+
+       /* same? */
+       if (num == req->r_pg_num_osds &&
+           memcpy(osds, req->r_pg_osds, sizeof(osds[0]) * num) == 0)
+               return 0;
+       return 1;
 }
 
 /*
@@ -282,13 +289,14 @@ static int send_request(struct ceph_osd_client *osdc,
        struct ceph_osd_request_head *reqhead;
        int osd;
 
-       osd = pick_osd(osdc, req);
-       if (osd < 0) {
+       map_osds(osdc, req);
+       if (req->r_pg_num_osds == 0) {
                dout(10, "send_request %p no up osds in pg\n", req);
                ceph_monc_request_osdmap(&osdc->client->monc,
                                         osdc->osdmap->epoch+1);
                return 0;
        }
+       osd = req->r_pg_osds[0];
 
        dout(10, "send_request %p tid %llu to osd%d flags %d\n",
             req, req->r_tid, osd, req->r_flags);
@@ -302,7 +310,6 @@ static int send_request(struct ceph_osd_client *osdc,
        req->r_request->hdr.dst.name.num = cpu_to_le32(osd);
        req->r_request->hdr.dst.addr = osdc->osdmap->osd_addr[osd];
 
-       req->r_last_osd = osd;
        req->r_last_osd_addr = req->r_request->hdr.dst.addr;
        req->r_last_stamp = jiffies;
 
@@ -386,7 +393,6 @@ static void kick_requests(struct ceph_osd_client *osdc,
        struct ceph_osd_request *req;
        u64 next_tid = 0;
        int got;
-       int osd;
        int needmap = 0;
 
        mutex_lock(&osdc->request_mutex);
@@ -396,30 +402,29 @@ static void kick_requests(struct ceph_osd_client *osdc,
                if (got == 0)
                        break;
                next_tid = req->r_tid + 1;
-               osd = pick_osd(osdc, req);
-               if (osd < 0 || osd >= osdc->osdmap->max_osd) {
+
+               if (map_osds(osdc, req) == 0)
+                       continue;  /* no change */
+
+               if (req->r_pg_num_osds == 0) {
                        dout(20, "tid %llu maps to no valid osd\n", req->r_tid);
                        needmap++;  /* request a newer map */
-                       req->r_last_osd = -1;
                        memset(&req->r_last_osd_addr, 0,
                               sizeof(req->r_last_osd_addr));
                        continue;
                }
-               if (!ceph_entity_addr_equal(&req->r_last_osd_addr,
-                                           &osdc->osdmap->osd_addr[osd]) ||
-                   (who && ceph_entity_addr_equal(&req->r_last_osd_addr,
-                                                  who))) {
-                       dout(20, "kicking tid %llu osd%d\n", req->r_tid, osd);
-                       get_request(req);
-                       mutex_unlock(&osdc->request_mutex);
-                       req->r_request = ceph_msg_maybe_dup(req->r_request);
-                       if (!req->r_aborted) {
-                               req->r_flags |= CEPH_OSD_OP_RETRY;
-                               send_request(osdc, req);
-                       }
-                       ceph_osdc_put_request(req);
-                       mutex_lock(&osdc->request_mutex);
+
+               dout(20, "kicking tid %llu osd%d\n", req->r_tid,
+                    req->r_pg_osds[0]);
+               get_request(req);
+               mutex_unlock(&osdc->request_mutex);
+               req->r_request = ceph_msg_maybe_dup(req->r_request);
+               if (!req->r_aborted) {
+                       req->r_flags |= CEPH_OSD_OP_RETRY;
+                       send_request(osdc, req);
                }
+               ceph_osdc_put_request(req);
+               mutex_lock(&osdc->request_mutex);
        }
        mutex_unlock(&osdc->request_mutex);
 
@@ -725,10 +730,11 @@ static void handle_timeout(struct work_struct *work)
                if (got == 0)
                        break;
                next_tid = req->r_tid + 1;
-               if (time_after(jiffies, req->r_last_stamp + timeout)) {
+               if (time_after(jiffies, req->r_last_stamp + timeout) &&
+                   req->r_pg_num_osds > 0) {
                        struct ceph_entity_name n = {
                                .type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD),
-                               .num = cpu_to_le32(req->r_last_osd)
+                               .num = cpu_to_le32(req->r_pg_osds[0])
                        };
                        ceph_ping(osdc->client->msgr, n, &req->r_last_osd_addr);
                }
index a01de8ecb626ce7d4eacac529b1dc22a3fe78da2..7e6aff8caba9c362ccdbf5ee12611c4f16479ad5 100644 (file)
@@ -32,6 +32,8 @@ struct ceph_osd_request;
  */
 typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
 
+#define MAX_PG_SIZE 10
+
 /* an in-flight request */
 struct ceph_osd_request {
        u64             r_tid;              /* unique for this client */
@@ -47,7 +49,8 @@ struct ceph_osd_request {
        struct inode *r_inode;                /* needed for async write */
        struct writeback_control *r_wbc;
 
-       int               r_last_osd;         /* last osd we sent request to */
+       int               r_pg_osds[MAX_PG_SIZE];   /* pg osds */
+       int               r_pg_num_osds;
        struct ceph_entity_addr r_last_osd_addr;
        unsigned long     r_last_stamp;