From: Sage Weil Date: Mon, 16 Mar 2009 21:56:18 +0000 (-0700) Subject: kclient: refactor osdc write; handle dual ack/commit, and fsync on sync writes X-Git-Tag: v0.7.2~163 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2c933dcab4665b6e12b1ac4137871316e9806eaa;p=ceph.git kclient: refactor osdc write; handle dual ack/commit, and fsync on sync writes For sync_writes that are not O_SYNC or O_DIRECT, we get two acks back from the OSD, one signalling serialization and one when the write is safe on disk. The client needs to keep the request message around until the commit in case the OSD fails and it needs to be replayed. However, the calling thread should continue after the ACK. Refactor the osdc interface so that sync_write() can do this for non SYNC|DIRECT writes. --- diff --git a/src/kernel/addr.c b/src/kernel/addr.c index 1101622043577..90989c38c77b6 100644 --- a/src/kernel/addr.c +++ b/src/kernel/addr.c @@ -641,6 +641,8 @@ retry: struct page *page; int want; u64 offset, len; + struct ceph_osd_request_head *reqhead; + struct ceph_osd_op *op; next = 0; locked_pages = 0; @@ -730,7 +732,9 @@ get_more_pages: &ci->i_layout, ceph_vino(inode), offset, &len, - CEPH_OSD_OP_WRITE, 0, + CEPH_OSD_OP_WRITE, + CEPH_OSD_OP_MODIFY | + CEPH_OSD_OP_ONDISK, snapc, do_sync, ci->i_truncate_seq, ci->i_truncate_size); @@ -787,8 +791,15 @@ get_more_pages: (u64)locked_pages << PAGE_CACHE_SHIFT); dout(10, "writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); - rc = ceph_osdc_writepages_start(&client->osdc, req, - len, locked_pages); + + /* revise final length, page count */ + req->r_num_pages = locked_pages; + reqhead = req->r_request->front.iov_base; + op = (void *)(reqhead + 1); + op->length = cpu_to_le64(len); + req->r_request->hdr.data_len = cpu_to_le32(len); + + rc = ceph_osdc_start_request(&client->osdc, req); req = NULL; /* * FIXME: if writepages_start fails (ENOMEM?) we should diff --git a/src/kernel/caps.c b/src/kernel/caps.c index 293a6e34c0ef6..52ca2165a283c 100644 --- a/src/kernel/caps.c +++ b/src/kernel/caps.c @@ -1096,6 +1096,17 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got, ceph_get_cap_refs(ci, need, want, got, endoff)); } +/* + * Take cap refs. Caller must already now we hold at least on ref on + * the caps in question or we don't know this is safe. + */ +void ceph_get_more_cap_refs(struct ceph_inode_info *ci, int caps) +{ + spin_lock(&ci->vfs_inode.i_lock); + __take_cap_refs(ci, caps); + spin_unlock(&ci->vfs_inode.i_lock); +} + /* * Release cap refs. * diff --git a/src/kernel/file.c b/src/kernel/file.c index 5d9266cbfac65..d027d71fa9d90 100644 --- a/src/kernel/file.c +++ b/src/kernel/file.c @@ -241,7 +241,7 @@ static void put_page_vector(struct page **pages, int num_pages) kfree(pages); } -static void release_page_vector(struct page **pages, int num_pages) +void ceph_release_page_vector(struct page **pages, int num_pages) { int i; @@ -261,7 +261,7 @@ static struct page **alloc_page_vector(int num_pages) for (i = 0; i < num_pages; i++) { pages[i] = alloc_page(GFP_NOFS); if (pages[i] == NULL) { - release_page_vector(pages, i); + ceph_release_page_vector(pages, i); return ERR_PTR(-ENOMEM); } } @@ -399,10 +399,67 @@ more: if (file->f_flags & O_DIRECT) put_page_vector(pages, num_pages); else - release_page_vector(pages, num_pages); + ceph_release_page_vector(pages, num_pages); return ret; } +/* + * Write commit callback, called if we requested both an ACK and + * ONDISK commit reply from the OSD. + */ +static void sync_write_commit(struct ceph_osd_request *req) +{ + struct ceph_inode_info *ci = ceph_inode(req->r_inode); + + dout(10, "sync_write_commit %p tid %llu\n", req, req->r_tid); + spin_lock(&ci->i_listener_lock); + list_del_init(&req->r_unsafe_item); + spin_unlock(&ci->i_listener_lock); + ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR); +} + +/* + * Wait on any unsafe replies for the given inode. First wait on the + * newest request, and make that the upper bound. Then, if there are + * more requests, keep waiting on the oldest as long as it is still older + * than the original request. + */ +static void sync_write_wait(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct list_head *head = &ci->i_unsafe_writes; + struct ceph_osd_request *req; + u64 last_tid; + + spin_lock(&ci->i_listener_lock); + if (list_empty(head)) + goto out; + + /* set upper bound as _last_ entry in chain */ + req = list_entry(head->prev, struct ceph_osd_request, + r_unsafe_item); + last_tid = req->r_tid; + + do { + ceph_osdc_get_request(req); + spin_unlock(&ci->i_listener_lock); + dout(10, "sync_write_wait on tid %llu (until %llu)\n", + req->r_tid, last_tid); + wait_for_completion(&req->r_safe_completion); + spin_lock(&ci->i_listener_lock); + ceph_osdc_put_request(req); + + /* + * from here on look at first entry in chain, since we + * only want to wait for anything older than last_tid + */ + req = list_entry(head->next, struct ceph_osd_request, + r_unsafe_item); + } while (req->r_tid <= last_tid); +out: + spin_unlock(&ci->i_listener_lock); +} + /* * synchronous write. from userspace. * @@ -416,9 +473,11 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, struct inode *inode = file->f_dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_client *client = ceph_inode_to_client(inode); - struct page **pages, **page_pos; - int num_pages, pages_left; + struct ceph_osd_request *req; + struct page **pages; + int num_pages; long long unsigned pos; + u64 len; int written = 0; int flags; int do_sync = 0; @@ -434,73 +493,100 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, pos = i_size_read(inode); else pos = *offset; - num_pages = calc_pages_for(pos, left); + + flags = CEPH_OSD_OP_ORDERSNAP | + CEPH_OSD_OP_ONDISK | + CEPH_OSD_OP_MODIFY; + if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) + flags |= CEPH_OSD_OP_ACK; + else + do_sync = 1; + + /* + * we may need to do multiple writes here if we span an object + * boundary. this isn't atomic, unfortunately. :( + */ +more: + len = left; + req = ceph_osdc_new_request(&client->osdc, &ci->i_layout, + ceph_vino(inode), pos, &len, + CEPH_OSD_OP_WRITE, flags, + ci->i_snap_realm->cached_context, + do_sync, + ci->i_truncate_seq, ci->i_truncate_size); + if (IS_ERR(req)) + return PTR_ERR(req); + + num_pages = calc_pages_for(pos, len); if (file->f_flags & O_DIRECT) { - pages = get_direct_page_vector(data, num_pages, pos, left); - if (IS_ERR(pages)) - return PTR_ERR(pages); + pages = get_direct_page_vector(data, num_pages, pos, len); + if (IS_ERR(pages)) { + ret = PTR_ERR(pages); + goto out; + } /* * throw out any page cache pages in this range. this * may block. */ - truncate_inode_pages_range(inode->i_mapping, pos, pos+left); + truncate_inode_pages_range(inode->i_mapping, pos, pos+len); } else { pages = alloc_page_vector(num_pages); - if (IS_ERR(pages)) - return PTR_ERR(pages); - ret = copy_user_to_page_vector(pages, data, pos, left); - if (ret < 0) + if (IS_ERR(pages)) { + ret = PTR_ERR(pages); goto out; - } - - flags = CEPH_OSD_OP_ORDERSNAP; - if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) - flags |= CEPH_OSD_OP_ACK; - else - do_sync = 1; + } + ret = copy_user_to_page_vector(pages, data, pos, len); + if (ret < 0) { + ceph_release_page_vector(pages, num_pages); + goto out; + } - /* - * we may need to do multiple writes here if we span an object - * boundary. this isn't atomic, unfortunately. :( - */ - page_pos = pages; - pages_left = num_pages; + if ((file->f_flags & O_SYNC) == 0) { + /* get a second commit callback */ + req->r_safe_callback = sync_write_commit; + req->r_own_pages = 1; + } + } + req->r_pages = pages; + req->r_num_pages = num_pages; + req->r_inode = inode; + + ret = ceph_osdc_start_request(&client->osdc, req); + if (!ret) { + if (req->r_safe_callback) { + /* + * Add to inode unsafe list only after we + * start_request so that a tid has been assigned. + */ + spin_lock(&ci->i_listener_lock); + list_add(&ci->i_unsafe_writes, &req->r_unsafe_item); + spin_unlock(&ci->i_listener_lock); + ceph_get_more_cap_refs(ci, CEPH_CAP_FILE_WR); + } + ret = ceph_osdc_wait_request(&client->osdc, req); + } -more: - ret = ceph_osdc_writepages(&client->osdc, ceph_vino(inode), - &ci->i_layout, - ci->i_snap_realm->cached_context, - pos, left, ci->i_truncate_seq, - ci->i_truncate_size, - page_pos, pages_left, - flags, do_sync); - if (ret > 0) { - int didpages = - ((pos & ~PAGE_CACHE_MASK) + ret) >> PAGE_CACHE_SHIFT; + if (file->f_flags & O_DIRECT) + put_page_vector(pages, num_pages); + else if (file->f_flags & O_SYNC) + ceph_release_page_vector(pages, num_pages); - pos += ret; - written += ret; - left -= ret; - if (left) { - page_pos += didpages; - pages_left -= didpages; - BUG_ON(!pages_left); +out: + ceph_osdc_put_request(req); + if (ret == 0) { + pos += len; + written += len; + left -= len; + if (left) goto more; - } ret = written; *offset = pos; if (pos > i_size_read(inode)) ceph_inode_set_size(inode, pos); } - -out: - if (file->f_flags & O_DIRECT) - put_page_vector(pages, num_pages); - else - release_page_vector(pages, num_pages); return ret; } @@ -509,7 +595,7 @@ out: * Atomically grab references, so that those bits are not released * back to the MDS mid-read. * - * Hmm, the sync reach case isn't actually async... should it be? + * Hmm, the sync read case isn't actually async... should it be? */ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov, unsigned long nr_segs, loff_t pos) @@ -648,6 +734,8 @@ static int ceph_fsync(struct file *file, struct dentry *dentry, int datasync) int ret; dout(10, "fsync on inode %p\n", inode); + sync_write_wait(inode); + ret = filemap_write_and_wait(inode->i_mapping); if (ret < 0) return ret; diff --git a/src/kernel/inode.c b/src/kernel/inode.c index 37fb8cedde960..555b21d751369 100644 --- a/src/kernel/inode.c +++ b/src/kernel/inode.c @@ -288,6 +288,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_wrbuffer_ref_head = 0; ci->i_rdcache_gen = 0; ci->i_rdcache_revoking = 0; + INIT_LIST_HEAD(&ci->i_unsafe_writes); ci->i_snap_realm = NULL; INIT_LIST_HEAD(&ci->i_snap_realm_item); diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index d38fffe644797..82283ebf27db3 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -60,11 +60,6 @@ static void calc_layout(struct ceph_osd_client *osdc, /* * requests */ -static void get_request(struct ceph_osd_request *req) -{ - atomic_inc(&req->r_ref); -} - void ceph_osdc_put_request(struct ceph_osd_request *req) { dout(10, "put_request %p %d -> %d\n", req, atomic_read(&req->r_ref), @@ -75,6 +70,9 @@ void ceph_osdc_put_request(struct ceph_osd_request *req) ceph_msg_put(req->r_request); if (req->r_reply) ceph_msg_put(req->r_reply); + if (req->r_own_pages) + ceph_release_page_vector(req->r_pages, + req->r_num_pages); ceph_put_snap_context(req->r_snapc); kfree(req); } @@ -110,6 +108,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (req == NULL) return ERR_PTR(-ENOMEM); + atomic_set(&req->r_ref, 1); + init_completion(&req->r_completion); + init_completion(&req->r_safe_completion); + INIT_LIST_HEAD(&req->r_unsafe_item); + /* create message */ if (snapc) msg_size += sizeof(u64) * snapc->num_snaps; @@ -135,10 +138,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, calc_layout(osdc, vino, layout, off, plen, req); req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid); + if (flags & CEPH_OSD_OP_MODIFY) { + req->r_request->hdr.data_off = cpu_to_le16(off); + req->r_request->hdr.data_len = cpu_to_le32(*plen); + } + /* additional ops */ if (do_trunc) { op++; - op->op = cpu_to_le16(opcode == CEPH_OSD_OP_READ ? + op->op = cpu_to_le16(opcode == CEPH_OSD_OP_READ ? CEPH_OSD_OP_MASKTRUNC : CEPH_OSD_OP_SETTRUNC); op->truncate_seq = cpu_to_le32(truncate_seq); prevofs = le64_to_cpu((op-1)->offset); @@ -154,13 +162,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, for (i = 0; i < snapc->num_snaps; i++) snaps[i] = cpu_to_le64(snapc->snaps[i]); } - - atomic_set(&req->r_ref, 1); - init_completion(&req->r_completion); return req; } - /* * Register request, assign tid. If this is the first request, set up * the timeout event. @@ -180,7 +184,7 @@ static int register_request(struct ceph_osd_client *osdc, if (rc < 0) goto out; - get_request(req); + ceph_osdc_get_request(req); osdc->num_requests++; req->r_timeout_stamp = @@ -215,7 +219,7 @@ static void handle_timeout(struct work_struct *work) container_of(work, struct ceph_osd_client, timeout_work.work); struct ceph_osd_request *req; unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ; - unsigned long next_timeout = timeout + jiffies; + unsigned long next_timeout = timeout + jiffies; RADIX_TREE(pings, GFP_NOFS); /* only send 1 ping per osd */ u64 next_tid = 0; int got; @@ -369,6 +373,7 @@ static int send_request(struct ceph_osd_client *osdc, reqhead = req->r_request->front.iov_base; reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch); reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ + reqhead->reassert_version = req->r_reassert_version; req->r_request->hdr.dst.name.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD); @@ -392,7 +397,7 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) struct ceph_osd_reply_head *rhead = msg->front.iov_base; struct ceph_osd_request *req; u64 tid; - int numops; + int numops, flags; if (msg->front.iov_len < sizeof(*rhead)) goto bad; @@ -411,27 +416,50 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) mutex_unlock(&osdc->request_mutex); return; } - get_request(req); - if (req->r_reply == NULL) { - /* no data payload, or r_reply would have been set by - prepare_pages. */ - ceph_msg_get(msg); - req->r_reply = msg; - } else if (req->r_reply == msg) { - /* r_reply was set by prepare_pages; now it's fully read. */ - } else { - dout(10, "handle_reply tid %llu already had reply?\n", tid); + ceph_osdc_get_request(req); + flags = le32_to_cpu(rhead->flags); + + if (req->r_aborted) { + dout(10, "handle_reply tid %llu aborted\n", tid); goto done; } - dout(10, "handle_reply tid %llu flags %d\n", tid, - le32_to_cpu(rhead->flags)); - __unregister_request(osdc, req); + + if (req->r_reassert_version.epoch == 0) { + /* first ack */ + if (req->r_reply == NULL) { + /* no data payload, or r_reply would have been set by + prepare_pages. */ + ceph_msg_get(msg); + req->r_reply = msg; + } else { + /* r_reply was set by prepare_pages */ + BUG_ON(req->r_reply != msg); + } + + /* in case we need to replay this op, */ + req->r_reassert_version = rhead->reassert_version; + } else if ((flags & CEPH_OSD_OP_ONDISK) == 0) { + dout(10, "handle_reply tid %llu dup ack\n", tid); + goto done; + } + + dout(10, "handle_reply tid %llu flags %d\n", tid, flags); + + if (flags & CEPH_OSD_OP_ONDISK) + __unregister_request(osdc, req); + mutex_unlock(&osdc->request_mutex); if (req->r_callback) req->r_callback(req); else - complete(&req->r_completion); /* see do_sync_request */ + complete(&req->r_completion); + + if ((flags & CEPH_OSD_OP_ONDISK) && req->r_safe_callback) { + req->r_safe_callback(req); + complete(&req->r_safe_completion); /* fsync waiter */ + } + done: ceph_osdc_put_request(req); return; @@ -481,7 +509,7 @@ static void kick_requests(struct ceph_osd_client *osdc, dout(20, "kicking tid %llu osd%d\n", req->r_tid, req->r_last_osd); - get_request(req); + ceph_osdc_get_request(req); mutex_unlock(&osdc->request_mutex); req->r_request = ceph_msg_maybe_dup(req->r_request); if (!req->r_aborted) { @@ -670,61 +698,34 @@ out: /* * Register request, send initial attempt. */ -static int start_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) +int ceph_osdc_start_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) { int rc; + req->r_request->pages = req->r_pages; + req->r_request->nr_pages = req->r_num_pages; + rc = register_request(osdc, req); if (rc < 0) return rc; + down_read(&osdc->map_sem); rc = send_request(osdc, req); up_read(&osdc->map_sem); return rc; } -/* - * synchronously do an osd request. - * - * If we are interrupted, take our pages away from any previous sent - * request message that may still be being written to the socket. - */ -static int do_sync_request(struct ceph_osd_client *osdc, +int ceph_osdc_wait_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { struct ceph_osd_reply_head *replyhead; __s32 rc; int bytes; - rc = start_request(osdc, req); /* register+send request */ - if (rc) - return rc; - rc = wait_for_completion_interruptible(&req->r_completion); if (rc < 0) { - struct ceph_msg *msg; - - dout(0, "tid %llu err %d, revoking %p pages\n", req->r_tid, - rc, req->r_request); - /* - * we were interrupted. - * - * mark req aborted _before_ revoking pages, so that - * if a racing kick_request _does_ dup the page vec - * pointer, it will definitely then see the aborted - * flag and not send the request. - */ - req->r_aborted = 1; - msg = req->r_request; - mutex_lock(&msg->page_mutex); - msg->pages = NULL; - mutex_unlock(&msg->page_mutex); - if (req->r_reply) { - mutex_lock(&req->r_reply->page_mutex); - req->r_reply->pages = NULL; - mutex_unlock(&req->r_reply->page_mutex); - } + ceph_osdc_abort_request(osdc, req); return rc; } @@ -732,12 +733,42 @@ static int do_sync_request(struct ceph_osd_client *osdc, replyhead = req->r_reply->front.iov_base; rc = le32_to_cpu(replyhead->result); bytes = le32_to_cpu(req->r_reply->hdr.data_len); - dout(10, "do_sync_request tid %llu result %d, %d bytes\n", + dout(10, "wait_request tid %llu result %d, %d bytes\n", req->r_tid, rc, bytes); if (rc < 0) return rc; return bytes; } + +/* + * To abort an in-progress request, take pages away from outgoing or + * incoming message. + */ +void ceph_osdc_abort_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) +{ + struct ceph_msg *msg; + + dout(0, "abort_request tid %llu, revoking %p pages\n", req->r_tid, + req->r_request); + /* + * mark req aborted _before_ revoking pages, so that + * if a racing kick_request _does_ dup the page vec + * pointer, it will definitely then see the aborted + * flag and not send the request. + */ + req->r_aborted = 1; + msg = req->r_request; + mutex_lock(&msg->page_mutex); + msg->pages = NULL; + mutex_unlock(&msg->page_mutex); + if (req->r_reply) { + mutex_lock(&req->r_reply->page_mutex); + req->r_reply->pages = NULL; + mutex_unlock(&req->r_reply->page_mutex); + } +} + /* * init, shutdown */ @@ -796,7 +827,10 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, dout(10, "readpages final extent is %llu~%llu (%d pages)\n", off, len, req->r_num_pages); - rc = do_sync_request(osdc, req); + + rc = ceph_osdc_start_request(osdc, req); + if (!rc) + rc = ceph_osdc_wait_request(osdc, req); if (rc >= 0) { read = rc; @@ -849,7 +883,6 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, struct page **pages, int num_pages, int flags, int do_sync) { - struct ceph_msg *reqm; struct ceph_osd_request *req; int rc = 0; @@ -869,14 +902,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, dout(10, "writepages %llu~%llu (%d pages)\n", off, len, req->r_num_pages); - /* set up data payload */ - reqm = req->r_request; - reqm->pages = pages; - reqm->nr_pages = req->r_num_pages; - reqm->hdr.data_len = cpu_to_le32(len); - reqm->hdr.data_off = cpu_to_le16(off); + rc = ceph_osdc_start_request(osdc, req); + if (!rc) + rc = ceph_osdc_wait_request(osdc, req); - rc = do_sync_request(osdc, req); ceph_osdc_put_request(req); if (rc == 0) rc = len; @@ -884,37 +913,3 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, return rc; } -/* - * start an async write - */ -int ceph_osdc_writepages_start(struct ceph_osd_client *osdc, - struct ceph_osd_request *req, - u64 len, int num_pages) -{ - struct ceph_msg *reqm = req->r_request; - struct ceph_osd_request_head *reqhead = reqm->front.iov_base; - struct ceph_osd_op *op = (void *)(reqhead + 1); - u64 off = le64_to_cpu(op->offset); - int rc; - int flags; - - dout(10, "writepages_start %llu~%llu, %d pages\n", off, len, num_pages); - - flags = CEPH_OSD_OP_MODIFY; - if (osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK) - flags |= CEPH_OSD_OP_ACK; - else - flags |= CEPH_OSD_OP_ONDISK; - reqhead->flags = cpu_to_le32(flags); - op->length = cpu_to_le64(len); - - /* reference pages in message */ - reqm->pages = req->r_pages; - reqm->nr_pages = req->r_num_pages = num_pages; - reqm->hdr.data_len = cpu_to_le32(len); - reqm->hdr.data_off = cpu_to_le16(off); - - rc = start_request(osdc, req); - return rc; -} - diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index ea66bd021d408..1ffebab69b0b9 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -56,12 +56,15 @@ struct ceph_osd_request { int r_aborted; /* set if we cancel this request */ atomic_t r_ref; - struct completion r_completion; /* on completion, or... */ - ceph_osdc_callback_t r_callback; /* ...async callback. */ - struct inode *r_inode; /* needed for async write */ - struct writeback_control *r_wbc; + struct completion r_completion, r_safe_completion; + ceph_osdc_callback_t r_callback, r_safe_callback; + struct ceph_eversion r_reassert_version; + struct list_head r_unsafe_item; - int r_last_osd; /* pg osds */ + struct inode *r_inode; /* for use by callbacks */ + struct writeback_control *r_wbc; /* ditto */ + + int r_last_osd; /* pg osds */ struct ceph_entity_addr r_last_osd_addr; unsigned long r_timeout_stamp; @@ -69,6 +72,7 @@ struct ceph_osd_request { struct ceph_snap_context *r_snapc; /* snap context for writes */ unsigned r_num_pages; /* size of page array (follows) */ struct page **r_pages; /* pages for data payload */ + int r_own_pages; /* if true, i own page list */ }; struct ceph_osd_client { @@ -111,8 +115,21 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, struct ceph_snap_context *snapc, int do_sync, u32 truncate_seq, u64 truncate_size); + +static inline void ceph_osdc_get_request(struct ceph_osd_request *req) +{ + atomic_inc(&req->r_ref); +} extern void ceph_osdc_put_request(struct ceph_osd_request *req); +extern int ceph_osdc_start_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req); +extern int ceph_osdc_wait_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req); +extern void ceph_osdc_abort_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req); + + extern int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, @@ -128,10 +145,6 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, u32 truncate_seq, u64 truncate_size, struct page **pages, int nr_pages, int flags, int do_sync); -extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc, - struct ceph_osd_request *req, - u64 len, - int nr_pages); #endif diff --git a/src/kernel/super.h b/src/kernel/super.h index 10444a8861b10..5caeae807d931 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -281,6 +281,7 @@ struct ceph_inode_info { If it's non-zero, we _may_ have cached pages. */ u32 i_rdcache_revoking; /* RDCACHE gen to async invalidate, if any */ + struct list_head i_unsafe_writes; /* uncommitted sync writes */ struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */ struct list_head i_snap_realm_item; @@ -722,6 +723,7 @@ extern int ceph_add_cap(struct inode *inode, struct ceph_cap *new_cap); extern void ceph_remove_cap(struct ceph_cap *cap); extern int ceph_get_cap_mds(struct inode *inode); +extern void ceph_get_more_cap_refs(struct ceph_inode_info *ci, int caps); extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had); extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, struct ceph_snap_context *snapc); @@ -759,7 +761,7 @@ extern struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry, struct nameidata *nd, int mode, int locked_dir); extern int ceph_release(struct inode *inode, struct file *filp); - +extern void ceph_release_page_vector(struct page **pages, int num_pages); /* dir.c */ extern const struct file_operations ceph_dir_fops;