git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-client.git/commitdiff
ceph: Pass an iter down from writepages  [libceph-iter-experimental]
author     David Howells <dhowells@redhat.com>
           Thu, 20 Jan 2022 12:08:33 +0000 (12:08 +0000)
committer  Jeff Layton <jlayton@kernel.org>
           Thu, 19 May 2022 20:44:16 +0000 (16:44 -0400)
fs/ceph/addr.c

index 78908c16ccbe4e822764e70c47d4aa92190defec..6dd8bfba0673e24fb10b7ab1221ed0c5c94a20bd 100644 (file)
@@ -481,33 +481,6 @@ get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
        return snapc;
 }
 
-static u64 get_writepages_data_length(struct inode *inode,
-                                     struct page *page, u64 start)
-{
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_snap_context *snapc = page_snap_context(page);
-       struct ceph_cap_snap *capsnap = NULL;
-       u64 end = i_size_read(inode);
-
-       if (snapc != ci->i_head_snapc) {
-               bool found = false;
-               spin_lock(&ci->i_ceph_lock);
-               list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
-                       if (capsnap->context == snapc) {
-                               if (!capsnap->writing)
-                                       end = capsnap->size;
-                               found = true;
-                               break;
-                       }
-               }
-               spin_unlock(&ci->i_ceph_lock);
-               WARN_ON(!found);
-       }
-       if (end > page_offset(page) + thp_size(page))
-               end = page_offset(page) + thp_size(page);
-       return end > start ? end - start : 0;
-}
-
 /*
  * Write a single page, but leave the page locked.
  *
@@ -659,6 +632,50 @@ static int ceph_writepage(struct page *page, struct writeback_control *wbc)
        return err;
 }
 
+/*
+ * completion of write to server
+ */
+static void ceph_pages_written_back(struct inode *inode,
+                                   struct ceph_fs_client *fsc,
+                                   loff_t start, unsigned int len,
+                                   bool lost_cap)
+{
+       struct address_space *mapping = inode->i_mapping;
+       struct folio *folio;
+       pgoff_t end;
+
+       XA_STATE(xas, &mapping->i_pages, start / PAGE_SIZE);
+
+       printk("written_back %x @%llx\n", len, start);
+
+       rcu_read_lock();
+
+       end = (start + len - 1) / PAGE_SIZE;
+       xas_for_each(&xas, folio, end) {
+               if (!folio_test_writeback(folio)) {
+                       printk("bad %x @%llx page %lx %lx\n",
+                              len, start, folio_index(folio), end);
+                       BUG();
+               }
+
+               if (atomic_long_dec_return(&fsc->writeback_count) <
+                   CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
+                       fsc->write_congested = false;
+
+               ceph_put_snap_context(folio_detach_private(folio));
+               folio_end_writeback(folio);
+
+               if (lost_cap) {
+                       xas_pause(&xas);
+                       /* The folios were unlocked before the write was
+                        * submitted, so relock this one briefly to evict
+                        * the stale data.
+                        */
+                       if (folio_trylock(folio)) {
+                               generic_error_remove_page(mapping, folio_page(folio, 0));
+                               folio_unlock(folio);
+                       }
+               }
+       }
+
+       rcu_read_unlock();
+}
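
The completion path above locates the folios to clean by walking the
mapping's xarray over the byte range that was written, instead of walking
a page array carried in the request.  As a minimal sketch of that pattern
(standard XArray API; the helper and its callback are illustrative only,
not part of this patch):

	static void for_each_folio_written(struct address_space *mapping,
					   loff_t start, unsigned int len,
					   void (*fn)(struct folio *))
	{
		XA_STATE(xas, &mapping->i_pages, start / PAGE_SIZE);
		pgoff_t end = (start + len - 1) / PAGE_SIZE;
		struct folio *folio;

		rcu_read_lock();
		xas_for_each(&xas, folio, end)
			fn(folio);	/* folios are held stable here by
					 * PG_writeback, not by a reference */
		rcu_read_unlock();
	}
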
+
 /*
  * async writeback completion handler.
  *
@@ -670,17 +687,17 @@ static void writepages_finish(struct ceph_osd_request *req)
        struct inode *inode = req->r_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_data *osd_data;
-       struct page *page;
-       int num_pages, total_pages = 0;
-       int i, j;
-       int rc = req->r_result;
        struct ceph_snap_context *snapc = req->r_snapc;
        struct address_space *mapping = inode->i_mapping;
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       loff_t start = req->r_data_offset;
        unsigned int len = 0;
-       bool remove_page;
+       bool lost_cap;
+       int num_pages, total_pages = 0;
+       int i;
+       int rc = req->r_result;
 
-       dout("writepages_finish %p rc %d\n", inode, rc);
+       printk("writepages_finish %p rc %d\n", inode, rc);
        if (rc < 0) {
                mapping_set_error(mapping, rc);
                ceph_set_error_write(ci);
@@ -696,8 +713,8 @@ static void writepages_finish(struct ceph_osd_request *req)
         * page truncation thread, possibly losing some data that
         * raced its way in
         */
-       remove_page = !(ceph_caps_issued(ci) &
-                       (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
+       lost_cap = !(ceph_caps_issued(ci) &
+                    (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
 
        /* clean all pages */
        for (i = 0; i < req->r_num_ops; i++) {
@@ -708,35 +725,14 @@ static void writepages_finish(struct ceph_osd_request *req)
                }
 
                osd_data = osd_req_op_extent_osd_data(req, i);
-               BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
-               len += osd_data->length;
-               num_pages = calc_pages_for((u64)osd_data->alignment,
-                                          (u64)osd_data->length);
+               num_pages = calc_pages_for(osd_data->alignment,
+                                          osd_data->length);
                total_pages += num_pages;
-               for (j = 0; j < num_pages; j++) {
-                       page = osd_data->pages[j];
-                       BUG_ON(!page);
-                       WARN_ON(!PageUptodate(page));
-
-                       if (atomic_long_dec_return(&fsc->writeback_count) <
-                            CONGESTION_OFF_THRESH(
-                                       fsc->mount_options->congestion_kb))
-                               fsc->write_congested = false;
-
-                       ceph_put_snap_context(detach_page_private(page));
-                       end_page_writeback(page);
-                       dout("unlocking %p\n", page);
-
-                       if (remove_page)
-                               generic_error_remove_page(inode->i_mapping,
-                                                         page);
-
-                       unlock_page(page);
-               }
+               ceph_pages_written_back(inode, fsc, start, osd_data->length, lost_cap);
                dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
-                    inode, osd_data->length, rc >= 0 ? num_pages : 0);
-
-               release_pages(osd_data->pages, num_pages);
+                    inode, osd_data->length, num_pages);
+               start += osd_data->length;
+               len += osd_data->length;
        }
 
        ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
@@ -745,448 +741,432 @@ static void writepages_finish(struct ceph_osd_request *req)
        ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
 
        osd_data = osd_req_op_extent_osd_data(req, 0);
-       if (osd_data->pages_from_pool)
-               mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
-       else
-               kfree(osd_data->pages);
        ceph_osdc_put_request(req);
 }
 
 /*
- * initiate async writeback
+ * Extend the region to be written back to include subsequent contiguously
+ * dirty pages if possible, but don't sleep while doing so.
  */
-static int ceph_writepages_start(struct address_space *mapping,
-                                struct writeback_control *wbc)
+static void ceph_extend_writeback(struct address_space *mapping,
+                                 struct ceph_snap_context *snapc,
+                                 long *_count,
+                                 loff_t start,
+                                 loff_t max_len,
+                                 bool caching,
+                                 u64 *_len)
 {
-       struct inode *inode = mapping->host;
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_vino vino = ceph_vino(inode);
-       pgoff_t index, start_index, end = -1;
-       struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
        struct pagevec pvec;
-       int rc = 0;
-       unsigned int wsize = i_blocksize(inode);
-       struct ceph_osd_request *req = NULL;
-       struct ceph_writeback_ctl ceph_wbc;
-       bool should_loop, range_whole = false;
-       bool done = false;
-       bool caching = ceph_is_cache_enabled(inode);
-
-       if (wbc->sync_mode == WB_SYNC_NONE &&
-           fsc->write_congested)
-               return 0;
-
-       dout("writepages_start %p (mode=%s)\n", inode,
-            wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
-            (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
-
-       if (ceph_inode_is_shutdown(inode)) {
-               if (ci->i_wrbuffer_ref > 0) {
-                       pr_warn_ratelimited(
-                               "writepage_start %p %lld forced umount\n",
-                               inode, ceph_ino(inode));
-               }
-               mapping_set_error(mapping, -EIO);
-               return -EIO; /* we're in a forced umount, don't write! */
-       }
-       if (fsc->mount_options->wsize < wsize)
-               wsize = fsc->mount_options->wsize;
+       struct ceph_fs_client *fsc = ceph_inode_to_client(mapping->host);
+       struct folio *folio;
+       loff_t len = *_len;
+       pgoff_t index = (start + len) / PAGE_SIZE;
+       bool stop = true;
+       unsigned int i;
 
+       XA_STATE(xas, &mapping->i_pages, index);
        pagevec_init(&pvec);
 
-       start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
-       index = start_index;
-
-retry:
-       /* find oldest snap context with dirty data */
-       snapc = get_oldest_context(inode, &ceph_wbc, NULL);
-       if (!snapc) {
-               /* hmm, why does writepages get called when there
-                  is no dirty data? */
-               dout(" no snap context with dirty data?\n");
-               goto out;
-       }
-       dout(" oldest snapc is %p seq %lld (%d snaps)\n",
-            snapc, snapc->seq, snapc->num_snaps);
-
-       should_loop = false;
-       if (ceph_wbc.head_snapc && snapc != last_snapc) {
-               /* where to start/end? */
-               if (wbc->range_cyclic) {
-                       index = start_index;
-                       end = -1;
-                       if (index > 0)
-                               should_loop = true;
-                       dout(" cyclic, start at %lu\n", index);
-               } else {
-                       index = wbc->range_start >> PAGE_SHIFT;
-                       end = wbc->range_end >> PAGE_SHIFT;
-                       if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
-                               range_whole = true;
-                       dout(" not cyclic, %lu to %lu\n", index, end);
-               }
-       } else if (!ceph_wbc.head_snapc) {
-               /* Do not respect wbc->range_{start,end}. Dirty pages
-                * in that range can be associated with newer snapc.
-                * They are not writeable until we write all dirty pages
-                * associated with 'snapc' get written */
-               if (index > 0)
-                       should_loop = true;
-               dout(" non-head snapc, range whole\n");
-       }
-
-       ceph_put_snap_context(last_snapc);
-       last_snapc = snapc;
-
-       while (!done && index <= end) {
-               int num_ops = 0, op_idx;
-               unsigned i, pvec_pages, max_pages, locked_pages = 0;
-               struct page **pages = NULL, **data_pages;
-               struct page *page;
-               pgoff_t strip_unit_end = 0;
-               u64 offset = 0, len = 0;
-               bool from_pool = false;
-
-               max_pages = wsize >> PAGE_SHIFT;
+       do {
+               /* Firstly, we gather up a batch of contiguous dirty pages
+                * under the RCU read lock - but we can't clear the dirty flags
+                * there if any of those pages are mapped.
+                */
+               rcu_read_lock();
 
-get_more_pages:
-               pvec_pages = pagevec_lookup_range_tag(&pvec, mapping, &index,
-                                               end, PAGECACHE_TAG_DIRTY);
-               dout("pagevec_lookup_range_tag got %d\n", pvec_pages);
-               if (!pvec_pages && !locked_pages)
-                       break;
-               for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
-                       page = pvec.pages[i];
-                       dout("? %p idx %lu\n", page, page->index);
-                       if (locked_pages == 0)
-                               lock_page(page);  /* first page */
-                       else if (!trylock_page(page))
+               xas_for_each(&xas, folio, ULONG_MAX) {
+                       stop = true;
+                       if (xas_retry(&xas, folio))
+                               continue;
+                       if (xa_is_value(folio))
+                               break;
+                       if (folio_index(folio) != index ||
+                           folio_get_private(folio) != snapc)
                                break;
 
-                       /* only dirty pages, or our accounting breaks */
-                       if (unlikely(!PageDirty(page)) ||
-                           unlikely(page->mapping != mapping)) {
-                               dout("!dirty or !mapping %p\n", page);
-                               unlock_page(page);
-                               continue;
-                       }
-                       /* only if matching snap context */
-                       pgsnapc = page_snap_context(page);
-                       if (pgsnapc != snapc) {
-                               dout("page snapc %p %lld != oldest %p %lld\n",
-                                    pgsnapc, pgsnapc->seq, snapc, snapc->seq);
-                               if (!should_loop &&
-                                   !ceph_wbc.head_snapc &&
-                                   wbc->sync_mode != WB_SYNC_NONE)
-                                       should_loop = true;
-                               unlock_page(page);
+                       if (!folio_try_get_rcu(folio)) {
+                               xas_reset(&xas);
                                continue;
                        }
-                       if (page_offset(page) >= ceph_wbc.i_size) {
-                               struct folio *folio = page_folio(page);
-
-                               dout("folio at %lu beyond eof %llu\n",
-                                    folio->index, ceph_wbc.i_size);
-                               if ((ceph_wbc.size_stable ||
-                                   folio_pos(folio) >= i_size_read(inode)) &&
-                                   folio_clear_dirty_for_io(folio))
-                                       folio_invalidate(folio, 0,
-                                                       folio_size(folio));
-                               folio_unlock(folio);
-                               continue;
+
+                       /* Has the page moved or been split? */
+                       if (unlikely(folio != xas_reload(&xas))) {
+                               folio_put(folio);
+                               break;
                        }
-                       if (strip_unit_end && (page->index > strip_unit_end)) {
-                               dout("end of strip unit %p\n", page);
-                               unlock_page(page);
+
+                       if (!folio_trylock(folio)) {
+                               folio_put(folio);
                                break;
                        }
-                       if (PageWriteback(page) || PageFsCache(page)) {
-                               if (wbc->sync_mode == WB_SYNC_NONE) {
-                                       dout("%p under writeback\n", page);
-                                       unlock_page(page);
-                                       continue;
-                               }
-                               dout("waiting on writeback %p\n", page);
-                               wait_on_page_writeback(page);
-                               wait_on_page_fscache(page);
+                       if (folio_get_private(folio) != snapc ||
+                           !folio_test_dirty(folio) ||
+                           folio_test_writeback(folio) ||
+                           folio_test_fscache(folio)) {
+                               folio_unlock(folio);
+                               folio_put(folio);
+                               break;
                        }
 
-                       if (!clear_page_dirty_for_io(page)) {
-                               dout("%p !clear_page_dirty_for_io\n", page);
-                               unlock_page(page);
-                               continue;
-                       }
+                       /* Account what we can now write back and decide
+                        * whether to push on beyond this folio.
+                        */
+                       len += folio_size(folio);
+                       if (len >= max_len || *_count <= 0)
+                               stop = true;
+                       else
+                               stop = false;
+
+                       index += folio_nr_pages(folio);
+                       if (!pagevec_add(&pvec, &folio->page))
+                               break;
+                       if (stop)
+                               break;
+               }
 
-                       /*
-                        * We have something to write.  If this is
-                        * the first locked page this time through,
-                        * calculate max possinle write size and
-                        * allocate a page array
-                        */
-                       if (locked_pages == 0) {
-                               u64 objnum;
-                               u64 objoff;
-                               u32 xlen;
-
-                               /* prepare async write request */
-                               offset = (u64)page_offset(page);
-                               ceph_calc_file_object_mapping(&ci->i_layout,
-                                                             offset, wsize,
-                                                             &objnum, &objoff,
-                                                             &xlen);
-                               len = xlen;
-
-                               num_ops = 1;
-                               strip_unit_end = page->index +
-                                       ((len - 1) >> PAGE_SHIFT);
-
-                               BUG_ON(pages);
-                               max_pages = calc_pages_for(0, (u64)len);
-                               pages = kmalloc_array(max_pages,
-                                                     sizeof(*pages),
-                                                     GFP_NOFS);
-                               if (!pages) {
-                                       from_pool = true;
-                                       pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
-                                       BUG_ON(!pages);
-                               }
-
-                               len = 0;
-                       } else if (page->index !=
-                                  (offset + len) >> PAGE_SHIFT) {
-                               if (num_ops >= (from_pool ?  CEPH_OSD_SLAB_OPS :
-                                                            CEPH_OSD_MAX_OPS)) {
-                                       redirty_page_for_writepage(wbc, page);
-                                       unlock_page(page);
-                                       break;
-                               }
+               if (!stop)
+                       xas_pause(&xas);
+               rcu_read_unlock();
 
-                               num_ops++;
-                               offset = (u64)page_offset(page);
-                               len = 0;
-                       }
+               /* Now, if we obtained any pages, we can shift them to being
+                * writable and mark them for caching.
+                */
+               if (!pagevec_count(&pvec))
+                       break;
 
-                       /* note position of first page in pvec */
-                       dout("%p will write page %p idx %lu\n",
-                            inode, page, page->index);
+               for (i = 0; i < pagevec_count(&pvec); i++) {
+                       folio = page_folio(pvec.pages[i]);
+                       if (!folio_clear_dirty_for_io(folio))
+                               BUG();
+                       if (folio_start_writeback(folio))
+                               BUG();
+                       //ceph_folio_start_fscache(caching, folio);

-                       if (atomic_long_inc_return(&fsc->writeback_count) >
-                           CONGESTION_ON_THRESH(
-                                   fsc->mount_options->congestion_kb))
-                               fsc->write_congested = true;
+                       if (atomic_long_inc_return(&fsc->writeback_count) >
+                           CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
+                               fsc->write_congested = true;
+
+                       *_count -= folio_nr_pages(folio);
+                       folio_unlock(folio);
+               }
 
-                       pages[locked_pages++] = page;
-                       pvec.pages[i] = NULL;
+               pagevec_release(&pvec);
+               cond_resched();
+       } while (!stop);
 
-                       len += thp_size(page);
-               }
+       *_len = len;
+}
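
The two-phase shape above follows afs_extend_writeback(): candidate folios
are gathered and locked under the RCU read lock, where blocking is not
allowed, and only afterwards is each one flipped from dirty to writeback.
The per-folio flip, reduced to its essentials (standard mm API; the folio
was already locked by the gathering phase):

	/* Move the locked folio from dirty to writeback state. */
	if (!folio_clear_dirty_for_io(folio))	/* write-protects mapped ptes */
		BUG();
	if (folio_start_writeback(folio))	/* pins the folio for the I/O */
		BUG();
	folio_unlock(folio);
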
 
-               /* did we get anything? */
-               if (!locked_pages)
-                       goto release_pvec_pages;
-               if (i) {
-                       unsigned j, n = 0;
-                       /* shift unused page to beginning of pvec */
-                       for (j = 0; j < pvec_pages; j++) {
-                               if (!pvec.pages[j])
-                                       continue;
-                               if (n < j)
-                                       pvec.pages[n] = pvec.pages[j];
-                               n++;
-                       }
-                       pvec.nr = n;
+/*
+ * Write back the locked folio and any subsequent non-locked dirty folios.  The
+ * OSD request is submitted asynchronously; writepages_finish() completes it.
+ */
+static ssize_t ceph_write_back_from_locked_folio(struct address_space *mapping,
+                                                struct writeback_control *wbc,
+                                                struct ceph_snap_context *snapc,
+                                                struct ceph_writeback_ctl *ceph_wbc,
+                                                struct folio *folio,
+                                                loff_t start, loff_t end)
+{
+       struct inode *inode = mapping->host;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_vino vino = ceph_vino(inode);
+       struct ceph_osd_request *req = NULL;
+       struct iov_iter iter;
+       unsigned int max_len;
+       loff_t i_size = i_size_read(inode);
+       bool caching = false; //fscache_cookie_enabled(ceph_fscache_cookie(ci));
+       long count = wbc->nr_to_write;
+       u64 len, oplen;
+       ssize_t ret;
+       int num_ops, op_idx = 0;
+
+       if (folio_start_writeback(folio))
+               BUG();
+       //ceph_folio_start_fscache(caching, folio);
+
+       if (atomic_long_inc_return(&fsc->writeback_count) >
+           CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
+               fsc->write_congested = true;
+
+       count -= folio_nr_pages(folio);
+
+       /* Find all consecutive lockable dirty pages that have contiguous
+        * written regions, stopping when we find a page that is not
+        * immediately lockable, is not dirty or is missing, or we reach the
+        * end of the range.
+        */
+       len = folio_size(folio);
+       if (start < i_size) {
+               /* Trim the write to the EOF; the extra data is ignored.  Also
+                * put an upper limit on the size of a single storedata op.
+                */
+               u64 objnum, objoff;
 
-                       if (pvec_pages && i == pvec_pages &&
-                           locked_pages < max_pages) {
-                               dout("reached end pvec, trying for more\n");
-                               pagevec_release(&pvec);
-                               goto get_more_pages;
-                       }
-               }
+               ceph_calc_file_object_mapping(&ci->i_layout, start, end - start + 1,
+                                             &objnum, &objoff, &max_len);
+               max_len = min_t(u64, max_len, i_size - start);
+
+               if (len < max_len)
+                       ceph_extend_writeback(mapping, snapc, &count,
+                                             start, max_len, caching, &len);
+               len = min_t(loff_t, len, max_len);
+       }
+
+       /* We now have a contiguous set of dirty pages, each with writeback
+        * set; the first page is still locked at this point, but all the rest
+        * have been unlocked.
+        */
+       folio_unlock(folio);
 
-new_request:
-               offset = page_offset(pages[0]);
-               len = wsize;
+       /* Create a write request */
+       if (start < i_size) {
+               printk("write back %llx @%llx [%llx]\n", len, start, i_size);
 
+               /* Speculatively write to the cache.  We have to fix this up
+                * later if the store fails.
+                */
+               //ceph_write_to_cache(vnode, start, len, i_size, caching);
+               oplen = len;
+               num_ops = 1;
                req = ceph_osdc_new_request(&fsc->client->osdc,
-                                       &ci->i_layout, vino,
-                                       offset, &len, 0, num_ops,
-                                       CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
-                                       snapc, ceph_wbc.truncate_seq,
-                                       ceph_wbc.truncate_size, false);
+                                           &ci->i_layout, vino,
+                                           start, &oplen, 0, num_ops,
+                                           CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
+                                           snapc, ceph_wbc->truncate_seq,
+                                           ceph_wbc->truncate_size, false);
                if (IS_ERR(req)) {
                        req = ceph_osdc_new_request(&fsc->client->osdc,
-                                               &ci->i_layout, vino,
-                                               offset, &len, 0,
-                                               min(num_ops,
-                                                   CEPH_OSD_SLAB_OPS),
-                                               CEPH_OSD_OP_WRITE,
-                                               CEPH_OSD_FLAG_WRITE,
-                                               snapc, ceph_wbc.truncate_seq,
-                                               ceph_wbc.truncate_size, true);
+                                                   &ci->i_layout, vino,
+                                                   start, &oplen, 0,
+                                                   min(num_ops, CEPH_OSD_SLAB_OPS),
+                                                   CEPH_OSD_OP_WRITE,
+                                                   CEPH_OSD_FLAG_WRITE,
+                                                   snapc, ceph_wbc->truncate_seq,
+                                                   ceph_wbc->truncate_size, true);
                        BUG_ON(IS_ERR(req));
                }
-               BUG_ON(len < page_offset(pages[locked_pages - 1]) +
-                            thp_size(page) - offset);
+               BUG_ON(oplen < len);
 
                req->r_callback = writepages_finish;
                req->r_inode = inode;
+               req->r_mtime = inode->i_mtime;
 
-               /* Format the osd request message and submit the write */
-               len = 0;
-               data_pages = pages;
-               op_idx = 0;
-               for (i = 0; i < locked_pages; i++) {
-                       u64 cur_offset = page_offset(pages[i]);
-                       /*
-                        * Discontinuity in page range? Ceph can handle that by just passing
-                        * multiple extents in the write op.
-                        */
-                       if (offset + len != cur_offset) {
-                               /* If it's full, stop here */
-                               if (op_idx + 1 == req->r_num_ops)
-                                       break;
+               iov_iter_xarray(&iter, WRITE, &mapping->i_pages, start, len);
+               osd_req_op_extent_osd_iter(req, op_idx, &iter);
+               osd_req_op_extent_update(req, op_idx, len);
+               ret = ceph_osdc_start_request(&fsc->client->osdc, req, true);
+               BUG_ON(ret);
+               wbc->nr_to_write = count;
+               req = NULL;
+               ret = len;      /* advance the caller past the written region */
+       } else {
+               printk("write discard %llx @%llx [%llx]\n", len, start, i_size);
 
-                               /* Kick off an fscache write with what we have so far. */
-                               ceph_fscache_write_to_cache(inode, offset, len, caching);
-
-                               /* Start a new extent */
-                               osd_req_op_extent_dup_last(req, op_idx,
-                                                          cur_offset - offset);
-                               dout("writepages got pages at %llu~%llu\n",
-                                    offset, len);
-                               osd_req_op_extent_osd_data_pages(req, op_idx,
-                                                       data_pages, len, 0,
-                                                       from_pool, false);
-                               osd_req_op_extent_update(req, op_idx, len);
-
-                               len = 0;
-                               offset = cur_offset;
-                               data_pages = pages + i;
-                               op_idx++;
-                       }
+               /* The dirty region was entirely beyond the EOF. */
+               fscache_clear_page_bits(mapping, start, len, caching);
+               ceph_pages_written_back(inode, fsc, start, len, false);
+               wbc->nr_to_write = count;
+               ret = len;      /* advance the caller past the discarded region */
+       }
 
-                       set_page_writeback(pages[i]);
-                       if (caching)
-                               ceph_set_page_fscache(pages[i]);
-                       len += thp_size(page);
-               }
-               ceph_fscache_write_to_cache(inode, offset, len, caching);
-
-               if (ceph_wbc.size_stable) {
-                       len = min(len, ceph_wbc.i_size - offset);
-               } else if (i == locked_pages) {
-                       /* writepages_finish() clears writeback pages
-                        * according to the data length, so make sure
-                        * data length covers all locked pages */
-                       u64 min_len = len + 1 - thp_size(page);
-                       len = get_writepages_data_length(inode, pages[i - 1],
-                                                        offset);
-                       len = max(len, min_len);
-               }
-               dout("writepages got pages at %llu~%llu\n", offset, len);
+       printk("%s() = %d\n", __func__, ret);
+       return ret;
+}
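
The request-building step above is the heart of the patch: rather than
allocating a struct page array (kmalloc or the ceph_wb_pagevec_pool
mempool) and attaching it with osd_req_op_extent_osd_data_pages(), the
dirty range is described by an ITER_XARRAY iterator over the page cache
itself.  Condensed (osd_req_op_extent_osd_iter() comes from the companion
libceph patches on this branch):

	struct iov_iter iter;

	/* Describe [start, start + len) directly in the page cache. */
	iov_iter_xarray(&iter, WRITE, &mapping->i_pages, start, len);
	osd_req_op_extent_osd_iter(req, op_idx, &iter);
	osd_req_op_extent_update(req, op_idx, len);

Note that an ITER_XARRAY iterator takes no page references: the folios
are kept in place for the duration of the I/O by their writeback state,
which is why ceph_pages_written_back() only clears PG_writeback once the
OSD reply has arrived.
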
 
-               osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
-                                                0, from_pool, false);
-               osd_req_op_extent_update(req, op_idx, len);
+/*
+ * Scan for a dirty page in the specified snap.
+ */
+static struct folio *ceph_scan_for_writeable_page(struct address_space *mapping,
+                                                 struct ceph_snap_context *snapc,
+                                                 pgoff_t from, pgoff_t to)
+{
+       XA_STATE(xas, &mapping->i_pages, from);
+       struct folio *ret = NULL, *folio;
+
+       rcu_read_lock();
+       xas_for_each_marked(&xas, folio, to, PAGECACHE_TAG_DIRTY) {
+               if (xas_retry(&xas, folio))
+                       continue;
+               if (xa_is_value(folio) || folio->index > to)
+                       break;
+               if (folio_get_private(folio) != snapc)
+                       continue;
+               if (!folio_try_get_rcu(folio))
+                       goto retry;
+               if (unlikely(folio != xas_reload(&xas) ||
+                            folio_get_private(folio) != snapc))
+                       goto put_page;
+               ret = folio;
+               break;
+put_page:
+               folio_put(folio);
+retry:
+               xas_reset(&xas);
+       }
+       rcu_read_unlock();
+       return ret;
+}
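
The try-get/reload dance in the scanner above is the usual idiom for
taking a reference on a folio found under only the RCU read lock, where
the folio may be freed, split or replaced at any moment.  Annotated
(standard XArray/folio API, nothing new in this patch):

	if (!folio_try_get_rcu(folio)) {
		/* Refcount was zero: the folio is being freed.
		 * Re-walk the tree from the same index. */
		xas_reset(&xas);
		continue;
	}
	if (unlikely(folio != xas_reload(&xas))) {
		/* The slot changed between lookup and try-get: the folio
		 * was moved or split.  Drop the reference and re-walk. */
		folio_put(folio);
		xas_reset(&xas);
		continue;
	}
	/* The reference is now stable; folio fields can be trusted,
	 * though the snap context must be re-checked. */
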
 
-               BUG_ON(op_idx + 1 != req->r_num_ops);
-
-               from_pool = false;
-               if (i < locked_pages) {
-                       BUG_ON(num_ops <= req->r_num_ops);
-                       num_ops -= req->r_num_ops;
-                       locked_pages -= i;
-
-                       /* allocate new pages array for next request */
-                       data_pages = pages;
-                       pages = kmalloc_array(locked_pages, sizeof(*pages),
-                                             GFP_NOFS);
-                       if (!pages) {
-                               from_pool = true;
-                               pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
-                               BUG_ON(!pages);
+/*
+ * write a region of pages back to the server
+ */
+static int ceph_writepages_region(struct address_space *mapping,
+                                 struct writeback_control *wbc,
+                                 struct ceph_snap_context *snapc,
+                                 struct ceph_writeback_ctl *ceph_wbc,
+                                 loff_t start, loff_t end, loff_t *_next)
+{
+       struct folio *folio;
+       ssize_t ret;
+       int skips = 0;
+
+       printk("%s(%llx,%llx)", __func__, start, end);
+
+       do {
+               pgoff_t index = start / PAGE_SIZE;
+               pgoff_t pend = end / PAGE_SIZE;
+
+               folio = ceph_scan_for_writeable_page(mapping, snapc, index, pend);
+               if (!folio)
+                       break;
+
+               start = folio_pos(folio); /* May regress with THPs */
+
+               printk("wback %lx\n", folio_index(folio));
+
+               /* At this point we hold neither the i_pages lock nor the
+                * page lock: the page may be truncated or invalidated
+                * (changing page->mapping to NULL), or even swizzled
+                * back from swapper_space to tmpfs file mapping
+                */
+               if (wbc->sync_mode != WB_SYNC_NONE) {
+                       ret = folio_lock_killable(folio);
+                       if (ret < 0) {
+                               folio_put(folio);
+                               return ret;
                        }
-                       memcpy(pages, data_pages + i,
-                              locked_pages * sizeof(*pages));
-                       memset(data_pages + i, 0,
-                              locked_pages * sizeof(*pages));
                } else {
-                       BUG_ON(num_ops != req->r_num_ops);
-                       index = pages[i - 1]->index + 1;
-                       /* request message now owns the pages array */
-                       pages = NULL;
+                       if (!folio_trylock(folio)) {
+                               folio_put(folio);
+                               return 0;
+                       }
                }
 
-               req->r_mtime = inode->i_mtime;
-               rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
-               BUG_ON(rc);
-               req = NULL;
+               if (folio_mapping(folio) != mapping ||
+                   folio_get_private(folio) != snapc ||
+                   !folio_test_dirty(folio)) {
+                       start += folio_size(folio);
+                       folio_unlock(folio);
+                       folio_put(folio);
+                       continue;
+               }
 
-               wbc->nr_to_write -= i;
-               if (pages)
-                       goto new_request;
+               if (folio_test_writeback(folio) ||
+                   folio_test_fscache(folio)) {
+                       folio_unlock(folio);
+                       if (wbc->sync_mode != WB_SYNC_NONE) {
+                               folio_wait_writeback(folio);
+                               folio_wait_fscache(folio);
+                       } else {
+                               start += folio_size(folio);
+                       }
+                       folio_put(folio);
+                       if (wbc->sync_mode == WB_SYNC_NONE) {
+                               if (skips >= 5 || need_resched())
+                                       break;
+                               skips++;
+                       }
+                       continue;
+               }
 
-               /*
-                * We stop writing back only if we are not doing
-                * integrity sync. In case of integrity sync we have to
-                * keep going until we have written all the pages
-                * we tagged for writeback prior to entering this loop.
-                */
-               if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
-                       done = true;
+               if (!folio_clear_dirty_for_io(folio))
+                       BUG();
+               ret = ceph_write_back_from_locked_folio(mapping, wbc, snapc, ceph_wbc,
+                                                       folio, start, end);
+               folio_put(folio);
+               if (ret < 0)
+                       return ret;
 
-release_pvec_pages:
-               dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
-                    pvec.nr ? pvec.pages[0] : NULL);
-               pagevec_release(&pvec);
-       }
+               start += ret;
 
-       if (should_loop && !done) {
-               /* more to do; loop back to beginning of file */
-               dout("writepages looping back to beginning of file\n");
-               end = start_index - 1; /* OK even when start_index == 0 */
-
-               /* to write dirty pages associated with next snapc,
-                * we need to wait until current writes complete */
-               if (wbc->sync_mode != WB_SYNC_NONE &&
-                   start_index == 0 && /* all dirty pages were checked */
-                   !ceph_wbc.head_snapc) {
-                       struct page *page;
-                       unsigned i, nr;
-                       index = 0;
-                       while ((index <= end) &&
-                              (nr = pagevec_lookup_tag(&pvec, mapping, &index,
-                                               PAGECACHE_TAG_WRITEBACK))) {
-                               for (i = 0; i < nr; i++) {
-                                       page = pvec.pages[i];
-                                       if (page_snap_context(page) != snapc)
-                                               continue;
-                                       wait_on_page_writeback(page);
-                               }
-                               pagevec_release(&pvec);
-                               cond_resched();
-                       }
+               cond_resched();
+       } while (wbc->nr_to_write > 0);
+
+       *_next = start;
+       printk("%s() = 0 [%llx]\n", __func__, *_next);
+       return 0;
+}
+
+/*
+ * write some of the pending data back to the server
+ */
+static int ceph_writepages(struct address_space *mapping,
+                          struct writeback_control *wbc)
+{
+       struct inode *inode = mapping->host;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_snap_context *snapc;
+       struct ceph_writeback_ctl ceph_wbc;
+       loff_t start, next;
+       int ret = 0;
+
+       if (wbc->sync_mode == WB_SYNC_NONE && fsc->write_congested)
+               return 0;
+
+       printk("writepages_start %p (mode=%s)\n", inode,
+              wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
+              (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
+
+       if (ceph_inode_is_shutdown(inode)) {
+               if (ci->i_wrbuffer_ref > 0) {
+                       pr_warn_ratelimited(
+                               "writepage_start %p %lld forced umount\n",
+                               inode, ceph_ino(inode));
                }
+               mapping_set_error(mapping, -EIO);
+               return -EIO; /* we're in a forced umount, don't write! */
+       }
 
-               start_index = 0;
-               index = 0;
-               goto retry;
+       /* find oldest snap context with dirty data */
+       snapc = get_oldest_context(inode, &ceph_wbc, NULL);
+       if (!snapc) {
+               /* hmm, why does writepages get called when there
+                  is no dirty data? */
+               printk(" no snap context with dirty data?\n");
+               goto out;
        }
+       printk(" oldest snapc is %p seq %lld (%d snaps)\n",
+              snapc, snapc->seq, snapc->num_snaps);
 
-       if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
-               mapping->writeback_index = index;
+       /* Decide whether/how to use the range specification we were given. */
+       if (ceph_wbc.head_snapc) {
+               if (wbc->range_cyclic) {
+                       start = mapping->writeback_index * PAGE_SIZE;
+                       ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+                                                    start, LLONG_MAX, &next);
+                       if (ret != 0)
+                               goto out;
+
+                       mapping->writeback_index = next / PAGE_SIZE;
+                       if (start == 0 || wbc->nr_to_write <= 0)
+                               goto out;
+
+                       ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+                                                    0, start, &next);
+                       if (ret == 0)
+                               mapping->writeback_index = next / PAGE_SIZE;
+               } else if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX) {
+                       ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+                                                    0, LLONG_MAX, &next);
+                       if (wbc->nr_to_write > 0 && ret == 0)
+                               mapping->writeback_index = next / PAGE_SIZE;
+               } else {
+                       ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+                                                    wbc->range_start, wbc->range_end,
+                                                    &next);
+               }
+       } else {
+               /* Do not respect wbc->range_{start,end}. Dirty pages in
+                * that range can be associated with a newer snapc and are
+                * not writeable until all dirty pages associated with
+                * 'snapc' have been written. */
+               printk(" non-head snapc, range whole\n");
+               ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+                                            0, LLONG_MAX, &next);
+       }
 
 out:
-       ceph_osdc_put_request(req);
-       ceph_put_snap_context(last_snapc);
-       dout("writepages dend - startone, rc = %d\n", rc);
-       return rc;
+       ceph_put_snap_context(snapc);
+       return ret;
 }
 
-
-
 /*
  * See if a given @snapc is either writeable, or already written.
  */
@@ -1350,7 +1330,7 @@ const struct address_space_operations ceph_aops = {
        .readpage = netfs_readpage,
        .readahead = netfs_readahead,
        .writepage = ceph_writepage,
-       .writepages = ceph_writepages_start,
+       .writepages = ceph_writepages,
        .write_begin = ceph_write_begin,
        .write_end = ceph_write_end,
        .dirty_folio = ceph_dirty_folio,