return snapc;
}
-static u64 get_writepages_data_length(struct inode *inode,
- struct page *page, u64 start)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_snap_context *snapc = page_snap_context(page);
- struct ceph_cap_snap *capsnap = NULL;
- u64 end = i_size_read(inode);
-
- if (snapc != ci->i_head_snapc) {
- bool found = false;
- spin_lock(&ci->i_ceph_lock);
- list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
- if (capsnap->context == snapc) {
- if (!capsnap->writing)
- end = capsnap->size;
- found = true;
- break;
- }
- }
- spin_unlock(&ci->i_ceph_lock);
- WARN_ON(!found);
- }
- if (end > page_offset(page) + thp_size(page))
- end = page_offset(page) + thp_size(page);
- return end > start ? end - start : 0;
-}
-
/*
* Write a single page, but leave the page locked.
*
return err;
}
+/*
+ * completion of write to server
+ */
+static void ceph_pages_written_back(struct inode *inode,
+ struct ceph_fs_client *fsc,
+ loff_t start, unsigned int len,
+ bool lost_cap)
+{
+ struct address_space *mapping = inode->i_mapping;
+ struct folio *folio;
+ pgoff_t end;
+
+ XA_STATE(xas, &mapping->i_pages, start / PAGE_SIZE);
+
+	dout("written_back %x @%llx\n", len, start);
+
+ rcu_read_lock();
+
+ end = (start + len - 1) / PAGE_SIZE;
+ xas_for_each(&xas, folio, end) {
+ if (!folio_test_writeback(folio)) {
+			pr_err("bad %x @%llx page %lx %lx\n",
+			       len, start, folio_index(folio), end);
+ BUG();
+ }
+
+ if (atomic_long_dec_return(&fsc->writeback_count) <
+ CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
+ fsc->write_congested = false;
+
+ ceph_put_snap_context(folio_detach_private(folio));
+ folio_end_writeback(folio);
+
+		if (lost_cap) {
+			/* NOTE(review): the folios were unlocked before the
+			 * OSD request was submitted (see the write-back path),
+			 * so the page is NOT locked here, yet
+			 * generic_error_remove_page() expects a locked page
+			 * and may not be safe under rcu_read_lock() — this
+			 * path needs reworking.
+			 */
+			xas_pause(&xas);
+			generic_error_remove_page(inode->i_mapping, folio_page(folio, 0));
+		}
+	}
+
+ rcu_read_unlock();
+}
+
/*
* async writeback completion handler.
*
struct inode *inode = req->r_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_osd_data *osd_data;
- struct page *page;
- int num_pages, total_pages = 0;
- int i, j;
- int rc = req->r_result;
struct ceph_snap_context *snapc = req->r_snapc;
struct address_space *mapping = inode->i_mapping;
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ loff_t start = req->r_data_offset;
unsigned int len = 0;
- bool remove_page;
+ bool lost_cap;
+ int num_pages, total_pages = 0;
+ int i;
+ int rc = req->r_result;
- dout("writepages_finish %p rc %d\n", inode, rc);
+	dout("writepages_finish %p rc %d\n", inode, rc);
if (rc < 0) {
mapping_set_error(mapping, rc);
ceph_set_error_write(ci);
* page truncation thread, possibly losing some data that
* raced its way in
*/
- remove_page = !(ceph_caps_issued(ci) &
- (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
+ lost_cap = !(ceph_caps_issued(ci) &
+ (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
/* clean all pages */
for (i = 0; i < req->r_num_ops; i++) {
}
osd_data = osd_req_op_extent_osd_data(req, i);
- BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
- len += osd_data->length;
- num_pages = calc_pages_for((u64)osd_data->alignment,
- (u64)osd_data->length);
+ num_pages = calc_pages_for(osd_data->alignment,
+ osd_data->length);
total_pages += num_pages;
- for (j = 0; j < num_pages; j++) {
- page = osd_data->pages[j];
- BUG_ON(!page);
- WARN_ON(!PageUptodate(page));
-
- if (atomic_long_dec_return(&fsc->writeback_count) <
- CONGESTION_OFF_THRESH(
- fsc->mount_options->congestion_kb))
- fsc->write_congested = false;
-
- ceph_put_snap_context(detach_page_private(page));
- end_page_writeback(page);
- dout("unlocking %p\n", page);
-
- if (remove_page)
- generic_error_remove_page(inode->i_mapping,
- page);
-
- unlock_page(page);
- }
+ ceph_pages_written_back(inode, fsc, start, osd_data->length, lost_cap);
dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
- inode, osd_data->length, rc >= 0 ? num_pages : 0);
-
- release_pages(osd_data->pages, num_pages);
+ inode, osd_data->length, num_pages);
+ start += osd_data->length;
+ len += osd_data->length;
}
ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
osd_data = osd_req_op_extent_osd_data(req, 0);
- if (osd_data->pages_from_pool)
- mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
- else
- kfree(osd_data->pages);
ceph_osdc_put_request(req);
}
/*
- * initiate async writeback
+ * Extend the region to be written back to include subsequent contiguously
+ * dirty pages if possible, but don't sleep while doing so.
+ *
+ * If this page holds new content, then we can include filler zeros in the
+ * writeback.
*/
-static int ceph_writepages_start(struct address_space *mapping,
- struct writeback_control *wbc)
+static void ceph_extend_writeback(struct address_space *mapping,
+ struct ceph_snap_context *snapc,
+ long *_count,
+ loff_t start,
+ loff_t max_len,
+ bool caching,
+ u64 *_len)
{
- struct inode *inode = mapping->host;
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
- struct ceph_vino vino = ceph_vino(inode);
- pgoff_t index, start_index, end = -1;
- struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
struct pagevec pvec;
- int rc = 0;
- unsigned int wsize = i_blocksize(inode);
- struct ceph_osd_request *req = NULL;
- struct ceph_writeback_ctl ceph_wbc;
- bool should_loop, range_whole = false;
- bool done = false;
- bool caching = ceph_is_cache_enabled(inode);
-
- if (wbc->sync_mode == WB_SYNC_NONE &&
- fsc->write_congested)
- return 0;
-
- dout("writepages_start %p (mode=%s)\n", inode,
- wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
- (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
-
- if (ceph_inode_is_shutdown(inode)) {
- if (ci->i_wrbuffer_ref > 0) {
- pr_warn_ratelimited(
- "writepage_start %p %lld forced umount\n",
- inode, ceph_ino(inode));
- }
- mapping_set_error(mapping, -EIO);
- return -EIO; /* we're in a forced umount, don't write! */
- }
- if (fsc->mount_options->wsize < wsize)
- wsize = fsc->mount_options->wsize;
+ struct folio *folio;
+ loff_t len = *_len;
+ pgoff_t index = (start + len) / PAGE_SIZE;
+ bool stop = true;
+ unsigned int i;
+ XA_STATE(xas, &mapping->i_pages, index);
pagevec_init(&pvec);
- start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
- index = start_index;
-
-retry:
- /* find oldest snap context with dirty data */
- snapc = get_oldest_context(inode, &ceph_wbc, NULL);
- if (!snapc) {
- /* hmm, why does writepages get called when there
- is no dirty data? */
- dout(" no snap context with dirty data?\n");
- goto out;
- }
- dout(" oldest snapc is %p seq %lld (%d snaps)\n",
- snapc, snapc->seq, snapc->num_snaps);
-
- should_loop = false;
- if (ceph_wbc.head_snapc && snapc != last_snapc) {
- /* where to start/end? */
- if (wbc->range_cyclic) {
- index = start_index;
- end = -1;
- if (index > 0)
- should_loop = true;
- dout(" cyclic, start at %lu\n", index);
- } else {
- index = wbc->range_start >> PAGE_SHIFT;
- end = wbc->range_end >> PAGE_SHIFT;
- if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
- range_whole = true;
- dout(" not cyclic, %lu to %lu\n", index, end);
- }
- } else if (!ceph_wbc.head_snapc) {
- /* Do not respect wbc->range_{start,end}. Dirty pages
- * in that range can be associated with newer snapc.
- * They are not writeable until we write all dirty pages
- * associated with 'snapc' get written */
- if (index > 0)
- should_loop = true;
- dout(" non-head snapc, range whole\n");
- }
-
- ceph_put_snap_context(last_snapc);
- last_snapc = snapc;
-
- while (!done && index <= end) {
- int num_ops = 0, op_idx;
- unsigned i, pvec_pages, max_pages, locked_pages = 0;
- struct page **pages = NULL, **data_pages;
- struct page *page;
- pgoff_t strip_unit_end = 0;
- u64 offset = 0, len = 0;
- bool from_pool = false;
-
- max_pages = wsize >> PAGE_SHIFT;
+ do {
+ /* Firstly, we gather up a batch of contiguous dirty pages
+ * under the RCU read lock - but we can't clear the dirty flags
+ * there if any of those pages are mapped.
+ */
+ rcu_read_lock();
-get_more_pages:
- pvec_pages = pagevec_lookup_range_tag(&pvec, mapping, &index,
- end, PAGECACHE_TAG_DIRTY);
- dout("pagevec_lookup_range_tag got %d\n", pvec_pages);
- if (!pvec_pages && !locked_pages)
- break;
- for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
- page = pvec.pages[i];
- dout("? %p idx %lu\n", page, page->index);
- if (locked_pages == 0)
- lock_page(page); /* first page */
- else if (!trylock_page(page))
+ xas_for_each(&xas, folio, ULONG_MAX) {
+ stop = true;
+ if (xas_retry(&xas, folio))
+ continue;
+ if (xa_is_value(folio))
+ break;
+ if (folio_index(folio) != index ||
+ folio_get_private(folio) != snapc)
break;
- /* only dirty pages, or our accounting breaks */
- if (unlikely(!PageDirty(page)) ||
- unlikely(page->mapping != mapping)) {
- dout("!dirty or !mapping %p\n", page);
- unlock_page(page);
- continue;
- }
- /* only if matching snap context */
- pgsnapc = page_snap_context(page);
- if (pgsnapc != snapc) {
- dout("page snapc %p %lld != oldest %p %lld\n",
- pgsnapc, pgsnapc->seq, snapc, snapc->seq);
- if (!should_loop &&
- !ceph_wbc.head_snapc &&
- wbc->sync_mode != WB_SYNC_NONE)
- should_loop = true;
- unlock_page(page);
+ if (!folio_try_get_rcu(folio)) {
+ xas_reset(&xas);
continue;
}
- if (page_offset(page) >= ceph_wbc.i_size) {
- struct folio *folio = page_folio(page);
-
- dout("folio at %lu beyond eof %llu\n",
- folio->index, ceph_wbc.i_size);
- if ((ceph_wbc.size_stable ||
- folio_pos(folio) >= i_size_read(inode)) &&
- folio_clear_dirty_for_io(folio))
- folio_invalidate(folio, 0,
- folio_size(folio));
- folio_unlock(folio);
- continue;
+
+ /* Has the page moved or been split? */
+ if (unlikely(folio != xas_reload(&xas))) {
+ folio_put(folio);
+ break;
}
- if (strip_unit_end && (page->index > strip_unit_end)) {
- dout("end of strip unit %p\n", page);
- unlock_page(page);
+
+ if (!folio_trylock(folio)) {
+ folio_put(folio);
break;
}
- if (PageWriteback(page) || PageFsCache(page)) {
- if (wbc->sync_mode == WB_SYNC_NONE) {
- dout("%p under writeback\n", page);
- unlock_page(page);
- continue;
- }
- dout("waiting on writeback %p\n", page);
- wait_on_page_writeback(page);
- wait_on_page_fscache(page);
+ if (folio_get_private(folio) != snapc ||
+ !folio_test_dirty(folio) ||
+ folio_test_writeback(folio) ||
+ folio_test_fscache(folio)) {
+ folio_unlock(folio);
+ folio_put(folio);
+ break;
}
- if (!clear_page_dirty_for_io(page)) {
- dout("%p !clear_page_dirty_for_io\n", page);
- unlock_page(page);
- continue;
- }
+			/* Account for the folio and decide whether to keep
+			 * extending: stop once we have max_len bytes or the
+			 * wbc write budget is exhausted.  (Previously `stop`
+			 * was never cleared and `len` never accumulated, so
+			 * the loop always ended after one folio with *_len
+			 * unchanged.)
+			 */
+			len += folio_size(folio);
+			stop = false;
+			if (len >= max_len || *_count <= 0)
+				stop = true;
+
+			index += folio_nr_pages(folio);
+			if (!pagevec_add(&pvec, &folio->page))
+				break;
+			if (stop)
+				break;
+		}
- /*
- * We have something to write. If this is
- * the first locked page this time through,
- * calculate max possinle write size and
- * allocate a page array
- */
- if (locked_pages == 0) {
- u64 objnum;
- u64 objoff;
- u32 xlen;
-
- /* prepare async write request */
- offset = (u64)page_offset(page);
- ceph_calc_file_object_mapping(&ci->i_layout,
- offset, wsize,
- &objnum, &objoff,
- &xlen);
- len = xlen;
-
- num_ops = 1;
- strip_unit_end = page->index +
- ((len - 1) >> PAGE_SHIFT);
-
- BUG_ON(pages);
- max_pages = calc_pages_for(0, (u64)len);
- pages = kmalloc_array(max_pages,
- sizeof(*pages),
- GFP_NOFS);
- if (!pages) {
- from_pool = true;
- pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
- BUG_ON(!pages);
- }
-
- len = 0;
- } else if (page->index !=
- (offset + len) >> PAGE_SHIFT) {
- if (num_ops >= (from_pool ? CEPH_OSD_SLAB_OPS :
- CEPH_OSD_MAX_OPS)) {
- redirty_page_for_writepage(wbc, page);
- unlock_page(page);
- break;
- }
+ if (!stop)
+ xas_pause(&xas);
+ rcu_read_unlock();
- num_ops++;
- offset = (u64)page_offset(page);
- len = 0;
- }
+ /* Now, if we obtained any pages, we can shift them to being
+ * writable and mark them for caching.
+ */
+ if (!pagevec_count(&pvec))
+ break;
- /* note position of first page in pvec */
- dout("%p will write page %p idx %lu\n",
- inode, page, page->index);
+ for (i = 0; i < pagevec_count(&pvec); i++) {
+ folio = page_folio(pvec.pages[i]);
+ if (!folio_clear_dirty_for_io(folio))
+ BUG();
+ if (folio_start_writeback(folio))
+ BUG();
+ //ceph_folio_start_fscache(caching, folio);
- if (atomic_long_inc_return(&fsc->writeback_count) >
- CONGESTION_ON_THRESH(
- fsc->mount_options->congestion_kb))
- fsc->write_congested = true;
+ *_count -= folio_nr_pages(folio);
+ folio_unlock(folio);
+ }
- pages[locked_pages++] = page;
- pvec.pages[i] = NULL;
+ pagevec_release(&pvec);
+ cond_resched();
+ } while (!stop);
- len += thp_size(page);
- }
+ *_len = len;
+}
- /* did we get anything? */
- if (!locked_pages)
- goto release_pvec_pages;
- if (i) {
- unsigned j, n = 0;
- /* shift unused page to beginning of pvec */
- for (j = 0; j < pvec_pages; j++) {
- if (!pvec.pages[j])
- continue;
- if (n < j)
- pvec.pages[n] = pvec.pages[j];
- n++;
- }
- pvec.nr = n;
+/*
+ * Synchronously write back the locked page and any subsequent non-locked dirty
+ * pages.
+ */
+static ssize_t ceph_write_back_from_locked_folio(struct address_space *mapping,
+ struct writeback_control *wbc,
+ struct ceph_snap_context *snapc,
+ struct ceph_writeback_ctl *ceph_wbc,
+ struct folio *folio,
+ loff_t start, loff_t end)
+{
+ struct inode *inode = mapping->host;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_vino vino = ceph_vino(inode);
+ struct ceph_osd_request *req = NULL;
+ struct iov_iter iter;
+ unsigned int max_len;
+ loff_t i_size = i_size_read(inode);
+ bool caching = false; //fscache_cookie_enabled(ceph_fscache_cookie(ci));
+ long count = wbc->nr_to_write;
+ u64 len, oplen;
+ int ret, num_ops, op_idx = 0;
+
+ if (folio_start_writeback(folio))
+ BUG();
+ //ceph_folio_start_fscache(caching, folio);
+
+ count -= folio_nr_pages(folio);
+
+ /* Find all consecutive lockable dirty pages that have contiguous
+ * written regions, stopping when we find a page that is not
+ * immediately lockable, is not dirty or is missing, or we reach the
+ * end of the range.
+ */
+ len = folio_size(folio);
+ if (start < i_size) {
+ /* Trim the write to the EOF; the extra data is ignored. Also
+ * put an upper limit on the size of a single storedata op.
+ */
+ u64 objnum, objoff;
- if (pvec_pages && i == pvec_pages &&
- locked_pages < max_pages) {
- dout("reached end pvec, trying for more\n");
- pagevec_release(&pvec);
- goto get_more_pages;
- }
- }
+ ceph_calc_file_object_mapping(&ci->i_layout, start, end - start + 1,
+ &objnum, &objoff, &max_len);
+ max_len = min_t(u64, max_len, i_size - start);
+
+ if (len < max_len)
+ ceph_extend_writeback(mapping, snapc, &count,
+ start, max_len, caching, &len);
+ len = min_t(loff_t, len, max_len);
+ }
+
+ /* We now have a contiguous set of dirty pages, each with writeback
+ * set; the first page is still locked at this point, but all the rest
+ * have been unlocked.
+ */
+ folio_unlock(folio);
-new_request:
- offset = page_offset(pages[0]);
- len = wsize;
+ /* Create a write request */
+ if (start < i_size) {
+		dout("write back %llx @%llx [%llx]\n", len, start, i_size);
+ /* Speculatively write to the cache. We have to fix this up
+ * later if the store fails.
+ */
+ //ceph_write_to_cache(vnode, start, len, i_size, caching);
+ oplen = len;
+ num_ops = 1;
req = ceph_osdc_new_request(&fsc->client->osdc,
- &ci->i_layout, vino,
- offset, &len, 0, num_ops,
- CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
- snapc, ceph_wbc.truncate_seq,
- ceph_wbc.truncate_size, false);
+ &ci->i_layout, vino,
+ start, &oplen, 0, num_ops,
+ CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
+ snapc, ceph_wbc->truncate_seq,
+ ceph_wbc->truncate_size, false);
if (IS_ERR(req)) {
req = ceph_osdc_new_request(&fsc->client->osdc,
- &ci->i_layout, vino,
- offset, &len, 0,
- min(num_ops,
- CEPH_OSD_SLAB_OPS),
- CEPH_OSD_OP_WRITE,
- CEPH_OSD_FLAG_WRITE,
- snapc, ceph_wbc.truncate_seq,
- ceph_wbc.truncate_size, true);
+ &ci->i_layout, vino,
+ start, &oplen, 0,
+ min(num_ops, CEPH_OSD_SLAB_OPS),
+ CEPH_OSD_OP_WRITE,
+ CEPH_OSD_FLAG_WRITE,
+ snapc, ceph_wbc->truncate_seq,
+ ceph_wbc->truncate_size, true);
BUG_ON(IS_ERR(req));
}
- BUG_ON(len < page_offset(pages[locked_pages - 1]) +
- thp_size(page) - offset);
+ BUG_ON(oplen < len);
req->r_callback = writepages_finish;
req->r_inode = inode;
+ req->r_mtime = inode->i_mtime;
- /* Format the osd request message and submit the write */
- len = 0;
- data_pages = pages;
- op_idx = 0;
- for (i = 0; i < locked_pages; i++) {
- u64 cur_offset = page_offset(pages[i]);
- /*
- * Discontinuity in page range? Ceph can handle that by just passing
- * multiple extents in the write op.
- */
- if (offset + len != cur_offset) {
- /* If it's full, stop here */
- if (op_idx + 1 == req->r_num_ops)
- break;
+ iov_iter_xarray(&iter, WRITE, &mapping->i_pages, start, len);
+ osd_req_op_extent_osd_iter(req, op_idx, &iter);
+ osd_req_op_extent_update(req, op_idx, len);
+ ret = ceph_osdc_start_request(&fsc->client->osdc, req, true);
+ BUG_ON(ret);
+ wbc->nr_to_write = count;
+ req = NULL;
+ } else {
+		dout("write discard %llx @%llx [%llx]\n", len, start, i_size);
- /* Kick off an fscache write with what we have so far. */
- ceph_fscache_write_to_cache(inode, offset, len, caching);
-
- /* Start a new extent */
- osd_req_op_extent_dup_last(req, op_idx,
- cur_offset - offset);
- dout("writepages got pages at %llu~%llu\n",
- offset, len);
- osd_req_op_extent_osd_data_pages(req, op_idx,
- data_pages, len, 0,
- from_pool, false);
- osd_req_op_extent_update(req, op_idx, len);
-
- len = 0;
- offset = cur_offset;
- data_pages = pages + i;
- op_idx++;
- }
+ /* The dirty region was entirely beyond the EOF. */
+ fscache_clear_page_bits(mapping, start, len, caching);
+ ceph_pages_written_back(inode, fsc, start, len, false);
+ wbc->nr_to_write = count;
+ ret = 0;
+ }
- set_page_writeback(pages[i]);
- if (caching)
- ceph_set_page_fscache(pages[i]);
- len += thp_size(page);
- }
- ceph_fscache_write_to_cache(inode, offset, len, caching);
-
- if (ceph_wbc.size_stable) {
- len = min(len, ceph_wbc.i_size - offset);
- } else if (i == locked_pages) {
- /* writepages_finish() clears writeback pages
- * according to the data length, so make sure
- * data length covers all locked pages */
- u64 min_len = len + 1 - thp_size(page);
- len = get_writepages_data_length(inode, pages[i - 1],
- offset);
- len = max(len, min_len);
- }
- dout("writepages got pages at %llu~%llu\n", offset, len);
+	dout("%s() = %d\n", __func__, ret);
+ return ret;
+}
- osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
- 0, from_pool, false);
- osd_req_op_extent_update(req, op_idx, len);
+/*
+ * Scan for a dirty page in the specified snap.
+ */
+static struct folio *ceph_scan_for_writeable_page(struct address_space *mapping,
+ struct ceph_snap_context *snapc,
+ pgoff_t from, pgoff_t to)
+{
+ XA_STATE(xas, &mapping->i_pages, from);
+ struct folio *ret = NULL, *folio;
+
+ rcu_read_lock();
+ xas_for_each_marked(&xas, folio, to, PAGECACHE_TAG_DIRTY) {
+ if (xas_retry(&xas, folio))
+ continue;
+ if (xa_is_value(folio) || folio->index > to)
+ break;
+ if (folio_get_private(folio) != snapc)
+ continue;
+ if (!folio_try_get_rcu(folio))
+ goto retry;
+ if (unlikely(folio != xas_reload(&xas) ||
+ folio_get_private(folio) != snapc))
+ goto put_page;
+ ret = folio;
+ break;
+put_page:
+ folio_put(folio);
+retry:
+ xas_reset(&xas);
+ }
+ rcu_read_unlock();
+ return ret;
+}
- BUG_ON(op_idx + 1 != req->r_num_ops);
-
- from_pool = false;
- if (i < locked_pages) {
- BUG_ON(num_ops <= req->r_num_ops);
- num_ops -= req->r_num_ops;
- locked_pages -= i;
-
- /* allocate new pages array for next request */
- data_pages = pages;
- pages = kmalloc_array(locked_pages, sizeof(*pages),
- GFP_NOFS);
- if (!pages) {
- from_pool = true;
- pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
- BUG_ON(!pages);
+/*
+ * write a region of pages back to the server
+ */
+static int ceph_writepages_region(struct address_space *mapping,
+ struct writeback_control *wbc,
+ struct ceph_snap_context *snapc,
+ struct ceph_writeback_ctl *ceph_wbc,
+ loff_t start, loff_t end, loff_t *_next)
+{
+ struct folio *folio;
+ ssize_t ret;
+ int skips = 0;
+
+	dout("%s(%llx,%llx)\n", __func__, start, end);
+
+ do {
+ pgoff_t index = start / PAGE_SIZE;
+ pgoff_t pend = end / PAGE_SIZE;
+
+ folio = ceph_scan_for_writeable_page(mapping, snapc, index, pend);
+ if (!folio)
+ break;
+
+ start = folio_pos(folio); /* May regress with THPs */
+
+		dout("wback %lx\n", folio_index(folio));
+
+ /* At this point we hold neither the i_pages lock nor the
+ * page lock: the page may be truncated or invalidated
+ * (changing page->mapping to NULL), or even swizzled
+ * back from swapper_space to tmpfs file mapping
+ */
+ if (wbc->sync_mode != WB_SYNC_NONE) {
+ ret = folio_lock_killable(folio);
+ if (ret < 0) {
+ folio_put(folio);
+ return ret;
}
- memcpy(pages, data_pages + i,
- locked_pages * sizeof(*pages));
- memset(data_pages + i, 0,
- locked_pages * sizeof(*pages));
} else {
- BUG_ON(num_ops != req->r_num_ops);
- index = pages[i - 1]->index + 1;
- /* request message now owns the pages array */
- pages = NULL;
+ if (!folio_trylock(folio)) {
+ folio_put(folio);
+ return 0;
+ }
}
- req->r_mtime = inode->i_mtime;
- rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
- BUG_ON(rc);
- req = NULL;
+ if (folio_mapping(folio) != mapping ||
+ folio_get_private(folio) != snapc ||
+ !folio_test_dirty(folio)) {
+ start += folio_size(folio);
+ folio_unlock(folio);
+ folio_put(folio);
+ continue;
+ }
- wbc->nr_to_write -= i;
- if (pages)
- goto new_request;
+ if (folio_test_writeback(folio) ||
+ folio_test_fscache(folio)) {
+ folio_unlock(folio);
+ if (wbc->sync_mode != WB_SYNC_NONE) {
+ folio_wait_writeback(folio);
+ folio_wait_fscache(folio);
+ } else {
+ start += folio_size(folio);
+ }
+ folio_put(folio);
+ if (wbc->sync_mode == WB_SYNC_NONE) {
+ if (skips >= 5 || need_resched())
+ break;
+ skips++;
+ }
+ continue;
+ }
- /*
- * We stop writing back only if we are not doing
- * integrity sync. In case of integrity sync we have to
- * keep going until we have written all the pages
- * we tagged for writeback prior to entering this loop.
- */
- if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
- done = true;
+ if (!folio_clear_dirty_for_io(folio))
+ BUG();
+ ret = ceph_write_back_from_locked_folio(mapping, wbc, snapc, ceph_wbc,
+ folio, start, pend);
+ folio_put(folio);
+ if (ret < 0)
+ return ret;
-release_pvec_pages:
- dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
- pvec.nr ? pvec.pages[0] : NULL);
- pagevec_release(&pvec);
- }
+ start += ret;
- if (should_loop && !done) {
- /* more to do; loop back to beginning of file */
- dout("writepages looping back to beginning of file\n");
- end = start_index - 1; /* OK even when start_index == 0 */
-
- /* to write dirty pages associated with next snapc,
- * we need to wait until current writes complete */
- if (wbc->sync_mode != WB_SYNC_NONE &&
- start_index == 0 && /* all dirty pages were checked */
- !ceph_wbc.head_snapc) {
- struct page *page;
- unsigned i, nr;
- index = 0;
- while ((index <= end) &&
- (nr = pagevec_lookup_tag(&pvec, mapping, &index,
- PAGECACHE_TAG_WRITEBACK))) {
- for (i = 0; i < nr; i++) {
- page = pvec.pages[i];
- if (page_snap_context(page) != snapc)
- continue;
- wait_on_page_writeback(page);
- }
- pagevec_release(&pvec);
- cond_resched();
- }
+ cond_resched();
+ } while (wbc->nr_to_write > 0);
+
+ *_next = start;
+	dout("%s() = 0 [%llx]\n", __func__, *_next);
+ return 0;
+}
+
+/*
+ * write some of the pending data back to the server
+ */
+static int ceph_writepages(struct address_space *mapping,
+ struct writeback_control *wbc)
+{
+ struct inode *inode = mapping->host;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_snap_context *snapc = NULL, *last_snapc = NULL;
+ struct ceph_writeback_ctl ceph_wbc;
+ unsigned int wsize = i_blocksize(inode);
+ loff_t start, next;
+	int ret = 0;	/* returned via `goto out` when there is no snapc */
+
+	dout("writepages_start %p (mode=%s)\n", inode,
+	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
+	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
+
+ if (ceph_inode_is_shutdown(inode)) {
+ if (ci->i_wrbuffer_ref > 0) {
+ pr_warn_ratelimited(
+ "writepage_start %p %lld forced umount\n",
+ inode, ceph_ino(inode));
}
+ mapping_set_error(mapping, -EIO);
+ return -EIO; /* we're in a forced umount, don't write! */
+ }
+ if (fsc->mount_options->wsize < wsize)
+ wsize = fsc->mount_options->wsize;
- start_index = 0;
- index = 0;
- goto retry;
+ /* find oldest snap context with dirty data */
+ snapc = get_oldest_context(inode, &ceph_wbc, NULL);
+ if (!snapc) {
+ /* hmm, why does writepages get called when there
+ is no dirty data? */
+		dout(" no snap context with dirty data?\n");
+ goto out;
}
+	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
+	     snapc, snapc->seq, snapc->num_snaps);
- if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
- mapping->writeback_index = index;
+ /* Decide whether/how to use the range specification we were given. */
+ if (ceph_wbc.head_snapc && snapc != last_snapc) {
+ if (wbc->range_cyclic) {
+ start = mapping->writeback_index * PAGE_SIZE;
+ ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+ start, LLONG_MAX, &next);
+ if (ret != 0)
+ goto out;
+
+ mapping->writeback_index = next / PAGE_SIZE;
+ if (start == 0 || wbc->nr_to_write <= 0)
+ goto out;
+
+ ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+ 0, start, &next);
+ if (ret == 0)
+ mapping->writeback_index = next / PAGE_SIZE;
+ } else if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX) {
+ ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+ 0, LLONG_MAX, &next);
+ if (wbc->nr_to_write > 0 && ret == 0)
+ mapping->writeback_index = next / PAGE_SIZE;
+ } else {
+ ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+ wbc->range_start, wbc->range_end,
+ &next);
+ }
+ } else if (!ceph_wbc.head_snapc) {
+ /* Do not respect wbc->range_{start,end}. Dirty pages
+ * in that range can be associated with newer snapc.
+ * They are not writeable until we write all dirty pages
+ * associated with 'snapc' get written */
+		dout(" non-head snapc, range whole\n");
+ ret = ceph_writepages_region(mapping, wbc, snapc, &ceph_wbc,
+ 0, LLONG_MAX, &next);
+ }
out:
- ceph_osdc_put_request(req);
ceph_put_snap_context(last_snapc);
- dout("writepages dend - startone, rc = %d\n", rc);
- return rc;
+	ceph_put_snap_context(snapc);	/* drop ref from get_oldest_context(); the old `last_snapc = snapc` was a dead store that leaked it */
+ return ret;
}
-
-
/*
* See if a given @snapc is either writeable, or already written.
*/
.readpage = netfs_readpage,
.readahead = netfs_readahead,
.writepage = ceph_writepage,
- .writepages = ceph_writepages_start,
+ .writepages = ceph_writepages,
.write_begin = ceph_write_begin,
.write_end = ceph_write_end,
.dirty_folio = ceph_dirty_folio,