From f7e20e206ec5aa1666de2efc515483453aae6625 Mon Sep 17 00:00:00 2001
From: David Howells <dhowells@redhat.com>
Date: Thu, 20 Jan 2022 12:08:33 +0000
Subject: [PATCH] ceph: Pass an iter down from writepages

Rework ceph writeback so that, instead of gathering arrays of page
pointers and attaching them to the OSD write request, each region of
dirty pagecache is described by an ITER_XARRAY iov_iter that is attached
to the request with osd_req_op_extent_osd_iter().

get_writepages_data_length() and the pagevec-based gathering loop in
ceph_writepages_start() go away; writeback is instead driven by
ceph_writepages_region() and ceph_write_back_from_locked_folio(), and
completion unwinds the written range of the pagecache in
ceph_pages_written_back().

---
 fs/ceph/addr.c | 870 ++++++++++++++++++++++++-------------------------
 1 file changed, 425 insertions(+), 445 deletions(-)

diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 78908c16ccbe4..6dd8bfba0673e 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -481,33 +481,6 @@ get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
 	return snapc;
 }
 
-static u64 get_writepages_data_length(struct inode *inode,
-				      struct page *page, u64 start)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_snap_context *snapc = page_snap_context(page);
-	struct ceph_cap_snap *capsnap = NULL;
-	u64 end = i_size_read(inode);
-
-	if (snapc != ci->i_head_snapc) {
-		bool found = false;
-		spin_lock(&ci->i_ceph_lock);
-		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
-			if (capsnap->context == snapc) {
-				if (!capsnap->writing)
-					end = capsnap->size;
-				found = true;
-				break;
-			}
-		}
-		spin_unlock(&ci->i_ceph_lock);
-		WARN_ON(!found);
-	}
-	if (end > page_offset(page) + thp_size(page))
-		end = page_offset(page) + thp_size(page);
-	return end > start ? end - start : 0;
-}
-
 /*
  * Write a single page, but leave the page locked.
  *
@@ -659,6 +632,50 @@ static int ceph_writepage(struct page *page, struct writeback_control *wbc)
 	return err;
 }
 
+/*
+ * completion of write to server
+ */
+static void ceph_pages_written_back(struct inode *inode,
+				    struct ceph_fs_client *fsc,
+				    loff_t start, unsigned int len,
+				    bool lost_cap)
+{
+	struct address_space *mapping = inode->i_mapping;
+	struct folio *folio;
+	pgoff_t end;
+
+	XA_STATE(xas, &mapping->i_pages, start / PAGE_SIZE);
+
+	printk("written_back %x @%llx\n", len, start);
+
+	rcu_read_lock();
+
+	end = (start + len - 1) / PAGE_SIZE;
+	xas_for_each(&xas, folio, end) {
+		if (!folio_test_writeback(folio)) {
+			printk("bad %x @%llx page %lx %lx\n",
+			       len, start, folio_index(folio), end);
+			BUG();
+		}
+
+		if (atomic_long_dec_return(&fsc->writeback_count) <
+		    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
+			fsc->write_congested = false;
+
+		ceph_put_snap_context(folio_detach_private(folio));
+		folio_end_writeback(folio);
+
+		if (lost_cap) {
+			xas_pause(&xas);
+			generic_error_remove_page(inode->i_mapping, folio_page(folio, 0));
+		}
+
+		folio_unlock(folio);
+	}
+
+	rcu_read_unlock();
+}
+
 /*
  * async writeback completion handler.
  *
@@ -670,17 +687,17 @@ static void writepages_finish(struct ceph_osd_request *req)
 	struct inode *inode = req->r_inode;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_osd_data *osd_data;
-	struct page *page;
-	int num_pages, total_pages = 0;
-	int i, j;
-	int rc = req->r_result;
 	struct ceph_snap_context *snapc = req->r_snapc;
 	struct address_space *mapping = inode->i_mapping;
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	loff_t start = req->r_data_offset;
 	unsigned int len = 0;
-	bool remove_page;
+	bool lost_cap;
+	int num_pages, total_pages = 0;
+	int i;
+	int rc = req->r_result;
 
-	dout("writepages_finish %p rc %d\n", inode, rc);
+	printk("writepages_finish %p rc %d\n", inode, rc);
 	if (rc < 0) {
 		mapping_set_error(mapping, rc);
 		ceph_set_error_write(ci);
@@ -696,8 +713,8 @@ static void writepages_finish(struct ceph_osd_request *req)
 	 * page truncation thread, possibly losing some data that
 	 * raced its way in
 	 */
-	remove_page = !(ceph_caps_issued(ci) &
-			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
+	lost_cap = !(ceph_caps_issued(ci) &
+		     (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
 
 	/* clean all pages */
 	for (i = 0; i < req->r_num_ops; i++) {
@@ -708,35 +725,14 @@ static void writepages_finish(struct ceph_osd_request *req)
 		osd_data = osd_req_op_extent_osd_data(req, i);
-		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
-		len += osd_data->length;
-		num_pages = calc_pages_for((u64)osd_data->alignment,
-					   (u64)osd_data->length);
+		num_pages = calc_pages_for(osd_data->alignment,
+					   osd_data->length);
 		total_pages += num_pages;
-		for (j = 0; j < num_pages; j++) {
-			page = osd_data->pages[j];
-			BUG_ON(!page);
-			WARN_ON(!PageUptodate(page));
-
-			if (atomic_long_dec_return(&fsc->writeback_count) <
-			     CONGESTION_OFF_THRESH(
-					fsc->mount_options->congestion_kb))
-				fsc->write_congested = false;
-
-			ceph_put_snap_context(detach_page_private(page));
-			end_page_writeback(page);
-			dout("unlocking %p\n", page);
-
-			if (remove_page)
-				generic_error_remove_page(inode->i_mapping,
-							  page);
-
-			unlock_page(page);
-		}
+		ceph_pages_written_back(inode, fsc, start, osd_data->length, lost_cap);
 		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
-		     inode, osd_data->length, rc >= 0 ? num_pages : 0);
-
-		release_pages(osd_data->pages, num_pages);
+		     inode, osd_data->length, num_pages);
+		start += osd_data->length;
+		len += osd_data->length;
 	}
 
 	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
@@ -745,448 +741,432 @@ static void writepages_finish(struct ceph_osd_request *req)
 	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
 
 	osd_data = osd_req_op_extent_osd_data(req, 0);
-	if (osd_data->pages_from_pool)
-		mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
-	else
-		kfree(osd_data->pages);
 	ceph_osdc_put_request(req);
 }
 
 /*
- * initiate async writeback
+ * Extend the region to be written back to include subsequent contiguously
+ * dirty pages if possible, but don't sleep while doing so.
+ *
+ * If this page holds new content, then we can include filler zeros in the
+ * writeback.
+ */
-static int ceph_writepages_start(struct address_space *mapping,
-				 struct writeback_control *wbc)
+static void ceph_extend_writeback(struct address_space *mapping,
+				  struct ceph_snap_context *snapc,
+				  long *_count,
+				  loff_t start,
+				  loff_t max_len,
+				  bool caching,
+				  u64 *_len)
 {
-	struct inode *inode = mapping->host;
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	struct ceph_vino vino = ceph_vino(inode);
-	pgoff_t index, start_index, end = -1;
-	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
 	struct pagevec pvec;
-	int rc = 0;
-	unsigned int wsize = i_blocksize(inode);
-	struct ceph_osd_request *req = NULL;
-	struct ceph_writeback_ctl ceph_wbc;
-	bool should_loop, range_whole = false;
-	bool done = false;
-	bool caching = ceph_is_cache_enabled(inode);
-
-	if (wbc->sync_mode == WB_SYNC_NONE &&
-	    fsc->write_congested)
-		return 0;
-
-	dout("writepages_start %p (mode=%s)\n", inode,
-	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
-	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
-
-	if (ceph_inode_is_shutdown(inode)) {
-		if (ci->i_wrbuffer_ref > 0) {
-			pr_warn_ratelimited(
-				"writepage_start %p %lld forced umount\n",
-				inode, ceph_ino(inode));
-		}
-		mapping_set_error(mapping, -EIO);
-		return -EIO; /* we're in a forced umount, don't write! */
-	}
-	if (fsc->mount_options->wsize < wsize)
-		wsize = fsc->mount_options->wsize;
+	struct folio *folio;
+	loff_t len = *_len;
+	pgoff_t index = (start + len) / PAGE_SIZE;
+	bool stop = true;
+	unsigned int i;
+	XA_STATE(xas, &mapping->i_pages, index);
 
 	pagevec_init(&pvec);
 
-	start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
-	index = start_index;
-
-retry:
-	/* find oldest snap context with dirty data */
-	snapc = get_oldest_context(inode, &ceph_wbc, NULL);
-	if (!snapc) {
-		/* hmm, why does writepages get called when there
-		   is no dirty data? */
-		dout(" no snap context with dirty data?\n");
-		goto out;
-	}
-	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
-	     snapc, snapc->seq, snapc->num_snaps);
-
-	should_loop = false;
-	if (ceph_wbc.head_snapc && snapc != last_snapc) {
-		/* where to start/end? */
-		if (wbc->range_cyclic) {
-			index = start_index;
-			end = -1;
-			if (index > 0)
-				should_loop = true;
-			dout(" cyclic, start at %lu\n", index);
-		} else {
-			index = wbc->range_start >> PAGE_SHIFT;
-			end = wbc->range_end >> PAGE_SHIFT;
-			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
-				range_whole = true;
-			dout(" not cyclic, %lu to %lu\n", index, end);
-		}
-	} else if (!ceph_wbc.head_snapc) {
-		/* Do not respect wbc->range_{start,end}. Dirty pages
-		 * in that range can be associated with newer snapc.
-		 * They are not writeable until we write all dirty pages
-		 * associated with 'snapc' get written */
-		if (index > 0)
-			should_loop = true;
-		dout(" non-head snapc, range whole\n");
-	}
-
-	ceph_put_snap_context(last_snapc);
-	last_snapc = snapc;
-
-	while (!done && index <= end) {
-		int num_ops = 0, op_idx;
-		unsigned i, pvec_pages, max_pages, locked_pages = 0;
-		struct page **pages = NULL, **data_pages;
-		struct page *page;
-		pgoff_t strip_unit_end = 0;
-		u64 offset = 0, len = 0;
-		bool from_pool = false;
-
-		max_pages = wsize >> PAGE_SHIFT;
+	do {
+		/* Firstly, we gather up a batch of contiguous dirty pages
+		 * under the RCU read lock - but we can't clear the dirty flags
+		 * there if any of those pages are mapped.
+		 */
+		rcu_read_lock();
 
-get_more_pages:
-		pvec_pages = pagevec_lookup_range_tag(&pvec, mapping, &index,
-						end, PAGECACHE_TAG_DIRTY);
-		dout("pagevec_lookup_range_tag got %d\n", pvec_pages);
-		if (!pvec_pages && !locked_pages)
-			break;
-		for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
-			page = pvec.pages[i];
-			dout("? %p idx %lu\n", page, page->index);
-			if (locked_pages == 0)
-				lock_page(page);  /* first page */
-			else if (!trylock_page(page))
+		xas_for_each(&xas, folio, ULONG_MAX) {
+			stop = true;
+			if (xas_retry(&xas, folio))
+				continue;
+			if (xa_is_value(folio))
+				break;
+			if (folio_index(folio) != index ||
+			    folio_get_private(folio) != snapc)
 				break;
 
-			/* only dirty pages, or our accounting breaks */
-			if (unlikely(!PageDirty(page)) ||
-			    unlikely(page->mapping != mapping)) {
-				dout("!dirty or !mapping %p\n", page);
-				unlock_page(page);
-				continue;
-			}
-			/* only if matching snap context */
-			pgsnapc = page_snap_context(page);
-			if (pgsnapc != snapc) {
-				dout("page snapc %p %lld != oldest %p %lld\n",
-				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
-				if (!should_loop &&
-				    !ceph_wbc.head_snapc &&
-				    wbc->sync_mode != WB_SYNC_NONE)
-					should_loop = true;
-				unlock_page(page);
+			if (!folio_try_get_rcu(folio)) {
+				xas_reset(&xas);
 				continue;
 			}
-			if (page_offset(page) >= ceph_wbc.i_size) {
-				struct folio *folio = page_folio(page);
-
-				dout("folio at %lu beyond eof %llu\n",
-				     folio->index, ceph_wbc.i_size);
-				if ((ceph_wbc.size_stable ||
-				    folio_pos(folio) >= i_size_read(inode)) &&
-				    folio_clear_dirty_for_io(folio))
-					folio_invalidate(folio, 0,
-							folio_size(folio));
-				folio_unlock(folio);
-				continue;
+
+			/* Has the page moved or been split? */
+			if (unlikely(folio != xas_reload(&xas))) {
+				folio_put(folio);
+				break;
 			}
-			if (strip_unit_end && (page->index > strip_unit_end)) {
-				dout("end of strip unit %p\n", page);
-				unlock_page(page);
+
+			if (!folio_trylock(folio)) {
+				folio_put(folio);
 				break;
 			}
-			if (PageWriteback(page) || PageFsCache(page)) {
-				if (wbc->sync_mode == WB_SYNC_NONE) {
-					dout("%p under writeback\n", page);
-					unlock_page(page);
-					continue;
-				}
-				dout("waiting on writeback %p\n", page);
-				wait_on_page_writeback(page);
-				wait_on_page_fscache(page);
+			if (folio_get_private(folio) != snapc ||
+			    !folio_test_dirty(folio) ||
+			    folio_test_writeback(folio) ||
+			    folio_test_fscache(folio)) {
+				folio_unlock(folio);
+				folio_put(folio);
+				break;
 			}
 
-			if (!clear_page_dirty_for_io(page)) {
-				dout("%p !clear_page_dirty_for_io\n", page);
-				unlock_page(page);
-				continue;
-			}
+			index += folio_nr_pages(folio);
+			if (!pagevec_add(&pvec, &folio->page))
+				break;
+			if (stop)
+				break;
+		}
 
-			/*
-			 * We have something to write.  If this is
-			 * the first locked page this time through,
-			 * calculate max possinle write size and
-			 * allocate a page array
-			 */
-			if (locked_pages == 0) {
-				u64 objnum;
-				u64 objoff;
-				u32 xlen;
-
-				/* prepare async write request */
-				offset = (u64)page_offset(page);
-				ceph_calc_file_object_mapping(&ci->i_layout,
-							      offset, wsize,
-							      &objnum, &objoff,
-							      &xlen);
-				len = xlen;
-
-				num_ops = 1;
-				strip_unit_end = page->index +
-					((len - 1) >> PAGE_SHIFT);
-
-				BUG_ON(pages);
-				max_pages = calc_pages_for(0, (u64)len);
-				pages = kmalloc_array(max_pages,
-						      sizeof(*pages),
-						      GFP_NOFS);
-				if (!pages) {
-					from_pool = true;
-					pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
-					BUG_ON(!pages);
-				}
-
-				len = 0;
-			} else if (page->index !=
-				   (offset + len) >> PAGE_SHIFT) {
-				if (num_ops >= (from_pool ?  CEPH_OSD_SLAB_OPS :
-						CEPH_OSD_MAX_OPS)) {
-					redirty_page_for_writepage(wbc, page);
-					unlock_page(page);
-					break;
-				}
+		if (!stop)
+			xas_pause(&xas);
 		rcu_read_unlock();
 
-				num_ops++;
-				offset = (u64)page_offset(page);
-				len = 0;
-			}
+		/* Now, if we obtained any pages, we can shift them to being
+		 * writable and mark them for caching.
+		 */
+		if (!pagevec_count(&pvec))
+			break;
 
-			/* note position of first page in pvec */
-			dout("%p will write page %p idx %lu\n",
-			     inode, page, page->index);
+		for (i = 0; i < pagevec_count(&pvec); i++) {
+			folio = page_folio(pvec.pages[i]);
+			if (!folio_clear_dirty_for_io(folio))
+				BUG();
+			if (folio_start_writeback(folio))
+				BUG();
+			//ceph_folio_start_fscache(caching, folio);
 
-			if (atomic_long_inc_return(&fsc->writeback_count) >
-			    CONGESTION_ON_THRESH(
-				    fsc->mount_options->congestion_kb))
-				fsc->write_congested = true;
+			*_count -= folio_nr_pages(folio);
+			folio_unlock(folio);
+		}
 
-			pages[locked_pages++] = page;
-			pvec.pages[i] = NULL;
+		pagevec_release(&pvec);
+		cond_resched();
+	} while (!stop);
 
-			len += thp_size(page);
-		}
+	*_len = len;
+}
 
-		/* did we get anything? */
-		if (!locked_pages)
-			goto release_pvec_pages;
-		if (i) {
-			unsigned j, n = 0;
-			/* shift unused page to beginning of pvec */
-			for (j = 0; j < pvec_pages; j++) {
-				if (!pvec.pages[j])
-					continue;
-				if (n < j)
-					pvec.pages[n] = pvec.pages[j];
-				n++;
-			}
-			pvec.nr = n;
+/*
+ * Synchronously write back the locked page and any subsequent non-locked dirty
+ * pages.
+ */
+static ssize_t ceph_write_back_from_locked_folio(struct address_space *mapping,
+						 struct writeback_control *wbc,
+						 struct ceph_snap_context *snapc,
+						 struct ceph_writeback_ctl *ceph_wbc,
+						 struct folio *folio,
+						 loff_t start, loff_t end)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_vino vino = ceph_vino(inode);
+	struct ceph_osd_request *req = NULL;
+	struct iov_iter iter;
+	unsigned int max_len;
+	loff_t i_size = i_size_read(inode);
+	bool caching = false; //fscache_cookie_enabled(ceph_fscache_cookie(ci));
+	long count = wbc->nr_to_write;
+	u64 len, oplen;
+	int ret, num_ops, op_idx = 0;
+
+	if (folio_start_writeback(folio))
+		BUG();
+	//ceph_folio_start_fscache(caching, folio);
+
+	count -= folio_nr_pages(folio);
+
+	/* Find all consecutive lockable dirty pages that have contiguous
+	 * written regions, stopping when we find a page that is not
+	 * immediately lockable, is not dirty or is missing, or we reach the
+	 * end of the range.
+	 */
+	len = folio_size(folio);
+	if (start < i_size) {
+		/* Trim the write to the EOF; the extra data is ignored.  Also
+		 * put an upper limit on the size of a single storedata op.
+		 */
+		u64 objnum, objoff;
 
-			if (pvec_pages && i == pvec_pages &&
-			    locked_pages < max_pages) {
-				dout("reached end pvec, trying for more\n");
-				pagevec_release(&pvec);
-				goto get_more_pages;
-			}
-		}
+		ceph_calc_file_object_mapping(&ci->i_layout, start, end - start + 1,
+					      &objnum, &objoff, &max_len);
+		max_len = min_t(u64, max_len, i_size - start);
+
+		if (len < max_len)
+			ceph_extend_writeback(mapping, snapc, &count,
+					      start, max_len, caching, &len);
+		len = min_t(loff_t, len, max_len);
+	}
+
+	/* We now have a contiguous set of dirty pages, each with writeback
+	 * set; the first page is still locked at this point, but all the rest
+	 * have been unlocked.
+	 */
	folio_unlock(folio);
 
-new_request:
-		offset = page_offset(pages[0]);
-		len = wsize;
+	/* Create a write request */
+	if (start < i_size) {
+		printk("write back %llx @%llx [%llx]\n", len, start, i_size);
+		/* Speculatively write to the cache.  We have to fix this up
+		 * later if the store fails.
+		 */
+		//ceph_write_to_cache(vnode, start, len, i_size, caching);
 
+		oplen = len;
+		num_ops = 1;
 		req = ceph_osdc_new_request(&fsc->client->osdc,
-					&ci->i_layout, vino,
-					offset, &len, 0, num_ops,
-					CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
-					snapc, ceph_wbc.truncate_seq,
-					ceph_wbc.truncate_size, false);
+					    &ci->i_layout, vino,
+					    start, &oplen, 0, num_ops,
+					    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
+					    snapc, ceph_wbc->truncate_seq,
+					    ceph_wbc->truncate_size, false);
 		if (IS_ERR(req)) {
 			req = ceph_osdc_new_request(&fsc->client->osdc,
-						&ci->i_layout, vino,
-						offset, &len, 0,
-						min(num_ops,
-						    CEPH_OSD_SLAB_OPS),
-						CEPH_OSD_OP_WRITE,
-						CEPH_OSD_FLAG_WRITE,
-						snapc, ceph_wbc.truncate_seq,
-						ceph_wbc.truncate_size, true);
+						    &ci->i_layout, vino,
+						    start, &oplen, 0,
+						    min(num_ops, CEPH_OSD_SLAB_OPS),
+						    CEPH_OSD_OP_WRITE,
+						    CEPH_OSD_FLAG_WRITE,
+						    snapc, ceph_wbc->truncate_seq,
+						    ceph_wbc->truncate_size, true);
 			BUG_ON(IS_ERR(req));
 		}
-		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
-			     thp_size(page) - offset);
+		BUG_ON(oplen < len);
 
 		req->r_callback = writepages_finish;
 		req->r_inode = inode;
+		req->r_mtime = inode->i_mtime;
 
-		/* Format the osd request message and submit the write */
-		len = 0;
-		data_pages = pages;
-		op_idx = 0;
-		for (i = 0; i < locked_pages; i++) {
-			u64 cur_offset = page_offset(pages[i]);
-			/*
-			 * Discontinuity in page range? Ceph can handle that by just passing
-			 * multiple extents in the write op.
-			 */
-			if (offset + len != cur_offset) {
-				/* If it's full, stop here */
-				if (op_idx + 1 == req->r_num_ops)
-					break;
+		iov_iter_xarray(&iter, WRITE, &mapping->i_pages, start, len);
+		osd_req_op_extent_osd_iter(req, op_idx, &iter);
+		osd_req_op_extent_update(req, op_idx, len);
+		ret = ceph_osdc_start_request(&fsc->client->osdc, req, true);
+		BUG_ON(ret);
+		wbc->nr_to_write = count;
+		req = NULL;
+	} else {
+		printk("write discard %llx @%llx [%llx]\n", len, start, i_size);
 
-				/* Kick off an fscache write with what we have so far. */
-				ceph_fscache_write_to_cache(inode, offset, len, caching);
-
-				/* Start a new extent */
-				osd_req_op_extent_dup_last(req, op_idx,
-							   cur_offset - offset);
-				dout("writepages got pages at %llu~%llu\n",
-				     offset, len);
-				osd_req_op_extent_osd_data_pages(req, op_idx,
-							data_pages, len, 0,
-							from_pool, false);
-				osd_req_op_extent_update(req, op_idx, len);
-
-				len = 0;
-				offset = cur_offset;
-				data_pages = pages + i;
-				op_idx++;
-			}
+		/* The dirty region was entirely beyond the EOF.
+		 */
+		fscache_clear_page_bits(mapping, start, len, caching);
+		ceph_pages_written_back(inode, fsc, start, len, false);
+		wbc->nr_to_write = count;
+		ret = 0;
+	}
 
-			set_page_writeback(pages[i]);
-			if (caching)
-				ceph_set_page_fscache(pages[i]);
-			len += thp_size(page);
-		}
-		ceph_fscache_write_to_cache(inode, offset, len, caching);
-
-		if (ceph_wbc.size_stable) {
-			len = min(len, ceph_wbc.i_size - offset);
-		} else if (i == locked_pages) {
-			/* writepages_finish() clears writeback pages
-			 * according to the data length, so make sure
-			 * data length covers all locked pages */
-			u64 min_len = len + 1 - thp_size(page);
-			len = get_writepages_data_length(inode, pages[i - 1],
-							 offset);
-			len = max(len, min_len);
-		}
-		dout("writepages got pages at %llu~%llu\n", offset, len);
+	printk("%s() = %d\n", __func__, ret);
+	return ret;
+}
 
-		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
-						 0, from_pool, false);
-		osd_req_op_extent_update(req, op_idx, len);
+/*
+ * Scan for a dirty page in the specified snap.
+ */
+static struct folio *ceph_scan_for_writeable_page(struct address_space *mapping,
+						  struct ceph_snap_context *snapc,
+						  pgoff_t from, pgoff_t to)
+{
+	XA_STATE(xas, &mapping->i_pages, from);
+	struct folio *ret = NULL, *folio;
+
+	rcu_read_lock();
+	xas_for_each_marked(&xas, folio, to, PAGECACHE_TAG_DIRTY) {
+		if (xas_retry(&xas, folio))
+			continue;
+		if (xa_is_value(folio) || folio->index > to)
+			break;
+		if (folio_get_private(folio) != snapc)
+			continue;
+		if (!folio_try_get_rcu(folio))
+			goto retry;
+		if (unlikely(folio != xas_reload(&xas) ||
+			     folio_get_private(folio) != snapc))
+			goto put_page;
+		ret = folio;
+		break;
+put_page:
+		folio_put(folio);
+retry:
+		xas_reset(&xas);
+	}
+	rcu_read_unlock();
+	return ret;
+}
 
-		BUG_ON(op_idx + 1 != req->r_num_ops);
-
-		from_pool = false;
-		if (i < locked_pages) {
-			BUG_ON(num_ops <= req->r_num_ops);
-			num_ops -= req->r_num_ops;
-			locked_pages -= i;
-
-			/* allocate new pages array for next request */
-			data_pages = pages;
-			pages = kmalloc_array(locked_pages, sizeof(*pages),
-					      GFP_NOFS);
-			if (!pages) {
-				from_pool = true;
-				pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
-				BUG_ON(!pages);
+/*
+ * write a region of pages back to the server
+ */
+static int ceph_writepages_region(struct address_space *mapping,
+				  struct writeback_control *wbc,
+				  struct ceph_snap_context *snapc,
+				  struct ceph_writeback_ctl *ceph_wbc,
+				  loff_t start, loff_t end, loff_t *_next)
+{
+	struct folio *folio;
+	ssize_t ret;
+	int skips = 0;
+
+	printk("%s(%llx,%llx)", __func__, start, end);
+
+	do {
+		pgoff_t index = start / PAGE_SIZE;
+		pgoff_t pend = end / PAGE_SIZE;
+
+		folio = ceph_scan_for_writeable_page(mapping, snapc, index, pend);
+		if (!folio)
+			break;
+
+		start = folio_pos(folio); /* May regress with THPs */
+
+		printk("wback %lx\n", folio_index(folio));
+
+		/* At this point we hold neither the i_pages lock nor the
+		 * page lock: the page may be truncated or invalidated
+		 * (changing page->mapping to NULL), or even swizzled
+		 * back from swapper_space to tmpfs file mapping
+		 */
+		if (wbc->sync_mode != WB_SYNC_NONE) {
+			ret = folio_lock_killable(folio);
+			if (ret < 0) {
+				folio_put(folio);
+				return ret;
 			}
-			memcpy(pages, data_pages + i,
-			       locked_pages * sizeof(*pages));
-			memset(data_pages + i, 0,
-			       locked_pages * sizeof(*pages));
 		} else {
-			BUG_ON(num_ops != req->r_num_ops);
-			index = pages[i - 1]->index + 1;
-			/* request message now owns the pages array */
-			pages = NULL;
+			if (!folio_trylock(folio)) {
+				folio_put(folio);
+				return 0;
+			}
 		}
 
-		req->r_mtime = inode->i_mtime;
-		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
-		BUG_ON(rc);
-		req = NULL;
+		if (folio_mapping(folio) != mapping ||
+		    folio_get_private(folio) != snapc ||
+		    !folio_test_dirty(folio)) {
+			start += folio_size(folio);
+			folio_unlock(folio);
+			folio_put(folio);
+			continue;
+		}
 
-		wbc->nr_to_write -= i;
-		if (pages)
-			goto new_request;
+		if (folio_test_writeback(folio) ||
+		    folio_test_fscache(folio)) {
+			folio_unlock(folio);
+			if (wbc->sync_mode != WB_SYNC_NONE) {
+				folio_wait_writeback(folio);
+				folio_wait_fscache(folio);
+			} else {
+				start += folio_size(folio);
+			}
+			folio_put(folio);
+			if (wbc->sync_mode == WB_SYNC_NONE) {
+				if (skips >= 5 || need_resched())
+					break;
+				skips++;
+			}
+			continue;
+		}
 
-		/*
-		 * We stop writing back only if we are not doing
-		 * integrity sync. In case of integrity sync we have to
-		 * keep going until we have written all the pages
-		 * we tagged for writeback prior to entering this loop.
-		 */
-		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
-			done = true;
+		if (!folio_clear_dirty_for_io(folio))
+			BUG();
+		ret = ceph_write_back_from_locked_folio(mapping, wbc, snapc, ceph_wbc,
+							folio, start, pend);
+		folio_put(folio);
+		if (ret < 0)
+			return ret;
 
-release_pvec_pages:
-		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
-		     pvec.nr ? pvec.pages[0] : NULL);
-		pagevec_release(&pvec);
-	}
+		start += ret;
 
-	if (should_loop && !done) {
-		/* more to do; loop back to beginning of file */
-		dout("writepages looping back to beginning of file\n");
-		end = start_index - 1; /* OK even when start_index == 0 */
-
-		/* to write dirty pages associated with next snapc,
-		 * we need to wait until current writes complete */
-		if (wbc->sync_mode != WB_SYNC_NONE &&
-		    start_index == 0 && /* all dirty pages were checked */
-		    !ceph_wbc.head_snapc) {
-			struct page *page;
-			unsigned i, nr;
-			index = 0;
-			while ((index <= end) &&
-			       (nr = pagevec_lookup_tag(&pvec, mapping, &index,
-						PAGECACHE_TAG_WRITEBACK))) {
-				for (i = 0; i < nr; i++) {
-					page = pvec.pages[i];
-					if (page_snap_context(page) != snapc)
-						continue;
-					wait_on_page_writeback(page);
-				}
-				pagevec_release(&pvec);
-				cond_resched();
-			}
+		cond_resched();
+	} while (wbc->nr_to_write > 0);
+
+	*_next = start;
+	printk("%s() = 0 [%llx]\n", __func__, *_next);
+	return 0;
+}
+
+/*
+ * write some of the pending data back to the server
+ */
+static int ceph_writepages(struct address_space *mapping,
+			   struct writeback_control *wbc)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL;
+	struct ceph_writeback_ctl ceph_wbc;
+	unsigned int wsize = i_blocksize(inode);
+	loff_t start, next;
+	int ret;
+
+	printk("writepages_start %p (mode=%s)\n", inode,
+	       wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
+	       (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
+
+	if (ceph_inode_is_shutdown(inode)) {
+		if (ci->i_wrbuffer_ref > 0) {
+			pr_warn_ratelimited(
+				"writepage_start %p %lld forced umount\n",
+				inode, ceph_ino(inode));
 		}
+		mapping_set_error(mapping, -EIO);
+		return -EIO; /* we're in a forced umount, don't write! */
+	}
+	if (fsc->mount_options->wsize < wsize)
+		wsize = fsc->mount_options->wsize;
 
-		start_index = 0;
-		index = 0;
-		goto retry;
+	/* find oldest snap context with dirty data */
+	snapc = get_oldest_context(inode, &ceph_wbc, NULL);
+	if (!snapc) {
+		/* hmm, why does writepages get called when there
+		   is no dirty data?
+		 */
+		printk(" no snap context with dirty data?\n");
+		goto out;
 	}
+	printk(" oldest snapc is %p seq %lld (%d snaps)\n",
+	       snapc, snapc->seq, snapc->num_snaps);
 
-	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
-		mapping->writeback_index = index;
+	/* Decide whether/how to use the range specification we were given. */
+	if (ceph_wbc.head_snapc && snapc != last_snapc) {
+		if (wbc->range_cyclic) {
+			start = mapping->writeback_index * PAGE_SIZE;
+			ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+						     start, LLONG_MAX, &next);
+			if (ret != 0)
+				goto out;
+
+			mapping->writeback_index = next / PAGE_SIZE;
+			if (start == 0 || wbc->nr_to_write <= 0)
+				goto out;
+
+			ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+						     0, start, &next);
+			if (ret == 0)
+				mapping->writeback_index = next / PAGE_SIZE;
+		} else if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX) {
+			ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+						     0, LLONG_MAX, &next);
+			if (wbc->nr_to_write > 0 && ret == 0)
+				mapping->writeback_index = next / PAGE_SIZE;
+		} else {
+			ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+						     wbc->range_start, wbc->range_end,
+						     &next);
+		}
+	} else if (!ceph_wbc.head_snapc) {
+		/* Do not respect wbc->range_{start,end}. Dirty pages
+		 * in that range can be associated with newer snapc.
+		 * They are not writeable until we write all dirty pages
+		 * associated with 'snapc' get written */
+		printk(" non-head snapc, range whole\n");
+		ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+					     0, LLONG_MAX, &next);
+	}
 
 out:
-	ceph_osdc_put_request(req);
 	ceph_put_snap_context(last_snapc);
-	dout("writepages dend - startone, rc = %d\n", rc);
-	return rc;
+	last_snapc = snapc;
+	return ret;
 }
 
-
-
 /*
  * See if a given @snapc is either writeable, or already written.
  */
@@ -1350,7 +1330,7 @@ const struct address_space_operations ceph_aops = {
 	.readpage = netfs_readpage,
 	.readahead = netfs_readahead,
 	.writepage = ceph_writepage,
-	.writepages = ceph_writepages_start,
+	.writepages = ceph_writepages,
 	.write_begin = ceph_write_begin,
 	.write_end = ceph_write_end,
 	.dirty_folio = ceph_dirty_folio,
-- 
2.39.5