]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: handle osd resets, timeouts, etc.
authorSage Weil <sage@newdream.net>
Thu, 27 Aug 2009 00:57:42 +0000 (17:57 -0700)
committerSage Weil <sage@newdream.net>
Thu, 27 Aug 2009 00:57:42 +0000 (17:57 -0700)
src/kernel/osd_client.c

index c5a78f31899313b8460c501f0f959765c53313bb..a0d7dd30198fb1e7e958438b0c88468f15b78e90 100644 (file)
@@ -305,12 +305,9 @@ static void peer_reset(struct ceph_connection *con)
 /*
  * Track open sessions with osds.
  */
-static int init_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd, int o)
+static int open_osd_session(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
-       dout("init_osd %p osd%d\n", osd, o);
-       osd->o_osdc = osdc;
-       osd->o_osd = o;
-       INIT_LIST_HEAD(&osd->o_requests);
+       int o = osd->o_osd;
 
        osd->o_con = kzalloc(sizeof(*osd->o_con), GFP_NOFS);
        if (!osd->o_con)
@@ -324,11 +321,37 @@ static int init_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd, int o)
        return 0;
 }
 
+static void init_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd, int o)
+{
+       dout("init_osd %p osd%d\n", osd, o);
+       osd->o_osdc = osdc;
+       osd->o_osd = o;
+       INIT_LIST_HEAD(&osd->o_requests);
+}
+
 static void destroy_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
        dout("destroy_osd %p\n", osd);
+       BUG_ON(!list_empty(&osd->o_requests));
        rb_erase(&osd->o_node, &osdc->osds);
+       ceph_con_close(osd->o_con);
        osd->o_con->ops->put(osd->o_con);
+       kfree(osd);
+}
+
+static int reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+{
+       int ret = 0;
+
+       dout("reset_osd %p osd%d\n", osd, osd->o_osd);
+       if (list_empty(&osd->o_requests)) {
+               destroy_osd(osdc, osd);
+       } else {
+               ceph_con_close(osd->o_con);
+               osd->o_con->ops->put(osd->o_con);
+               ret = open_osd_session(osdc, osd);
+       }
+       return ret;
 }
 
 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -483,6 +506,8 @@ static int map_osds(struct ceph_osd_client *osdc,
                if (!req->r_osd)
                        return -ENOMEM;
                init_osd(osdc, req->r_osd, o);
+#warning check return value
+               open_osd_session(osdc, req->r_osd);
                __insert_osd(osdc, req->r_osd);
        }
        list_add(&req->r_osd_item, &req->r_osd->o_requests);
@@ -534,13 +559,12 @@ static int send_request(struct ceph_osd_client *osdc,
  */
 static void handle_timeout(struct work_struct *work)
 {
-#if 0
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
        struct ceph_osd_request *req;
+       struct ceph_osd *osd;
        unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ;
        unsigned long next_timeout = timeout + jiffies;
-       RADIX_TREE(pings, GFP_NOFS);  /* only send 1 ping per osd */
        struct rb_node *p;
 
        dout("timeout\n");
@@ -563,27 +587,22 @@ static void handle_timeout(struct work_struct *work)
                                req->r_resend = false;
                        continue;
                }
-
+       }
+       for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
+               osd = rb_entry(p, struct ceph_osd, o_node);
+               if (list_empty(&osd->o_requests))
+                       continue;
+               req = list_first_entry(&osd->o_requests,
+                                      struct ceph_osd_request, r_osd_item);
                if (time_before(jiffies, req->r_timeout_stamp))
                        continue;
 
+               dout(" tid %llu (at least) timed out on osd%d\n",
+                    req->r_tid, osd->o_osd);
                req->r_timeout_stamp = next_timeout;
-               if (req->r_last_osd >= 0 &&
-                   radix_tree_lookup(&pings, req->r_last_osd) == NULL) {
-                       struct ceph_entity_name n = {
-                               .type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD),
-                               .num = cpu_to_le32(req->r_last_osd)
-                       };
-                       dout(" tid %llu (at least) timed out on osd%d\n",
-                            req->r_tid, req->r_last_osd);
-                       radix_tree_insert(&pings, req->r_last_osd, req);
-                       ceph_ping(osdc->client->msgr, n, &req->r_last_osd_addr);
-               }
+               ceph_con_keepalive(osd->o_con);
        }
 
-       while (radix_tree_gang_lookup(&pings, (void **)&req, 0, 1))
-               radix_tree_delete(&pings, req->r_last_osd);
-
        if (osdc->timeout_tid)
                schedule_delayed_work(&osdc->timeout_work,
                                      round_jiffies_relative(timeout));
@@ -591,7 +610,6 @@ static void handle_timeout(struct work_struct *work)
        mutex_unlock(&osdc->request_mutex);
 
        up_read(&osdc->map_sem);
-#endif
 }
 
 /*
@@ -704,36 +722,43 @@ bad:
 static void kick_requests(struct ceph_osd_client *osdc,
                          struct ceph_osd *osd)
 {
-#warning fixme
-#if 0
        struct ceph_osd_request *req;
-       struct rb_node *p;
+       struct rb_node *p, *n;
        int needmap = 0;
        int err;
 
        mutex_lock(&osdc->request_mutex);
+       if (!osd) {
+               for (p = rb_first(&osdc->osds); p; p = n) {
+                       n = rb_next(p);
+                       osd = rb_entry(p, struct ceph_osd, o_node);
+                       if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
+                           !ceph_entity_addr_equal(&osd->o_con->peer_addr,
+                                           ceph_osd_addr(osdc->osdmap,
+                                                         osd->o_osd)))
+                               reset_osd(osdc, osd);
+               }
+       }
+
        for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
                req = rb_entry(p, struct ceph_osd_request, r_node);
 
                if (req->r_resend)
                        goto kick;
-               if (who && ceph_entity_addr_equal(who, &req->r_last_osd_addr))
+               if (osd == req->r_osd)
                        goto kick;
 
                if (map_osds(osdc, req) == 0)
                        continue;  /* no change */
 
-               if (req->r_last_osd < 0) {
+               if (req->r_osd == NULL) {
                        dout("tid %llu maps to no valid osd\n", req->r_tid);
                        needmap++;  /* request a newer map */
-                       memset(&req->r_last_osd_addr, 0,
-                              sizeof(req->r_last_osd_addr));
                        continue;
                }
 
        kick:
-               dout("kicking tid %llu osd%d\n", req->r_tid,
-                    req->r_last_osd);
+               dout("kicking tid %llu osd%d\n", req->r_tid, req->r_osd->o_osd);
                ceph_osdc_get_request(req);
                mutex_unlock(&osdc->request_mutex);
                req->r_request = ceph_msg_maybe_dup(req->r_request);
@@ -753,7 +778,6 @@ static void kick_requests(struct ceph_osd_client *osdc,
                ceph_monc_request_osdmap(&osdc->client->monc,
                                         osdc->osdmap->epoch+1);
        }
-#endif
 }
 
 /*