git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: nofail mode for osd writes
author     Sage Weil <sage@newdream.net>
           Wed, 29 Jul 2009 20:28:23 +0000 (13:28 -0700)
committer  Sage Weil <sage@newdream.net>
           Wed, 29 Jul 2009 22:53:33 +0000 (15:53 -0700)
If nofail is specified, allocate request from mempool, and do not
return error on message send failure.  Instead, mark the request,
and periodically retry.

This isn't perfect: we can still starve indefinitely trying to
send the write, but it'll do until we have a better way to reserve
resources for writeback messages.
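
The send-failure handling this adds to ceph_osdc_start_request() (see the
osd_client.c diff below) reduces to one decision: if the caller passed
nofail, mark the request for resend and report success; otherwise
unregister it and propagate the error. Here is a minimal user-space sketch
of that decision, with hypothetical names (osd_request, try_send,
start_request) standing in for the kernel structures:

    /* Simplified user-space model of the nofail decision in
     * ceph_osdc_start_request(); names here (osd_request, try_send,
     * start_request) are hypothetical stand-ins for the kernel code. */
    #include <stdbool.h>
    #include <stdio.h>

    struct osd_request {
        long long tid;
        bool resend;            /* models req->r_resend */
    };

    /* pretend message send; returns 0 on success, -1 on failure */
    static int try_send(struct osd_request *req)
    {
        (void)req;
        return -1;              /* simulate a send failure */
    }

    static int start_request(struct osd_request *req, bool nofail)
    {
        int rc = try_send(req);

        if (rc == 0)
            return 0;
        if (nofail) {
            /* don't report the error; mark for periodic retry */
            req->resend = true;
            return 0;
        }
        /* normal path: the caller sees the failure */
        return rc;
    }

    int main(void)
    {
        struct osd_request req = { .tid = 1, .resend = false };

        printf("nofail start -> %d, resend=%d\n",
               start_request(&req, true), req.resend);
        return 0;
    }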

src/TODO
src/kernel/addr.c
src/kernel/file.c
src/kernel/osd_client.c
src/kernel/osd_client.h

index 9db6635841aa16f7dee8fac4c7c64d8d53d95483..ab60920328fdb448e13ccf73a89328e69c10abc2 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -107,7 +107,6 @@ repair
 - mds scrubbing
 
 kclient
-- mempool for osd_request (if caller requests)
 - ensure cap_snaps reflush after client reconnect 
 - fix up mds selection, and ESTALE handling
 - make cap import/export efficient
@@ -116,10 +115,6 @@ kclient
 - unwind writeback start error in addr.c (see fixme)... by redirtying pages?
 - flock, fnctl locks
 - ACLs
-- make writepages maybe skip pages with errors?
-  - EIO, or ENOSPC?
-  - ... writeback vs ENOSPC vs flush vs close()... hrm...
-- set mapping bits for ENOSPC, EIO?
 - should we try to ref CAP_PIN on special inodes that are open?  
 - fix readdir vs fragment race by keeping a separate frag pos, and ignoring dentries below it
 - reconnect after being disconnected from the mds
@@ -184,15 +179,6 @@ mds
   - linkage vs cdentry replicas and remote rename....
   - rename: importing inode... also journal imported client map?
 
-
-
-journaler
-- fix up for large events (e.g. imports)
-- should we pad with zeros to avoid splitting individual entries?
-  - make it a g_conf flag?
-  - have to fix reader to skip over zeros (either <4 bytes for size, or zeroed sizes)
-
-
 mon
 - mds injectargs N should take mds# or id.  * should bcast to standy mds's.
 - paxos need to clean up old states.
index c8d4577c1f099d3d8a0dd3276a5f5783a9fefe25..8cb7022707391e423bce565bd10f7a1882fb01d6 100644 (file)
--- a/src/kernel/addr.c
+++ b/src/kernel/addr.c
@@ -428,7 +428,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
                                   page_off, len,
                                   ci->i_truncate_seq, ci->i_truncate_size,
                                   &inode->i_mtime,
-                                  &page, 1, 0, 0);
+                                  &page, 1, 0, 0, true);
        if (err < 0) {
                dout("writepage setting page/mapping error %d %p\n", err, page);
                SetPageError(page);
@@ -552,20 +552,16 @@ static void writepages_finish(struct ceph_osd_request *req)
  * mempool.  we avoid the mempool if we can because req->r_num_pages
  * may be less than the maximum write size.
  */
-static int alloc_page_vec(struct ceph_client *client,
-                         struct ceph_osd_request *req)
+static void alloc_page_vec(struct ceph_client *client,
+                          struct ceph_osd_request *req)
 {
        req->r_pages = kmalloc(sizeof(struct page *) * req->r_num_pages,
                               GFP_NOFS);
-       if (req->r_pages) {
-               req->r_pages_from_pool = 0;
-               return 0;
+       if (!req->r_pages) {
+               req->r_pages = mempool_alloc(client->wb_pagevec_pool, GFP_NOFS);
+               req->r_pages_from_pool = 1;
+               WARN_ON(!req->r_pages);
        }
-
-       req->r_pages = mempool_alloc(client->wb_pagevec_pool, GFP_NOFS);
-       req->r_pages_from_pool = 1;
-       WARN_ON(!req->r_pages);
-       return -ENOMEM;
 }
 
 /*
@@ -770,9 +766,7 @@ get_more_pages:
                                            &inode->i_mtime, true);
                                max_pages = req->r_num_pages;
 
-                               rc = alloc_page_vec(client, req);
-                               if (rc)
-                                       goto out;
+                               alloc_page_vec(client, req);
                                req->r_callback = writepages_finish;
                                req->r_inode = inode;
                                req->r_wbc = wbc;
@@ -828,12 +822,8 @@ get_more_pages:
                op->payload_len = op->length;
                req->r_request->hdr.data_len = cpu_to_le32(len);
 
-               rc = ceph_osdc_start_request(&client->osdc, req);
+               ceph_osdc_start_request(&client->osdc, req, true);
                req = NULL;
-               /*
-                * FIXME: if writepages_start fails (ENOMEM?) we should
-                * really redirty all those pages and release req..
-                */
 
                /* continue? */
                index = next;
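
The reworked alloc_page_vec() above can no longer fail: it first tries a
regular kmalloc() of the page vector (which may be smaller than the
maximum write size) and only falls back to the preallocated mempool when
that allocation fails. A rough user-space analogue of the fallback, with a
single static reserve standing in for client->wb_pagevec_pool (real
mempools hold several elements and are returned with mempool_free; all
names here are illustrative only):

    /* User-space analogue of the kmalloc-then-mempool fallback in
     * alloc_page_vec(); the static 'reserve' array stands in for the
     * preallocated wb_pagevec_pool. */
    #include <stdio.h>
    #include <stdlib.h>

    #define MAX_WRITE_PAGES 256

    static void *reserve[MAX_WRITE_PAGES];   /* "mempool": always available */

    struct request {
        void **pages;
        int num_pages;
        int pages_from_pool;    /* models req->r_pages_from_pool */
    };

    static void alloc_page_vec(struct request *req)
    {
        req->pages = malloc(sizeof(void *) * req->num_pages);
        if (!req->pages) {
            /* fall back to the reserved vector; cannot fail */
            req->pages = reserve;
            req->pages_from_pool = 1;
        }
    }

    static void free_page_vec(struct request *req)
    {
        if (!req->pages_from_pool)
            free(req->pages);
        req->pages = NULL;
    }

    int main(void)
    {
        struct request req = { .num_pages = 16, .pages_from_pool = 0 };

        alloc_page_vec(&req);
        printf("from_pool=%d\n", req.pages_from_pool);
        free_page_vec(&req);
        return 0;
    }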
index 436e5c026e1f00dac6a111e6c052f3275c146a74..416fc82d3fb9864fab580b1dc0227081ef182ffa 100644 (file)
--- a/src/kernel/file.c
+++ b/src/kernel/file.c
@@ -618,7 +618,7 @@ more:
        req->r_num_pages = num_pages;
        req->r_inode = inode;
 
-       ret = ceph_osdc_start_request(&client->osdc, req);
+       ret = ceph_osdc_start_request(&client->osdc, req, false);
        if (!ret) {
                if (req->r_safe_callback) {
                        /*
index 1d81917aee879ab5e0b280608527709e0e068ac7..553d58fb569a1f904c03fcf89278bc3ac2ededba 100644 (file)
--- a/src/kernel/osd_client.c
+++ b/src/kernel/osd_client.c
@@ -313,63 +313,6 @@ static void register_request(struct ceph_osd_client *osdc,
        mutex_unlock(&osdc->request_mutex);
 }
 
-/*
- * Timeout callback, called every N seconds when 1 or more osd
- * requests has been active for more than N seconds.  When this
- * happens, we ping all OSDs with requests who have timed out to
- * ensure any communications channel reset is detected.  Reset the
- * request timeouts another N seconds in the future as we go.
- * Reschedule the timeout event another N seconds in future (unless
- * there are no open requests).
- */
-static void handle_timeout(struct work_struct *work)
-{
-       struct ceph_osd_client *osdc =
-               container_of(work, struct ceph_osd_client, timeout_work.work);
-       struct ceph_osd_request *req;
-       unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ;
-       unsigned long next_timeout = timeout + jiffies;
-       RADIX_TREE(pings, GFP_NOFS);  /* only send 1 ping per osd */
-       struct rb_node *p;
-
-       dout("timeout\n");
-       down_read(&osdc->map_sem);
-
-       ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1);
-
-       mutex_lock(&osdc->request_mutex);
-       for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
-               req = rb_entry(p, struct ceph_osd_request, r_node);
-
-               if (time_before(jiffies, req->r_timeout_stamp))
-                       continue;
-
-               req->r_timeout_stamp = next_timeout;
-               if (req->r_last_osd >= 0 &&
-                   radix_tree_lookup(&pings, req->r_last_osd) == NULL) {
-                       struct ceph_entity_name n = {
-                               .type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD),
-                               .num = cpu_to_le32(req->r_last_osd)
-                       };
-                       dout(" tid %llu (at least) timed out on osd%d\n",
-                            req->r_tid, req->r_last_osd);
-                       radix_tree_insert(&pings, req->r_last_osd, req);
-                       ceph_ping(osdc->client->msgr, n, &req->r_last_osd_addr);
-               }
-       }
-
-       while (radix_tree_gang_lookup(&pings, (void **)&req, 0, 1))
-               radix_tree_delete(&pings, req->r_last_osd);
-
-       if (osdc->timeout_tid)
-               schedule_delayed_work(&osdc->timeout_work,
-                                     round_jiffies_relative(timeout));
-
-       mutex_unlock(&osdc->request_mutex);
-
-       up_read(&osdc->map_sem);
-}
-
 /*
  * called under osdc->request_mutex
  */
@@ -504,6 +447,75 @@ static int send_request(struct ceph_osd_client *osdc,
                             BASE_DELAY_INTERVAL);
 }
 
+/*
+ * Timeout callback, called every N seconds when 1 or more osd
+ * requests has been active for more than N seconds.  When this
+ * happens, we ping all OSDs with requests who have timed out to
+ * ensure any communications channel reset is detected.  Reset the
+ * request timeouts another N seconds in the future as we go.
+ * Reschedule the timeout event another N seconds in future (unless
+ * there are no open requests).
+ */
+static void handle_timeout(struct work_struct *work)
+{
+       struct ceph_osd_client *osdc =
+               container_of(work, struct ceph_osd_client, timeout_work.work);
+       struct ceph_osd_request *req;
+       unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ;
+       unsigned long next_timeout = timeout + jiffies;
+       RADIX_TREE(pings, GFP_NOFS);  /* only send 1 ping per osd */
+       struct rb_node *p;
+
+       dout("timeout\n");
+       down_read(&osdc->map_sem);
+
+       ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1);
+
+       mutex_lock(&osdc->request_mutex);
+       for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
+               req = rb_entry(p, struct ceph_osd_request, r_node);
+
+               if (req->r_resend) {
+                       int err;
+
+                       dout("osdc resending prev failed %lld\n", req->r_tid);
+                       err = send_request(osdc, req);
+                       if (err)
+                               dout("osdc failed again on %lld\n", req->r_tid);
+                       else
+                               req->r_resend = false;
+                       continue;
+               }
+
+               if (time_before(jiffies, req->r_timeout_stamp))
+                       continue;
+
+               req->r_timeout_stamp = next_timeout;
+               if (req->r_last_osd >= 0 &&
+                   radix_tree_lookup(&pings, req->r_last_osd) == NULL) {
+                       struct ceph_entity_name n = {
+                               .type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD),
+                               .num = cpu_to_le32(req->r_last_osd)
+                       };
+                       dout(" tid %llu (at least) timed out on osd%d\n",
+                            req->r_tid, req->r_last_osd);
+                       radix_tree_insert(&pings, req->r_last_osd, req);
+                       ceph_ping(osdc->client->msgr, n, &req->r_last_osd_addr);
+               }
+       }
+
+       while (radix_tree_gang_lookup(&pings, (void **)&req, 0, 1))
+               radix_tree_delete(&pings, req->r_last_osd);
+
+       if (osdc->timeout_tid)
+               schedule_delayed_work(&osdc->timeout_work,
+                                     round_jiffies_relative(timeout));
+
+       mutex_unlock(&osdc->request_mutex);
+
+       up_read(&osdc->map_sem);
+}
+
 /*
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
@@ -606,11 +618,14 @@ static void kick_requests(struct ceph_osd_client *osdc,
        struct ceph_osd_request *req;
        struct rb_node *p;
        int needmap = 0;
+       int err;
 
        mutex_lock(&osdc->request_mutex);
        for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
                req = rb_entry(p, struct ceph_osd_request, r_node);
 
+               if (req->r_resend)
+                       goto kick;
                if (who && ceph_entity_addr_equal(who, &req->r_last_osd_addr))
                        goto kick;
 
@@ -633,7 +648,9 @@ static void kick_requests(struct ceph_osd_client *osdc,
                req->r_request = ceph_msg_maybe_dup(req->r_request);
                if (!req->r_aborted) {
                        req->r_flags |= CEPH_OSD_FLAG_RETRY;
-                       send_request(osdc, req);
+                       err = send_request(osdc, req);
+                       if (err)
+                               req->r_resend = true;
                }
                ceph_osdc_put_request(req);
                mutex_lock(&osdc->request_mutex);
@@ -816,7 +833,8 @@ out:
  * Register request, send initial attempt.
  */
 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
-                           struct ceph_osd_request *req)
+                           struct ceph_osd_request *req,
+                           bool nofail)
 {
        int rc;
 
@@ -828,6 +846,18 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
        down_read(&osdc->map_sem);
        rc = send_request(osdc, req);
        up_read(&osdc->map_sem);
+       if (rc) {
+               if (nofail) {
+                       dout("osdc_start_request failed send, marking %lld\n",
+                            req->r_tid);
+                       req->r_resend = true;
+                       rc = 0;
+               } else {
+                       mutex_lock(&osdc->request_mutex);
+                       __unregister_request(osdc, req);
+                       mutex_unlock(&osdc->request_mutex);
+               }
+       }
        return rc;
 }
 
@@ -987,7 +1017,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
        dout("readpages final extent is %llu~%llu (%d pages)\n",
             off, len, req->r_num_pages);
 
-       rc = ceph_osdc_start_request(osdc, req);
+       rc = ceph_osdc_start_request(osdc, req, false);
        if (!rc)
                rc = ceph_osdc_wait_request(osdc, req);
 
@@ -1041,7 +1071,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         u32 truncate_seq, u64 truncate_size,
                         struct timespec *mtime,
                         struct page **pages, int num_pages,
-                        int flags, int do_sync)
+                        int flags, int do_sync, bool nofail)
 {
        struct ceph_osd_request *req;
        int rc = 0;
@@ -1053,7 +1083,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                                            CEPH_OSD_FLAG_WRITE,
                                    snapc, do_sync,
                                    truncate_seq, truncate_size, mtime,
-                                   true);
+                                   nofail);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
@@ -1063,7 +1093,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
        dout("writepages %llu~%llu (%d pages)\n", off, len,
             req->r_num_pages);
 
-       rc = ceph_osdc_start_request(osdc, req);
+       rc = ceph_osdc_start_request(osdc, req, nofail);
        if (!rc)
                rc = ceph_osdc_wait_request(osdc, req);
 
index a0a5117f8e16b672250e2674b15c5b32495eca3d..fa6c5e15e5ae48095e9ab6a35e404721f7f5ddbf 100644 (file)
--- a/src/kernel/osd_client.h
+++ b/src/kernel/osd_client.h
@@ -45,6 +45,7 @@ struct ceph_osd_request {
        int               r_last_osd;         /* pg osds */
        struct ceph_entity_addr r_last_osd_addr;
        unsigned long     r_timeout_stamp;
+       bool              r_resend;           /* msg send failed, needs retry */
 
        struct ceph_file_layout r_file_layout;
        struct ceph_snap_context *r_snapc;    /* snap context for writes */
@@ -106,7 +107,8 @@ static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
 extern void ceph_osdc_put_request(struct ceph_osd_request *req);
 
 extern int ceph_osdc_start_request(struct ceph_osd_client *osdc,
-                                  struct ceph_osd_request *req);
+                                  struct ceph_osd_request *req,
+                                  bool nofail);
 extern int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
                                  struct ceph_osd_request *req);
 extern void ceph_osdc_abort_request(struct ceph_osd_client *osdc,
@@ -128,7 +130,7 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                u32 truncate_seq, u64 truncate_size,
                                struct timespec *mtime,
                                struct page **pages, int nr_pages,
-                               int flags, int do_sync);
+                               int flags, int do_sync, bool nofail);
 
 #endif