int l, bad;
while (left > 0) {
- l = min_t(int, PAGE_SIZE-po, left);
+ l = min_t(int, PAGE_CACHE_SIZE-po, left);
bad = copy_from_user(page_address(pages[i]) + po, data, l);
if (bad == l)
return -EFAULT;
return len;
}
+/*
+ * Zero an extent within a page vector. Offset is relative to the
+ * start of the first page.
+ */
+static void zero_page_vector_range(int off, int len, struct page **pages)
+{
+ int i = off >> PAGE_CACHE_SHIFT;
+
+ dout("zero_page_vector_page %u~%u\n", off, len);
+ BUG_ON(len < PAGE_CACHE_SIZE);
+
+ /* leading partial page? */
+ if (off & ~PAGE_CACHE_MASK) {
+ dout("zeroing %d %p head from %d\n", i, pages[i],
+ (int)(off & ~PAGE_CACHE_MASK));
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
+ zero_user_segment(pages[i], off & ~PAGE_CACHE_MASK,
+ PAGE_CACHE_SIZE);
+#else
+ zero_user_page(pages[i], off & ~PAGE_CACHE_MASK,
+ PAGE_CACHE_SIZE - (off & ~PAGE_CACHE_MASK),
+ KM_USER0);
+#endif
+ off += PAGE_CACHE_SIZE;
+ off &= PAGE_CACHE_MASK;
+ i++;
+ }
+ while (len >= PAGE_CACHE_SIZE) {
+ dout("zeroing %d %p\n", i, pages[i]);
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
+ zero_user_segment(pages[i], 0, PAGE_CACHE_SIZE);
+#else
+ zero_user_page(pages[i], 0, PAGE_CACHE_SIZE, KM_USER0);
+#endif
+ off += PAGE_CACHE_SIZE;
+ len -= PAGE_CACHE_SIZE;
+ i++;
+ }
+ /* trailing partial page? */
+ if (len) {
+ dout("zeroing %d %p tail to %d\n", i, pages[i], (int)len);
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
+ zero_user_segment(pages[i], 0, len);
+#else
+ zero_user_page(pages[i], 0, PAGE_CACHE_SIZE - len, KM_USER0);
+#endif
+ }
+}
+
+
+/*
+ * Read a range of bytes striped over one or more objects. Iterate over
+ * objects we stripe over. (That's not atomic, but good enough for now.)
+ *
+ * If @fill, zero any regions that are holes or past object EOF on
+ * disk.
+ */
+static int striped_read(struct inode *inode,
+ u64 off, u64 len,
+ struct page **pages, int num_pages,
+ bool fill)
+{
+ struct ceph_client *client = ceph_inode_to_client(inode);
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ u64 pos, this_len;
+ int page_off = off & ~PAGE_CACHE_SIZE; /* first byte's offset in page */
+ int left, pages_left;
+ int read;
+ struct page **page_pos;
+ int ret;
+ bool was_short;
+
+ /*
+ * we may need to do multiple reads. not atomic, unfortunately.
+ */
+ pos = off;
+ left = len;
+ page_pos = pages;
+ pages_left = num_pages;
+ read = 0;
+
+more:
+ this_len = left;
+ ret = ceph_osdc_readpages(&client->osdc, ceph_vino(inode),
+ &ci->i_layout, pos, &this_len,
+ ci->i_truncate_seq,
+ ci->i_truncate_size,
+ page_pos, pages_left);
+ was_short = this_len < left;
+ if (ret == -ENOENT)
+ ret = 0;
+ dout("striped_read %llu~%u (read %u) got %d%s\n", pos, left, read, ret,
+ was_short ? " SHORT" : "");
+
+ if (ret > 0) {
+ int didpages =
+ ((pos & ~PAGE_CACHE_MASK) + ret) >> PAGE_CACHE_SHIFT;
+
+ if (read < pos - off) {
+ dout(" zero gap %llu to %llu\n", off + read, pos);
+ zero_page_vector_range(page_off + read,
+ pos - off - read, pages);
+ }
+ pos += ret;
+ read = pos - off;
+ left -= ret;
+ if (left && was_short) {
+ page_pos += didpages;
+ pages_left -= didpages;
+ goto more;
+ }
+ if (fill) {
+ dout("zero tail\n");
+ zero_page_vector_range(page_off + read,
+ len - read, pages);
+ }
+ }
+ if (ret >= 0)
+ ret = read;
+ dout("striped_read returns %d\n", ret);
+ return ret;
+}
+
/*
* Completely synchronous read and write methods. Direct from __user
* buffer to osd, or directly to user pages (if O_DIRECT).
* If the read spans object boundary, just do multiple reads.
*/
static ssize_t ceph_sync_read(struct file *file, char __user *data,
- unsigned left, loff_t *offset)
+ unsigned len, loff_t *poff)
{
struct inode *inode = file->f_dentry->d_inode;
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_client *client = ceph_inode_to_client(inode);
- long long unsigned start_off = *offset;
- long long unsigned pos = start_off;
- struct page **pages, **page_pos;
- int num_pages = calc_pages_for(start_off, left);
- int pages_left;
- int read = 0;
+ struct page **pages;
+ u64 off = *poff;
+ int num_pages = calc_pages_for(off, len);
int ret;
- dout("sync_read on file %p %llu~%u %s\n", file, start_off, left,
+ dout("sync_read on file %p %llu~%u %s\n", file, off, len,
(file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
if (file->f_flags & O_DIRECT) {
- pages = get_direct_page_vector(data, num_pages, pos, left);
+ pages = get_direct_page_vector(data, num_pages, off, len);
/*
* flush any page cache pages in this range. this
if (IS_ERR(pages))
return PTR_ERR(pages);
- /*
- * we may need to do multiple reads. not atomic, unfortunately.
- */
- page_pos = pages;
- pages_left = num_pages;
+ ret = striped_read(inode, off, len, pages, num_pages,
+ file->f_flags & O_DIRECT);
-more:
- ret = ceph_osdc_readpages(&client->osdc, ceph_vino(inode),
- &ci->i_layout,
- pos, left, ci->i_truncate_seq,
- ci->i_truncate_size,
- page_pos, pages_left,
- file->f_flags & O_DIRECT);
- if (ret > 0) {
- int didpages =
- ((pos & ~PAGE_CACHE_MASK) + ret) >> PAGE_CACHE_SHIFT;
-
- pos += ret;
- read += ret;
- left -= ret;
- if (left) {
- page_pos += didpages;
- pages_left -= didpages;
- goto more;
- }
- }
- if (ret >= 0) {
- ret = copy_page_vector_to_user(pages, data, start_off, read);
- if (ret >= 0) {
- *offset = start_off + read;
- ret = read;
- }
- }
+ if (ret >= 0 && (file->f_flags & O_DIRECT) == 0)
+ ret = copy_page_vector_to_user(pages, data, off, ret);
+ if (ret >= 0)
+ *poff = off + ret;
if (file->f_flags & O_DIRECT)
put_page_vector(pages, num_pages);
else
ceph_release_page_vector(pages, num_pages);
- dout("sync_read read %d result %d\n", read, ret);
+ dout("sync_read result %d\n", ret);
return ret;
}
}
/*
- * Read some contiguous pages. If @fill, return number of bytes read/zeroed
- * (i.e., the range we tried to read). If not @fill, return the number of
- * bytes actually read. (We do this because mapping readpages and O_DIRECT
- * reads should zero out the full extent, but regular sync reads don't care.)
+ * Read some contiguous pages. If we cross a stripe boundary, shorten
+ * *plen. Return number of bytes read, or error.
*/
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct ceph_vino vino, struct ceph_file_layout *layout,
- u64 off, u64 len,
+ u64 off, u64 *plen,
u32 truncate_seq, u64 truncate_size,
- struct page **pages, int num_pages,
- int fill)
+ struct page **pages, int num_pages)
{
struct ceph_osd_request *req;
- int i;
- struct page *page;
- int rc = 0, read = 0;
+ int rc = 0;
dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
- vino.snap, off, len);
- req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
+ vino.snap, off, *plen);
+ req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
NULL, 0, truncate_seq, truncate_size, NULL,
false);
/* it may be a short read due to an object boundary */
req->r_pages = pages;
- num_pages = calc_pages_for(off, len);
+ num_pages = calc_pages_for(off, *plen);
req->r_num_pages = num_pages;
- dout("readpages final extent is %llu~%llu (%d pages)\n",
- off, len, req->r_num_pages);
+ dout("readpages final extent is %llu~%llu (%d pages)\n",
+ off, *plen, req->r_num_pages);
rc = ceph_osdc_start_request(osdc, req, false);
if (!rc)
rc = ceph_osdc_wait_request(osdc, req);
- if (rc >= 0) {
- read = rc;
- if (fill)
- rc = len;
- } else if (rc == -ENOENT) {
- if (fill)
- rc = len;
- }
-
- /* zero trailing pages on success? */
- if (fill && read < (num_pages << PAGE_CACHE_SHIFT)) {
- if (read & ~PAGE_CACHE_MASK) {
- i = read >> PAGE_CACHE_SHIFT;
- page = pages[i];
- dout("readpages zeroing %d %p from %d\n", i, page,
- (int)(read & ~PAGE_CACHE_MASK));
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
- zero_user_segment(page, read & ~PAGE_CACHE_MASK,
- PAGE_CACHE_SIZE);
-#else
- zero_user_page(page, read & ~PAGE_CACHE_MASK,
- PAGE_CACHE_SIZE - (read & ~PAGE_CACHE_MASK),
- KM_USER0);
-#endif
- read += PAGE_CACHE_SIZE;
- }
- for (i = read >> PAGE_CACHE_SHIFT; i < num_pages; i++) {
- page = req->r_pages[i];
- dout("readpages zeroing %d %p\n", i, page);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
- zero_user_segment(page, 0, PAGE_CACHE_SIZE);
-#else
- zero_user_page(page, 0, PAGE_CACHE_SIZE, KM_USER0);
-#endif
- }
- }
-
ceph_osdc_put_request(req);
dout("readpages result %d\n", rc);
return rc;