pagevec_release(&pvec);
}
+
/*
- * ceph_writepages:
- * do write jobs for several pages
+ * writeback completion handler: clean the pages the osd wrote
+ * successfully and redirty the rest.
*/
-static int ceph_writepages(struct address_space *mapping,
- struct writeback_control *wbc)
+static void writepages_finish(struct ceph_osd_request *req)
+{
+ struct inode *inode = req->r_inode;
+ struct ceph_osd_reply_head *replyhead;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ unsigned wrote;
+	loff_t offset = (loff_t)req->r_pages[0]->index << PAGE_CACHE_SHIFT;
+ struct page *page;
+ int i;
+ struct ceph_snap_context *snapc = req->r_snapc;
+ struct address_space *mapping = inode->i_mapping;
+ struct writeback_control *wbc = req->r_wbc;
+ __s32 rc = -EIO;
+ __u64 bytes = 0;
+
+ /* parse reply */
+ if (req->r_reply) {
+ replyhead = req->r_reply->front.iov_base;
+ rc = le32_to_cpu(replyhead->result);
+ bytes = le32_to_cpu(replyhead->length);
+ }
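+	/* count pages we actually wrote, rounding up any final partial page */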
+ if (rc >= 0)
+ wrote = (bytes + (offset & ~PAGE_CACHE_MASK) + ~PAGE_CACHE_MASK)
+ >> PAGE_CACHE_SHIFT;
+ else
+ wrote = 0;
+	dout(10, "writepages_finish rc %d bytes %llu wrote %u (pages)\n", rc,
+ bytes, wrote);
+
+ /* clean or redirty pages */
+ for (i = 0; i < req->r_num_pages; i++) {
+ page = req->r_pages[i];
+ WARN_ON(!PageUptodate(page));
+ if (i < wrote) {
+ dout(20, "%p cleaning %p\n", inode, page);
+ page->private = 0;
+ ClearPagePrivate(page);
+ ceph_put_snap_context(snapc);
+ } else {
+ dout(20, "%p redirtying %p\n", inode, page);
+ ceph_redirty_page(mapping, page);
+ wbc->pages_skipped++;
+ }
+ dout(50, "unlocking %d %p\n", i, page);
+ end_page_writeback(page);
+ unlock_page(page);
+ }
+ dout(20, "%p wrote+cleaned %d pages\n", inode, wrote);
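+	/* release one wrbuffer cap ref per page cleaned */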
+ ceph_put_wrbuffer_cap_refs(ci, wrote, snapc);
+
+ ceph_release_pages(req->r_pages, req->r_num_pages);
+ ceph_osdc_put_request(req);
+}
+
+/*
+ * initiate async writeback
+ */
+static int ceph_writepages_start(struct address_space *mapping,
+ struct writeback_control *wbc)
{
struct inode *inode = mapping->host;
+ struct backing_dev_info *bdi = mapping->backing_dev_info;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_client *client = ceph_inode_to_client(inode);
pgoff_t index, start, end;
int range_whole = 0;
int should_loop = 1;
- struct page **pages;
- pgoff_t max_pages = 0;
+	struct page **pages = NULL;
+ pgoff_t max_pages = 0, max_pages_ever = 0;
struct ceph_snap_context *snapc = 0, *last_snapc = 0;
struct pagevec pvec;
int done = 0;
dout(10, "writepages on %p, wsize %u\n", inode, wsize);
-	/* larger page vector? */
+	/* how many pages per osd request? */
- max_pages = wsize >> PAGE_CACHE_SHIFT;
- pages = kmalloc(max_pages * sizeof(*pages), GFP_NOFS);
- if (!pages)
- return generic_writepages(mapping, wbc);
+ max_pages_ever = wsize >> PAGE_CACHE_SHIFT;
pagevec_init(&pvec, 0);
- /* ?? from cifs. */
- /*
+	/* skip if nonblocking and the backing device is congested */
if (wbc->nonblocking && bdi_write_congested(bdi)) {
- wbc->encountered_congestions = 1;
+ dout(10, "writepages congested\n");
+ wbc->encountered_congestion = 1;
return 0;
}
- */
/* where to start/end? */
if (wbc->range_cyclic) {
struct page *page;
int want;
loff_t offset, len;
- unsigned wrote;
+ struct ceph_osd_request *req;
+	req = NULL;
next = 0;
locked_pages = 0;
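+	/* start with the max; clamped below once a request is allocated */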
+ max_pages = max_pages_ever;
get_more_pages:
first = -1;
want = min(end - index,
min((pgoff_t)PAGEVEC_SIZE,
			   max_pages - (pgoff_t)locked_pages) - 1) + 1;
pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
PAGECACHE_TAG_DIRTY,
want);
for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
page = pvec.pages[i];
dout(20, "? %p idx %lu\n", page, page->index);
- if (locked_pages == 0)
+ if (locked_pages == 0) {
lock_page(page);
+
+ /* alloc request */
+			offset = (loff_t)page->index << PAGE_CACHE_SHIFT;
+ req = ceph_osdc_new_request(&client->osdc,
+ &ci->i_layout,
+ ceph_vino(inode),
+ offset, wsize,
+ CEPH_OSD_OP_WRITE,
+ snapc);
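+			/* the request may cover fewer pages than
+			 * wsize allows if it hits an object boundary */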
+ max_pages = req->r_num_pages;
+ pages = req->r_pages;
+ req->r_callback = writepages_finish;
+ req->r_inode = inode;
+ req->r_wbc = wbc;
+ }
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,27)
else if (!trylock_page(page))
#else
(loff_t)locked_pages << PAGE_CACHE_SHIFT);
dout(10, "writepages got %d pages at %llu~%llu\n",
locked_pages, offset, len);
- rc = ceph_osdc_writepages(&client->osdc,
- ceph_vino(inode),
- &ci->i_layout,
- snapc,
- offset, len,
- pages,
- locked_pages);
- if (rc >= 0)
- wrote = (rc + (offset & ~PAGE_CACHE_MASK)
- + ~PAGE_CACHE_MASK)
- >> PAGE_CACHE_SHIFT;
- else
- wrote = 0;
- dout(20, "writepages rc %d wrote %d\n", rc, wrote);
-
- /* clean or redirty pages */
- for (i = 0; i < locked_pages; i++) {
- page = pages[i];
- WARN_ON(!PageUptodate(page));
- if (i < wrote) {
- dout(20, "%p cleaning %p\n", inode, page);
- page->private = 0;
- ClearPagePrivate(page);
- ceph_put_snap_context(snapc);
- } else {
- dout(20, "%p redirtying %p\n", inode, page);
- ceph_redirty_page(mapping, page);
- wbc->pages_skipped++;
- }
- dout(50, "unlocking %d %p\n", i, page);
- end_page_writeback(page);
- unlock_page(page);
- }
- dout(20, "%p wrote+cleaned %d pages\n", inode, wrote);
- ceph_put_wrbuffer_cap_refs(ci, wrote, snapc);
+ rc = ceph_osdc_writepages_start(&client->osdc, req,
+ len, locked_pages);
/* continue? */
index = next;
done = 1;
release_pages:
- /* hmm, pagevec_release also does lru_add_drain()...? */
- dout(50, "release_pages on %d\n", locked_pages);
- ceph_release_pages(pages, locked_pages);
-
dout(50, "pagevec_release on %d pages (%p)\n", (int)pvec.nr,
pvec.nr ? pvec.pages[0] : 0);
pagevec_release(&pvec);
mapping->writeback_index = index;
out:
- kfree(pages);
if (rc > 0)
rc = 0; /* vfs expects us to return 0 */
ceph_put_snap_context(snapc);
.readpage = ceph_readpage,
.readpages = ceph_readpages,
.writepage = ceph_writepage,
- .writepages = ceph_writepages,
+ .writepages = ceph_writepages_start,
.write_begin = ceph_write_begin,
.write_end = ceph_write_end,
.set_page_dirty = ceph_set_page_dirty_vfs,
/*
- * vm ops
+ * vm ops
*/
static int ceph_page_mkwrite(struct vm_area_struct *vma, struct page *page)
};
+
+
+/*
+ * calculate the mapping of an extent onto an object, and fill out the
+ * request accordingly. shorten extent as necessary if it hits an
+ * object boundary.
+ */
+static __u64 calc_layout(struct ceph_osd_client *osdc,
+ struct ceph_vino vino, struct ceph_file_layout *layout,
+ __u64 off, __u64 len,
+ struct ceph_osd_request *req)
+{
+ struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
+ __u64 toff = off, tlen = len;
+
+ reqhead->oid.ino = vino.ino;
+ reqhead->oid.snap = vino.snap;
+
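+	/* map the file extent onto this object, shortening it at the
+	 * object boundary; tlen is left holding any remainder */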
+ calc_file_object_mapping(layout, &toff, &tlen, &reqhead->oid,
+ &off, &len);
+ if (tlen != 0)
+ dout(10, " skipping last %llu, writing %llu~%llu\n",
+ tlen, off, len);
+ reqhead->offset = cpu_to_le64(off);
+ reqhead->length = cpu_to_le64(len);
+
+ calc_object_layout(&reqhead->layout, &reqhead->oid, layout,
+ osdc->osdmap);
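+	/* how many pages does the (possibly shortened) extent span? */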
+ req->r_num_pages = calc_pages_for(off, len);
+
+ dout(10, "calc_layout bno %u on %llu~%llu pgid %llx (%d pages)\n",
+ le32_to_cpu(reqhead->oid.bno), off, len,
+ le64_to_cpu(reqhead->layout.ol_pgid),
+ req->r_num_pages);
+
+ return len;
+}
+
+
/*
* requests
*/
atomic_inc(&req->r_ref);
}
-static void put_request(struct ceph_osd_request *req)
+void ceph_osdc_put_request(struct ceph_osd_request *req)
{
-	dout(10, "put_request %p %d -> %d\n", req, atomic_read(&req->r_ref),
+	dout(10, "ceph_osdc_put_request %p %d -> %d\n", req,
+	     atomic_read(&req->r_ref),
atomic_read(&req->r_ref)-1);
BUG_ON(atomic_read(&req->r_ref) <= 0);
if (atomic_dec_and_test(&req->r_ref)) {
ceph_msg_put(req->r_request);
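+		/* drop the snapc ref from ceph_osdc_new_request */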
+ ceph_put_snap_context(req->r_snapc);
kfree(req);
}
}
+
+
struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op,
struct ceph_snap_context *snapc)
{
return req;
}
-static struct ceph_osd_request *alloc_request(int num_pages,
- struct ceph_msg *msg)
+struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
+ struct ceph_file_layout *layout,
+ struct ceph_vino vino,
+ __u64 off, __u64 len, int op,
+ struct ceph_snap_context *snapc)
{
struct ceph_osd_request *req;
+ struct ceph_msg *msg;
+	int num_pages = calc_pages_for(off, len);
+
+ BUG_ON(vino.snap != CEPH_NOSNAP);
req = kzalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS);
if (req == NULL)
return ERR_PTR(-ENOMEM);
+
+ msg = new_request_msg(osdc, op, snapc);
+ if (IS_ERR(msg)) {
+ kfree(req);
+ return ERR_PTR(PTR_ERR(msg));
+ }
req->r_request = msg;
- req->r_num_pages = num_pages;
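+	/* take a snap context ref for the life of the request */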
+ req->r_snapc = ceph_get_snap_context(snapc);
+
+	/* map the extent onto an object; this may shorten the i/o */
+ calc_layout(osdc, vino, layout, off, len, req);
+
atomic_set(&req->r_ref, 1);
return req;
}
{
int timeout = osdc->client->mount_args.osd_timeout;
dout(10, "reschedule timeout (%d seconds)\n", timeout);
- schedule_delayed_work(&osdc->timeout_work,
+ schedule_delayed_work(&osdc->timeout_work,
round_jiffies_relative(timeout*HZ));
}
get_request(req);
rc = radix_tree_insert(&osdc->request_tree, req->r_tid, (void *)req);
- if (osdc->nr_requests == 0)
+ if (osdc->nr_requests == 0)
reschedule_timeout(osdc);
osdc->nr_requests++;
spin_unlock(&osdc->request_lock);
radix_tree_preload_end();
-
+
return rc;
}
-static void unregister_request(struct ceph_osd_client *osdc,
+static void __unregister_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
- dout(30, "unregister_request %p tid %lld\n", req, req->r_tid);
- spin_lock(&osdc->request_lock);
+ dout(30, "__unregister_request %p tid %lld\n", req, req->r_tid);
radix_tree_delete(&osdc->request_tree, req->r_tid);
osdc->nr_requests--;
if (osdc->nr_requests)
reschedule_timeout(osdc);
- spin_unlock(&osdc->request_lock);
- put_request(req);
+ ceph_osdc_put_request(req);
}
/*
int pps; /* placement ps */
int osds[10];
int nr_osds;
-
+
ruleno = crush_find_rule(osdc->osdmap->crush, req->r_pgid.pg.pool,
req->r_pgid.pg.type, req->r_pgid.pg.size);
if (ruleno < 0) {
derr(0, "pick_osd no crush rule for pool %d type %d size %d\n",
- req->r_pgid.pg.pool, req->r_pgid.pg.type,
+ req->r_pgid.pg.pool, req->r_pgid.pg.type,
req->r_pgid.pg.size);
return -1;
}
req->r_request->hdr.dst.name.num = cpu_to_le32(osd);
req->r_request->hdr.dst.addr = osdc->osdmap->osd_addr[osd];
req->r_last_osd = req->r_request->hdr.dst.addr;
-
+
ceph_msg_get(req->r_request); /* send consumes a ref */
ceph_msg_send(osdc->client->msgr, req->r_request, 0);
}
dout(10, "handle_reply tid %llu flags %d |= %d\n", tid, req->r_flags,
rhead->flags);
req->r_flags |= rhead->flags;
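+	/* unregister under request_lock, before invoking any callback */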
+ __unregister_request(osdc, req);
spin_unlock(&osdc->request_lock);
- complete(&req->r_completion);
+
+ if (req->r_callback)
+ req->r_callback(req);
+ else
+ complete(&req->r_completion); /* see do_sync_request */
done:
- put_request(req);
+ ceph_osdc_put_request(req);
return;
bad:
int got;
int osd;
int ret = 0;
-
+
more:
spin_lock(&osdc->request_lock);
more_locked:
- got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
+ got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
next_tid, 1);
if (got == 0)
goto done;
req->r_flags |= CEPH_OSD_OP_RETRY;
send_request(osdc, req, osd);
}
- put_request(req);
+ ceph_osdc_put_request(req);
goto more;
}
goto more_locked;
return ret;
}
+
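+/*
+ * register the request, pick a target osd, and send it off.
+ */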
+void start_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
+{
+ register_request(osdc, req);
+ down_read(&osdc->map_sem);
+ send_request(osdc, req, -1);
+ up_read(&osdc->map_sem);
+}
+
/*
* synchronously do an osd request.
*/
-int do_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
+int do_sync_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
{
struct ceph_osd_reply_head *replyhead;
__s32 rc;
int bytes;
- /* register+send request */
- register_request(osdc, req);
- down_read(&osdc->map_sem);
- send_request(osdc, req, -1);
- up_read(&osdc->map_sem);
-
+ start_request(osdc, req); /* register+send request */
rc = wait_for_completion_interruptible(&req->r_completion);
-
- unregister_request(osdc, req);
if (rc < 0) {
struct ceph_msg *msg;
dout(0, "tid %llu err %d, revoking %p pages\n", req->r_tid,
rc, req->r_request);
- /*
+ /*
* mark req aborted _before_ revoking pages, so that
* if a racing kick_request _does_ dup the page vec
* pointer, it will definitely then see the aborted
replyhead = req->r_reply->front.iov_base;
rc = le32_to_cpu(replyhead->result);
bytes = le32_to_cpu(req->r_reply->hdr.data_len);
- dout(10, "do_request tid %llu result %d, %d bytes\n",
+ dout(10, "do_sync_request tid %llu result %d, %d bytes\n",
req->r_tid, rc, bytes);
if (rc < 0)
return rc;
void handle_timeout(struct work_struct *work)
{
- struct ceph_osd_client *osdc =
+ struct ceph_osd_client *osdc =
container_of(work, struct ceph_osd_client, timeout_work.work);
dout(10, "timeout\n");
down_read(&osdc->map_sem);
-/*
- * calculate the mapping of an extent onto an object, and fill out the
- * request accordingly. shorten extent as necessary if it hits an
- * object boundary.
- */
-static __u64 calc_layout(struct ceph_osd_client *osdc,
- struct ceph_vino vino, struct ceph_file_layout *layout,
- __u64 off, __u64 len,
- struct ceph_osd_request *req)
-{
- struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
- __u64 toff = off, tlen = len;
-
- reqhead->oid.ino = vino.ino;
- reqhead->oid.snap = vino.snap;
-
- calc_file_object_mapping(layout, &toff, &tlen, &reqhead->oid,
- &off, &len);
- if (tlen != 0)
- dout(10, " skipping last %llu, writing %llu~%llu\n",
- tlen, off, len);
- reqhead->offset = cpu_to_le64(off);
- reqhead->length = cpu_to_le64(len);
-
- calc_object_layout(&reqhead->layout, &reqhead->oid, layout,
- osdc->osdmap);
-
- dout(10, "calc_layout bno %u on %llu~%llu pgid %llx\n",
- le32_to_cpu(reqhead->oid.bno), off, len,
- le64_to_cpu(reqhead->layout.ol_pgid));
-
- return len;
-}
-
/*
* synchronous read direct to user buffer.
*
- * FIXME: if read spans object boundary, just do two separate reads.
+ * if read spans object boundary, just do two separate reads. FIXME:
* for a correct atomic read, we should take read locks on all
* objects.
*/
{
struct ceph_msg *reqm;
struct ceph_osd_request *req;
- int num_pages, i, po, left, l;
+ int i, po, left, l;
__s32 rc;
int finalrc = 0;
u64 rlen;
vino.snap, off, len);
more:
- /* request msg */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0);
- if (IS_ERR(reqm))
- return PTR_ERR(reqm);
-
- num_pages = calc_pages_for(off, len);
- req = alloc_request(num_pages, reqm);
+ req = ceph_osdc_new_request(osdc, layout, vino, off, len,
+				    CEPH_OSD_OP_READ, NULL);
if (IS_ERR(req))
return PTR_ERR(req);
+ reqm = req->r_request;
rlen = calc_layout(osdc, vino, layout, off, len, req);
- num_pages = calc_pages_for(off, rlen); /* recalc */
- dout(10, "sync_read %llu~%llu -> %d pages\n", off, rlen, num_pages);
+ dout(10, "sync_read %llu~%llu -> %d pages\n", off, rlen,
+ req->r_num_pages);
/* allocate temp pages to hold data */
- for (i = 0; i < num_pages; i++) {
+ for (i = 0; i < req->r_num_pages; i++) {
req->r_pages[i] = alloc_page(GFP_NOFS);
if (req->r_pages[i] == NULL) {
req->r_num_pages = i+1;
- put_request(req);
+ ceph_osdc_put_request(req);
return -ENOMEM;
}
}
- reqm->nr_pages = num_pages;
+ reqm->nr_pages = req->r_num_pages;
reqm->pages = req->r_pages;
reqm->hdr.data_len = cpu_to_le32(rlen);
reqm->hdr.data_off = cpu_to_le32(off);
- rc = do_request(osdc, req);
+ rc = do_sync_request(osdc, req);
if (rc > 0) {
/* copy into user buffer */
po = off & ~PAGE_CACHE_MASK;
}
}
out:
- put_request(req);
+ ceph_osdc_put_request(req);
if (rc > 0) {
finalrc += rc;
off += rc;
vino.snap, off, len);
/* request msg */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0);
- if (IS_ERR(reqm))
- return PTR_ERR(reqm);
- reqhead = reqm->front.iov_base;
-
- req = alloc_request(1, reqm);
- if (IS_ERR(req)) {
- ceph_msg_put(reqm);
+ req = ceph_osdc_new_request(osdc, layout, vino, off, len,
+				       CEPH_OSD_OP_READ, NULL);
+ if (IS_ERR(req))
return PTR_ERR(req);
- }
+ reqm = req->r_request;
+ reqhead = reqm->front.iov_base;
req->r_pages[0] = page;
len = calc_layout(osdc, vino, layout, off, len, req);
BUG_ON(len != PAGE_CACHE_SIZE);
- rc = do_request(osdc, req);
- put_request(req);
+ rc = do_sync_request(osdc, req);
+ ceph_osdc_put_request(req);
dout(10, "readpage result %d\n", rc);
if (rc == -ENOENT)
rc = 0; /* object page dne; zero it */
vino.snap, off, len);
/* alloc request, w/ optimistically-sized page vector */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0);
- if (IS_ERR(reqm))
- return PTR_ERR(reqm);
- req = alloc_request(num_pages, reqm);
- if (req == 0) {
- ceph_msg_put(reqm);
- return -ENOMEM;
- }
+ req = ceph_osdc_new_request(osdc, layout, vino, off, len,
+				    CEPH_OSD_OP_READ, NULL);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
+ reqm = req->r_request;
/* find adjacent pages */
next_index = list_entry(page_list->prev, struct page, lru)->index;
/* request msg */
len = calc_layout(osdc, vino, layout, off, len, req);
- req->r_num_pages = calc_pages_for(off, len);
dout(10, "readpages final extent is %llu~%llu -> %d pages\n",
off, len, req->r_num_pages);
- rc = do_request(osdc, req);
+ rc = do_sync_request(osdc, req);
out:
- put_request(req);
+ ceph_osdc_put_request(req);
dout(10, "readpages result %d\n", rc);
if (rc < 0)
dout(10, "hrm!\n");
struct ceph_msg *reqm;
struct ceph_osd_request_head *reqhead;
struct ceph_osd_request *req;
- int num_pages, i, po, l, left;
+ int i, po, l, left;
__s32 rc;
u64 rlen;
int finalrc = 0;
vino.snap, off, len);
more:
- /* request msg */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE, snapc);
- if (IS_ERR(reqm))
- return PTR_ERR(reqm);
+ req = ceph_osdc_new_request(osdc, layout, vino, off, len,
+ CEPH_OSD_OP_WRITE, snapc);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
+ reqm = req->r_request;
reqhead = reqm->front.iov_base;
reqhead->flags = CEPH_OSD_OP_ACK | /* just ack for now... FIXME */
CEPH_OSD_OP_ORDERSNAP; /* get EOLDSNAPC if out of order */
- /* how many pages? */
- num_pages = calc_pages_for(off, len);
- req = alloc_request(num_pages, reqm);
- if (IS_ERR(req)) {
- ceph_msg_put(reqm);
- return PTR_ERR(req);
- }
-
rlen = calc_layout(osdc, vino, layout, off, len, req);
- num_pages = calc_pages_for(off, rlen); /* recalc */
- dout(10, "sync_write %llu~%llu -> %d pages\n", off, rlen, num_pages);
+ dout(10, "sync_write %llu~%llu -> %d pages\n", off, rlen,
+ req->r_num_pages);
/* copy data into a set of pages */
left = rlen;
po = off & ~PAGE_MASK;
- for (i = 0; i < num_pages; i++) {
+ for (i = 0; i < req->r_num_pages; i++) {
int bad;
req->r_pages[i] = alloc_page(GFP_NOFS);
if (req->r_pages[i] == NULL) {
}
}
reqm->pages = req->r_pages;
- reqm->nr_pages = num_pages;
+ reqm->nr_pages = req->r_num_pages;
reqm->hdr.data_len = cpu_to_le32(rlen);
reqm->hdr.data_off = cpu_to_le32(off);
- rc = do_request(osdc, req);
+ rc = do_sync_request(osdc, req);
out:
for (i = 0; i < req->r_num_pages; i++)
__free_pages(req->r_pages[i], 0);
- put_request(req);
+ ceph_osdc_put_request(req);
if (rc == 0) {
finalrc += rlen;
off += rlen;
}
/*
- * do a write job for N pages
+ * do a sync write for N pages
*/
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
struct ceph_file_layout *layout,
BUG_ON(vino.snap != CEPH_NOSNAP);
/* request + msg */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE, snapc);
- if (IS_ERR(reqm))
- return PTR_ERR(reqm);
- req = alloc_request(num_pages, reqm);
- if (IS_ERR(req)) {
- ceph_msg_put(reqm);
+ req = ceph_osdc_new_request(osdc, layout, vino, off, len,
+ CEPH_OSD_OP_WRITE, snapc);
+ if (IS_ERR(req))
return PTR_ERR(req);
- }
-
+ reqm = req->r_request;
reqhead = reqm->front.iov_base;
if (osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK)
reqhead->flags = CEPH_OSD_OP_ACK;
else
reqhead->flags = CEPH_OSD_OP_SAFE;
- len = calc_layout(osdc, vino, layout, off, len, req);
- num_pages = calc_pages_for(off, len);
- dout(10, "writepages %llu~%llu -> %d pages\n", off, len, num_pages);
-
+ len = le64_to_cpu(reqhead->length);
+ dout(10, "writepages %llu~%llu -> %d pages\n", off, len,
+ req->r_num_pages);
+
/* copy pages */
- memcpy(req->r_pages, pages, num_pages * sizeof(struct page *));
+ memcpy(req->r_pages, pages, req->r_num_pages * sizeof(struct page *));
reqm->pages = req->r_pages;
- reqm->nr_pages = req->r_num_pages = num_pages;
+ reqm->nr_pages = req->r_num_pages;
reqm->hdr.data_len = len;
reqm->hdr.data_off = off;
- rc = do_request(osdc, req);
- put_request(req);
+ rc = do_sync_request(osdc, req);
+ ceph_osdc_put_request(req);
if (rc == 0)
rc = len;
dout(10, "writepages result %d\n", rc);
return rc;
}
+
+/*
+ * start an async multipage write
+ */
+int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req,
+ __u64 len, int num_pages)
+{
+ struct ceph_msg *reqm = req->r_request;
+ struct ceph_osd_request_head *reqhead = reqm->front.iov_base;
+ __u64 off = le64_to_cpu(reqhead->offset);
+
+ dout(10, "writepages_start %llu~%llu, %d pages\n", off, len, num_pages);
+
+ if (osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK)
+ reqhead->flags = CEPH_OSD_OP_ACK;
+ else
+ reqhead->flags = CEPH_OSD_OP_SAFE;
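+
+	/* trim the (optimistically sized) request to the bytes
+	 * actually being written */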
+ reqhead->length = cpu_to_le64(len);
+
+ /* reference pages in message */
+ reqm->pages = req->r_pages;
+ reqm->nr_pages = req->r_num_pages = num_pages;
+	reqm->hdr.data_len = cpu_to_le32(len);
+	reqm->hdr.data_off = cpu_to_le32(off);
+
+ start_request(osdc, req);
+ return 0;
+}