page_off, len,
ci->i_truncate_seq, ci->i_truncate_size,
&inode->i_mtime,
- &page, 1, 0, 0);
+ &page, 1, 0, 0, true);
if (err < 0) {
dout("writepage setting page/mapping error %d %p\n", err, page);
SetPageError(page);
* mempool. we avoid the mempool if we can because req->r_num_pages
* may be less than the maximum write size.
*/
-static int alloc_page_vec(struct ceph_client *client,
- struct ceph_osd_request *req)
+static void alloc_page_vec(struct ceph_client *client,
+ struct ceph_osd_request *req)
{
req->r_pages = kmalloc(sizeof(struct page *) * req->r_num_pages,
GFP_NOFS);
- if (req->r_pages) {
- req->r_pages_from_pool = 0;
- return 0;
+ if (!req->r_pages) {
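+ /*
+ * mempool_alloc with GFP_NOFS (a blocking gfp mask) will not
+ * return NULL, so the WARN_ON below should never fire.
+ */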
+ req->r_pages = mempool_alloc(client->wb_pagevec_pool, GFP_NOFS);
+ req->r_pages_from_pool = 1;
+ WARN_ON(!req->r_pages);
}
-
- req->r_pages = mempool_alloc(client->wb_pagevec_pool, GFP_NOFS);
- req->r_pages_from_pool = 1;
- WARN_ON(!req->r_pages);
- return -ENOMEM;
}
/*
&inode->i_mtime, true);
max_pages = req->r_num_pages;
- rc = alloc_page_vec(client, req);
- if (rc)
- goto out;
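+ /* cannot fail: falls back to the wb_pagevec_pool mempool */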
+ alloc_page_vec(client, req);
req->r_callback = writepages_finish;
req->r_inode = inode;
req->r_wbc = wbc;
op->payload_len = op->length;
req->r_request->hdr.data_len = cpu_to_le32(len);
- rc = ceph_osdc_start_request(&client->osdc, req);
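+ /* nofail == true: start_request cannot return an error here */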
+ ceph_osdc_start_request(&client->osdc, req, true);
req = NULL;
- /*
- * FIXME: if writepages_start fails (ENOMEM?) we should
- * really redirty all those pages and release req..
- */
/* continue? */
index = next;
mutex_unlock(&osdc->request_mutex);
}
-/*
- * Timeout callback, called every N seconds when 1 or more osd
- * requests has been active for more than N seconds. When this
- * happens, we ping all OSDs with requests who have timed out to
- * ensure any communications channel reset is detected. Reset the
- * request timeouts another N seconds in the future as we go.
- * Reschedule the timeout event another N seconds in future (unless
- * there are no open requests).
- */
-static void handle_timeout(struct work_struct *work)
-{
- struct ceph_osd_client *osdc =
- container_of(work, struct ceph_osd_client, timeout_work.work);
- struct ceph_osd_request *req;
- unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ;
- unsigned long next_timeout = timeout + jiffies;
- RADIX_TREE(pings, GFP_NOFS); /* only send 1 ping per osd */
- struct rb_node *p;
-
- dout("timeout\n");
- down_read(&osdc->map_sem);
-
- ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1);
-
- mutex_lock(&osdc->request_mutex);
- for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
- req = rb_entry(p, struct ceph_osd_request, r_node);
-
- if (time_before(jiffies, req->r_timeout_stamp))
- continue;
-
- req->r_timeout_stamp = next_timeout;
- if (req->r_last_osd >= 0 &&
- radix_tree_lookup(&pings, req->r_last_osd) == NULL) {
- struct ceph_entity_name n = {
- .type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD),
- .num = cpu_to_le32(req->r_last_osd)
- };
- dout(" tid %llu (at least) timed out on osd%d\n",
- req->r_tid, req->r_last_osd);
- radix_tree_insert(&pings, req->r_last_osd, req);
- ceph_ping(osdc->client->msgr, n, &req->r_last_osd_addr);
- }
- }
-
- while (radix_tree_gang_lookup(&pings, (void **)&req, 0, 1))
- radix_tree_delete(&pings, req->r_last_osd);
-
- if (osdc->timeout_tid)
- schedule_delayed_work(&osdc->timeout_work,
- round_jiffies_relative(timeout));
-
- mutex_unlock(&osdc->request_mutex);
-
- up_read(&osdc->map_sem);
-}
-
/*
* called under osdc->request_mutex
*/
BASE_DELAY_INTERVAL);
}
+/*
+ * Timeout callback, called every N seconds when 1 or more osd
+ * requests have been active for more than N seconds. When this
+ * happens, we ping all OSDs with requests that have timed out to
+ * ensure any communications channel reset is detected. Reset the
+ * request timeouts another N seconds in the future as we go.
+ * Reschedule the timeout event another N seconds in the future
+ * (unless there are no open requests).
+ */
+static void handle_timeout(struct work_struct *work)
+{
+ struct ceph_osd_client *osdc =
+ container_of(work, struct ceph_osd_client, timeout_work.work);
+ struct ceph_osd_request *req;
+ unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ;
+ unsigned long next_timeout = timeout + jiffies;
+ RADIX_TREE(pings, GFP_NOFS); /* only send 1 ping per osd */
+ struct rb_node *p;
+
+ dout("timeout\n");
+ down_read(&osdc->map_sem);
+
+ ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1);
+
+ mutex_lock(&osdc->request_mutex);
+ for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
+ req = rb_entry(p, struct ceph_osd_request, r_node);
+
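+ /*
+ * Requests whose last send attempt failed (e.g. ENOMEM) are
+ * flagged r_resend; retry sending them before checking for
+ * timeouts.
+ */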
+ if (req->r_resend) {
+ int err;
+
+ dout("osdc resending prev failed %lld\n", req->r_tid);
+ err = send_request(osdc, req);
+ if (err)
+ dout("osdc failed again on %lld\n", req->r_tid);
+ else
+ req->r_resend = false;
+ continue;
+ }
+
+ if (time_before(jiffies, req->r_timeout_stamp))
+ continue;
+
+ req->r_timeout_stamp = next_timeout;
+ if (req->r_last_osd >= 0 &&
+ radix_tree_lookup(&pings, req->r_last_osd) == NULL) {
+ struct ceph_entity_name n = {
+ .type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD),
+ .num = cpu_to_le32(req->r_last_osd)
+ };
+ dout(" tid %llu (at least) timed out on osd%d\n",
+ req->r_tid, req->r_last_osd);
+ radix_tree_insert(&pings, req->r_last_osd, req);
+ ceph_ping(osdc->client->msgr, n, &req->r_last_osd_addr);
+ }
+ }
+
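+ /* drain the temporary pings tree before it goes out of scope */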
+ while (radix_tree_gang_lookup(&pings, (void **)&req, 0, 1))
+ radix_tree_delete(&pings, req->r_last_osd);
+
+ if (osdc->timeout_tid)
+ schedule_delayed_work(&osdc->timeout_work,
+ round_jiffies_relative(timeout));
+
+ mutex_unlock(&osdc->request_mutex);
+
+ up_read(&osdc->map_sem);
+}
+
/*
* handle osd op reply. either call the callback if it is specified,
* or do the completion to wake up the waiting thread.
struct ceph_osd_request *req;
struct rb_node *p;
int needmap = 0;
+ int err;
mutex_lock(&osdc->request_mutex);
for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
req = rb_entry(p, struct ceph_osd_request, r_node);
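+ /*
+ * requests flagged r_resend still need a send attempt, no matter
+ * which osd they were last sent to
+ */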
+ if (req->r_resend)
+ goto kick;
if (who && ceph_entity_addr_equal(who, &req->r_last_osd_addr))
goto kick;
req->r_request = ceph_msg_maybe_dup(req->r_request);
if (!req->r_aborted) {
req->r_flags |= CEPH_OSD_FLAG_RETRY;
- send_request(osdc, req);
+ err = send_request(osdc, req);
+ if (err)
+ req->r_resend = true;
}
ceph_osdc_put_request(req);
mutex_lock(&osdc->request_mutex);
* Register request, send initial attempt.
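+ * If nofail is true, a failure to send is not reported to the caller;
+ * the request stays registered and handle_timeout will retry the send.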
*/
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
+ struct ceph_osd_request *req,
+ bool nofail)
{
int rc;
down_read(&osdc->map_sem);
rc = send_request(osdc, req);
up_read(&osdc->map_sem);
+ if (rc) {
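+ /*
+ * nofail callers (writeback) cannot handle an error here; leave
+ * the request registered and let handle_timeout retry the send.
+ */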
+ if (nofail) {
+ dout("osdc_start_request failed send, marking %lld\n",
+ req->r_tid);
+ req->r_resend = true;
+ rc = 0;
+ } else {
+ mutex_lock(&osdc->request_mutex);
+ __unregister_request(osdc, req);
+ mutex_unlock(&osdc->request_mutex);
+ }
+ }
return rc;
}
dout("readpages final extent is %llu~%llu (%d pages)\n",
off, len, req->r_num_pages);
- rc = ceph_osdc_start_request(osdc, req);
+ rc = ceph_osdc_start_request(osdc, req, false);
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);
u32 truncate_seq, u64 truncate_size,
struct timespec *mtime,
struct page **pages, int num_pages,
- int flags, int do_sync)
+ int flags, int do_sync, bool nofail)
{
struct ceph_osd_request *req;
int rc = 0;
CEPH_OSD_FLAG_WRITE,
snapc, do_sync,
truncate_seq, truncate_size, mtime,
- true);
+ nofail);
if (IS_ERR(req))
return PTR_ERR(req);
dout("writepages %llu~%llu (%d pages)\n", off, len,
req->r_num_pages);
- rc = ceph_osdc_start_request(osdc, req);
+ rc = ceph_osdc_start_request(osdc, req, nofail);
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);