kclient: refactor osdc write; handle dual ack/commit, and fsync on sync writes
author     Sage Weil <sage@newdream.net>
           Mon, 16 Mar 2009 21:56:18 +0000 (14:56 -0700)
committer  Sage Weil <sage@newdream.net>
           Mon, 16 Mar 2009 21:56:18 +0000 (14:56 -0700)
For sync writes that are not O_SYNC or O_DIRECT, we get two replies
back from the OSD: an ack signalling serialization, and a commit when
the write is safe on disk.  The client needs to keep the request
message around until the commit arrives, in case the OSD fails and the
write has to be replayed.  However, the calling thread should continue
as soon as it gets the ack.

Refactor the osdc interface so that sync_write() can do this for
non-SYNC|DIRECT writes.
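
A minimal userspace sketch of the idea follows (hypothetical names; pthread
primitives stand in for the kernel's completions).  It is not the kernel
code in the diff below, only a model of the ack/commit split that
sync_write() and fsync() rely on.

/* build: cc -pthread dual_ack_sketch.c -o dual_ack_sketch */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

struct req {
        int acked;               /* OSD has serialized the write */
        int committed;           /* write is safe on disk */
        pthread_mutex_t mtx;
        pthread_cond_t cv;
};

/* reply handler: called once for the ack, later again for the on-disk
 * commit (the kernel analogue is ceph_osdc_handle_reply) */
static void handle_reply(struct req *r, int ondisk)
{
        pthread_mutex_lock(&r->mtx);
        if (ondisk)
                r->committed = 1;   /* request no longer needed for replay */
        else
                r->acked = 1;       /* serialized; the writer may continue */
        pthread_cond_broadcast(&r->cv);
        pthread_mutex_unlock(&r->mtx);
}

static void wait_flag(struct req *r, int *flag)
{
        pthread_mutex_lock(&r->mtx);
        while (!*flag)
                pthread_cond_wait(&r->cv, &r->mtx);
        pthread_mutex_unlock(&r->mtx);
}

static void *osd_thread(void *arg)          /* stands in for the OSD */
{
        struct req *r = arg;
        usleep(1000);
        handle_reply(r, 0);                 /* ack: write is ordered */
        usleep(1000);
        handle_reply(r, 1);                 /* commit: write is on disk */
        return NULL;
}

int main(void)
{
        struct req r = { .mtx = PTHREAD_MUTEX_INITIALIZER,
                         .cv  = PTHREAD_COND_INITIALIZER };
        pthread_t osd;

        pthread_create(&osd, NULL, osd_thread, &r);
        wait_flag(&r, &r.acked);            /* sync_write() returns here */
        printf("acked: caller continues, request kept for replay\n");
        wait_flag(&r, &r.committed);        /* fsync()'s wait returns here */
        printf("committed: request can be dropped\n");
        pthread_join(osd, NULL);
        return 0;
}

In the kernel patch, r_completion plays the role of "acked",
r_safe_completion plays the role of "committed", and the per-inode
i_unsafe_writes list lets fsync find writes that are acked but not yet
committed.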

src/kernel/addr.c
src/kernel/caps.c
src/kernel/file.c
src/kernel/inode.c
src/kernel/osd_client.c
src/kernel/osd_client.h
src/kernel/super.h

diff --git a/src/kernel/addr.c b/src/kernel/addr.c
index 110162204357755287a28b65b02cea49a16e4f3f..90989c38c77b6803ef77b1b8b7f10f0251e847ac 100644
@@ -641,6 +641,8 @@ retry:
                struct page *page;
                int want;
                u64 offset, len;
+               struct ceph_osd_request_head *reqhead;
+               struct ceph_osd_op *op;
 
                next = 0;
                locked_pages = 0;
@@ -730,7 +732,9 @@ get_more_pages:
                                            &ci->i_layout,
                                            ceph_vino(inode),
                                            offset, &len,
-                                           CEPH_OSD_OP_WRITE, 0,
+                                           CEPH_OSD_OP_WRITE,
+                                           CEPH_OSD_OP_MODIFY |
+                                                   CEPH_OSD_OP_ONDISK,
                                            snapc, do_sync,
                                            ci->i_truncate_seq,
                                            ci->i_truncate_size);
@@ -787,8 +791,15 @@ get_more_pages:
                          (u64)locked_pages << PAGE_CACHE_SHIFT);
                dout(10, "writepages got %d pages at %llu~%llu\n",
                     locked_pages, offset, len);
-               rc = ceph_osdc_writepages_start(&client->osdc, req,
-                                               len, locked_pages);
+
+               /* revise final length, page count */
+               req->r_num_pages = locked_pages;
+               reqhead = req->r_request->front.iov_base;
+               op = (void *)(reqhead + 1);
+               op->length = cpu_to_le64(len);
+               req->r_request->hdr.data_len = cpu_to_le32(len);
+
+               rc = ceph_osdc_start_request(&client->osdc, req);
                req = NULL;
                /*
                 * FIXME: if writepages_start fails (ENOMEM?) we should
diff --git a/src/kernel/caps.c b/src/kernel/caps.c
index 293a6e34c0ef60c39bca9039d4d6e1a578bd3223..52ca2165a283cec8faee7f3ed296d742274f1401 100644
@@ -1096,6 +1096,17 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got,
                                ceph_get_cap_refs(ci, need, want, got, endoff));
 }
 
+/*
+ * Take cap refs.  Caller must already know we hold at least one ref on
+ * the caps in question or we don't know this is safe.
+ */
+void ceph_get_more_cap_refs(struct ceph_inode_info *ci, int caps)
+{
+       spin_lock(&ci->vfs_inode.i_lock);
+       __take_cap_refs(ci, caps);
+       spin_unlock(&ci->vfs_inode.i_lock);
+}
+
 /*
  * Release cap refs.
  *
diff --git a/src/kernel/file.c b/src/kernel/file.c
index 5d9266cbfac65f7f749cd138e42cf923b0e4b153..d027d71fa9d90fc3a569a7c55d89f4e79e22f717 100644
@@ -241,7 +241,7 @@ static void put_page_vector(struct page **pages, int num_pages)
        kfree(pages);
 }
 
-static void release_page_vector(struct page **pages, int num_pages)
+void ceph_release_page_vector(struct page **pages, int num_pages)
 {
        int i;
 
@@ -261,7 +261,7 @@ static struct page **alloc_page_vector(int num_pages)
        for (i = 0; i < num_pages; i++) {
                pages[i] = alloc_page(GFP_NOFS);
                if (pages[i] == NULL) {
-                       release_page_vector(pages, i);
+                       ceph_release_page_vector(pages, i);
                        return ERR_PTR(-ENOMEM);
                }
        }
@@ -399,10 +399,67 @@ more:
        if (file->f_flags & O_DIRECT)
                put_page_vector(pages, num_pages);
        else
-               release_page_vector(pages, num_pages);
+               ceph_release_page_vector(pages, num_pages);
        return ret;
 }
 
+/*
+ * Write commit callback, called if we requested both an ACK and
+ * ONDISK commit reply from the OSD.
+ */
+static void sync_write_commit(struct ceph_osd_request *req)
+{
+       struct ceph_inode_info *ci = ceph_inode(req->r_inode);
+
+       dout(10, "sync_write_commit %p tid %llu\n", req, req->r_tid);
+       spin_lock(&ci->i_listener_lock);
+       list_del_init(&req->r_unsafe_item);
+       spin_unlock(&ci->i_listener_lock);
+       ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
+}
+
+/*
+ * Wait on any unsafe replies for the given inode.  First wait on the
+ * newest request, and make that the upper bound.  Then, if there are
+ * more requests, keep waiting on the oldest as long as it is still older
+ * than the original request.
+ */
+static void sync_write_wait(struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct list_head *head = &ci->i_unsafe_writes;
+       struct ceph_osd_request *req;
+       u64 last_tid;
+
+       spin_lock(&ci->i_listener_lock);
+       if (list_empty(head))
+              goto out;
+
+       /* set upper bound as _last_ entry in chain */
+       req = list_entry(head->prev, struct ceph_osd_request,
+                       r_unsafe_item);
+       last_tid = req->r_tid;
+
+       do {
+              ceph_osdc_get_request(req);
+              spin_unlock(&ci->i_listener_lock);
+              dout(10, "sync_write_wait on tid %llu (until %llu)\n",
+                   req->r_tid, last_tid);
+              wait_for_completion(&req->r_safe_completion);
+              spin_lock(&ci->i_listener_lock);
+              ceph_osdc_put_request(req);
+
+              /*
+               * from here on look at first entry in chain, since we
+               * only want to wait for anything older than last_tid
+               */
+              req = list_entry(head->next, struct ceph_osd_request,
+                               r_unsafe_item);
+       } while (req->r_tid <= last_tid);
+out:
+       spin_unlock(&ci->i_listener_lock);
+}
+
 /*
  * synchronous write.  from userspace.
  *
@@ -416,9 +473,11 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
        struct inode *inode = file->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_client *client = ceph_inode_to_client(inode);
-       struct page **pages, **page_pos;
-       int num_pages, pages_left;
+       struct ceph_osd_request *req;
+       struct page **pages;
+       int num_pages;
        long long unsigned pos;
+       u64 len;
        int written = 0;
        int flags;
        int do_sync = 0;
@@ -434,73 +493,100 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
                pos = i_size_read(inode);
        else
                pos = *offset;
-       num_pages = calc_pages_for(pos, left);
+
+       flags = CEPH_OSD_OP_ORDERSNAP |
+               CEPH_OSD_OP_ONDISK |
+               CEPH_OSD_OP_MODIFY;
+       if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
+               flags |= CEPH_OSD_OP_ACK;
+       else
+               do_sync = 1;
+
+       /*
+        * we may need to do multiple writes here if we span an object
+        * boundary.  this isn't atomic, unfortunately.  :(
+        */
+more:
+       len = left;
+       req = ceph_osdc_new_request(&client->osdc, &ci->i_layout,
+                                   ceph_vino(inode), pos, &len,
+                                   CEPH_OSD_OP_WRITE, flags,
+                                   ci->i_snap_realm->cached_context,
+                                   do_sync,
+                                   ci->i_truncate_seq, ci->i_truncate_size);
+       if (IS_ERR(req))
+               return PTR_ERR(req);
+
+       num_pages = calc_pages_for(pos, len);
 
        if (file->f_flags & O_DIRECT) {
-               pages = get_direct_page_vector(data, num_pages, pos, left);
-               if (IS_ERR(pages))
-                       return PTR_ERR(pages);
+               pages = get_direct_page_vector(data, num_pages, pos, len);
+               if (IS_ERR(pages)) {
+                       ret = PTR_ERR(pages);
+                       goto out;
+               }
 
                /*
                 * throw out any page cache pages in this range. this
                 * may block.
                 */
-               truncate_inode_pages_range(inode->i_mapping, pos, pos+left);
+               truncate_inode_pages_range(inode->i_mapping, pos, pos+len);
        } else {
                pages = alloc_page_vector(num_pages);
-               if (IS_ERR(pages))
-                       return PTR_ERR(pages);
-               ret = copy_user_to_page_vector(pages, data, pos, left);
-               if (ret < 0)
+               if (IS_ERR(pages)) {
+                       ret = PTR_ERR(pages);
                        goto out;
-       }
-
-       flags = CEPH_OSD_OP_ORDERSNAP;
-       if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
-               flags |= CEPH_OSD_OP_ACK;
-       else
-               do_sync = 1;
+               }
+               ret = copy_user_to_page_vector(pages, data, pos, len);
+               if (ret < 0) {
+                       ceph_release_page_vector(pages, num_pages);
+                       goto out;
+               }
 
-       /*
-        * we may need to do multiple writes here if we span an object
-        * boundary.  this isn't atomic, unfortunately.  :(
-        */
-       page_pos = pages;
-       pages_left = num_pages;
+               if ((file->f_flags & O_SYNC) == 0) {
+                       /* get a second commit callback */
+                       req->r_safe_callback = sync_write_commit;
+                       req->r_own_pages = 1;
+               }
+       }
+       req->r_pages = pages;
+       req->r_num_pages = num_pages;
+       req->r_inode = inode;
+
+       ret = ceph_osdc_start_request(&client->osdc, req);
+       if (!ret) {
+               if (req->r_safe_callback) {
+                       /*
+                        * Add to inode unsafe list only after we
+                        * start_request so that a tid has been assigned.
+                        */
+                       spin_lock(&ci->i_listener_lock);
+                       list_add_tail(&req->r_unsafe_item,
+                                     &ci->i_unsafe_writes);
+                       spin_unlock(&ci->i_listener_lock);
+                       ceph_get_more_cap_refs(ci, CEPH_CAP_FILE_WR);
+               }
+               ret = ceph_osdc_wait_request(&client->osdc, req);
+       }
 
-more:
-       ret = ceph_osdc_writepages(&client->osdc, ceph_vino(inode),
-                                  &ci->i_layout,
-                                  ci->i_snap_realm->cached_context,
-                                  pos, left, ci->i_truncate_seq,
-                                  ci->i_truncate_size,
-                                  page_pos, pages_left,
-                                  flags, do_sync);
-       if (ret > 0) {
-               int didpages =
-                       ((pos & ~PAGE_CACHE_MASK) + ret) >> PAGE_CACHE_SHIFT;
+       if (file->f_flags & O_DIRECT)
+               put_page_vector(pages, num_pages);
+       else if (file->f_flags & O_SYNC)
+               ceph_release_page_vector(pages, num_pages);
 
-               pos += ret;
-               written += ret;
-               left -= ret;
-               if (left) {
-                       page_pos += didpages;
-                       pages_left -= didpages;
-                       BUG_ON(!pages_left);
+out:
+       ceph_osdc_put_request(req);
+       if (ret == 0) {
+               pos += len;
+               written += len;
+               left -= len;
+               if (left)
                        goto more;
-               }
 
                ret = written;
                *offset = pos;
                if (pos > i_size_read(inode))
                        ceph_inode_set_size(inode, pos);
        }
-
-out:
-       if (file->f_flags & O_DIRECT)
-               put_page_vector(pages, num_pages);
-       else
-               release_page_vector(pages, num_pages);
        return ret;
 }
 
@@ -509,7 +595,7 @@ out:
  * Atomically grab references, so that those bits are not released
  * back to the MDS mid-read.
  *
- * Hmm, the sync reach case isn't actually async... should it be?
+ * Hmm, the sync read case isn't actually async... should it be?
  */
 static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
                             unsigned long nr_segs, loff_t pos)
@@ -648,6 +734,8 @@ static int ceph_fsync(struct file *file, struct dentry *dentry, int datasync)
        int ret;
 
        dout(10, "fsync on inode %p\n", inode);
+       sync_write_wait(inode);
+
        ret = filemap_write_and_wait(inode->i_mapping);
        if (ret < 0)
                return ret;
diff --git a/src/kernel/inode.c b/src/kernel/inode.c
index 37fb8cedde96076cc5b896b5adf07fa9b52fce16..555b21d75136996990d71a0ab4344e5255ddbbc1 100644
@@ -288,6 +288,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        ci->i_wrbuffer_ref_head = 0;
        ci->i_rdcache_gen = 0;
        ci->i_rdcache_revoking = 0;
+       INIT_LIST_HEAD(&ci->i_unsafe_writes);
 
        ci->i_snap_realm = NULL;
        INIT_LIST_HEAD(&ci->i_snap_realm_item);
diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c
index d38fffe64479703c7e1dbcfef300b634ee57a568..82283ebf27db3800a1c60b10e191e223f6a43865 100644
@@ -60,11 +60,6 @@ static void calc_layout(struct ceph_osd_client *osdc,
 /*
  * requests
  */
-static void get_request(struct ceph_osd_request *req)
-{
-       atomic_inc(&req->r_ref);
-}
-
 void ceph_osdc_put_request(struct ceph_osd_request *req)
 {
        dout(10, "put_request %p %d -> %d\n", req, atomic_read(&req->r_ref),
@@ -75,6 +70,9 @@ void ceph_osdc_put_request(struct ceph_osd_request *req)
                        ceph_msg_put(req->r_request);
                if (req->r_reply)
                        ceph_msg_put(req->r_reply);
+               if (req->r_own_pages)
+                       ceph_release_page_vector(req->r_pages,
+                                                req->r_num_pages);
                ceph_put_snap_context(req->r_snapc);
                kfree(req);
        }
@@ -110,6 +108,11 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
 
+       atomic_set(&req->r_ref, 1);
+       init_completion(&req->r_completion);
+       init_completion(&req->r_safe_completion);
+       INIT_LIST_HEAD(&req->r_unsafe_item);
+
        /* create message */
        if (snapc)
                msg_size += sizeof(u64) * snapc->num_snaps;
@@ -135,10 +138,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        calc_layout(osdc, vino, layout, off, plen, req);
        req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid);
 
+       if (flags & CEPH_OSD_OP_MODIFY) {
+               req->r_request->hdr.data_off = cpu_to_le16(off);
+               req->r_request->hdr.data_len = cpu_to_le32(*plen);
+       }
+
        /* additional ops */
        if (do_trunc) {
                op++;
-               op->op = cpu_to_le16(opcode == CEPH_OSD_OP_READ ? 
+               op->op = cpu_to_le16(opcode == CEPH_OSD_OP_READ ?
                             CEPH_OSD_OP_MASKTRUNC : CEPH_OSD_OP_SETTRUNC);
                op->truncate_seq = cpu_to_le32(truncate_seq);
                prevofs =  le64_to_cpu((op-1)->offset);
@@ -154,13 +162,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                for (i = 0; i < snapc->num_snaps; i++)
                        snaps[i] = cpu_to_le64(snapc->snaps[i]);
        }
-
-       atomic_set(&req->r_ref, 1);
-       init_completion(&req->r_completion);
        return req;
 }
 
-
 /*
  * Register request, assign tid.  If this is the first request, set up
  * the timeout event.
@@ -180,7 +184,7 @@ static int register_request(struct ceph_osd_client *osdc,
        if (rc < 0)
                goto out;
 
-       get_request(req);
+       ceph_osdc_get_request(req);
        osdc->num_requests++;
 
        req->r_timeout_stamp =
@@ -215,7 +219,7 @@ static void handle_timeout(struct work_struct *work)
                container_of(work, struct ceph_osd_client, timeout_work.work);
        struct ceph_osd_request *req;
        unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ;
-       unsigned long next_timeout = timeout + jiffies; 
+       unsigned long next_timeout = timeout + jiffies;
        RADIX_TREE(pings, GFP_NOFS);  /* only send 1 ping per osd */
        u64 next_tid = 0;
        int got;
@@ -369,6 +373,7 @@ static int send_request(struct ceph_osd_client *osdc,
        reqhead = req->r_request->front.iov_base;
        reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
        reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
+       reqhead->reassert_version = req->r_reassert_version;
 
        req->r_request->hdr.dst.name.type =
                cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
@@ -392,7 +397,7 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        struct ceph_osd_reply_head *rhead = msg->front.iov_base;
        struct ceph_osd_request *req;
        u64 tid;
-       int numops;
+       int numops, flags;
 
        if (msg->front.iov_len < sizeof(*rhead))
                goto bad;
@@ -411,27 +416,50 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                mutex_unlock(&osdc->request_mutex);
                return;
        }
-       get_request(req);
-       if (req->r_reply == NULL) {
-               /* no data payload, or r_reply would have been set by
-                  prepare_pages. */
-               ceph_msg_get(msg);
-               req->r_reply = msg;
-       } else if (req->r_reply == msg) {
-               /* r_reply was set by prepare_pages; now it's fully read. */
-       } else {
-               dout(10, "handle_reply tid %llu already had reply?\n", tid);
+       ceph_osdc_get_request(req);
+       flags = le32_to_cpu(rhead->flags);
+
+       if (req->r_aborted) {
+               dout(10, "handle_reply tid %llu aborted\n", tid);
                goto done;
        }
-       dout(10, "handle_reply tid %llu flags %d\n", tid,
-            le32_to_cpu(rhead->flags));
-       __unregister_request(osdc, req);
+
+       if (req->r_reassert_version.epoch == 0) {
+               /* first ack */
+               if (req->r_reply == NULL) {
+                       /* no data payload, or r_reply would have been set by
+                          prepare_pages. */
+                       ceph_msg_get(msg);
+                       req->r_reply = msg;
+               } else {
+                       /* r_reply was set by prepare_pages */
+                       BUG_ON(req->r_reply != msg);
+               }
+
+               /* remember this in case we need to replay the op */
+               req->r_reassert_version = rhead->reassert_version;
+       } else if ((flags & CEPH_OSD_OP_ONDISK) == 0) {
+               dout(10, "handle_reply tid %llu dup ack\n", tid);
+               goto done;
+       }
+
+       dout(10, "handle_reply tid %llu flags %d\n", tid, flags);
+
+       if (flags & CEPH_OSD_OP_ONDISK)
+               __unregister_request(osdc, req);
+
        mutex_unlock(&osdc->request_mutex);
 
        if (req->r_callback)
                req->r_callback(req);
        else
-               complete(&req->r_completion);  /* see do_sync_request */
+               complete(&req->r_completion);
+
+       if ((flags & CEPH_OSD_OP_ONDISK) && req->r_safe_callback) {
+               req->r_safe_callback(req);
+               complete(&req->r_safe_completion);  /* fsync waiter */
+       }
+
 done:
        ceph_osdc_put_request(req);
        return;
@@ -481,7 +509,7 @@ static void kick_requests(struct ceph_osd_client *osdc,
 
                dout(20, "kicking tid %llu osd%d\n", req->r_tid,
                     req->r_last_osd);
-               get_request(req);
+               ceph_osdc_get_request(req);
                mutex_unlock(&osdc->request_mutex);
                req->r_request = ceph_msg_maybe_dup(req->r_request);
                if (!req->r_aborted) {
@@ -670,61 +698,34 @@ out:
 /*
  * Register request, send initial attempt.
  */
-static int start_request(struct ceph_osd_client *osdc,
-                        struct ceph_osd_request *req)
+int ceph_osdc_start_request(struct ceph_osd_client *osdc,
+                           struct ceph_osd_request *req)
 {
        int rc;
 
+       req->r_request->pages = req->r_pages;
+       req->r_request->nr_pages = req->r_num_pages;
+
        rc = register_request(osdc, req);
        if (rc < 0)
                return rc;
+
        down_read(&osdc->map_sem);
        rc = send_request(osdc, req);
        up_read(&osdc->map_sem);
        return rc;
 }
 
-/*
- * synchronously do an osd request.
- *
- * If we are interrupted, take our pages away from any previous sent
- * request message that may still be being written to the socket.
- */
-static int do_sync_request(struct ceph_osd_client *osdc,
+int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req)
 {
        struct ceph_osd_reply_head *replyhead;
        __s32 rc;
        int bytes;
 
-       rc = start_request(osdc, req);  /* register+send request */
-       if (rc)
-               return rc;
-
        rc = wait_for_completion_interruptible(&req->r_completion);
        if (rc < 0) {
-               struct ceph_msg *msg;
-
-               dout(0, "tid %llu err %d, revoking %p pages\n", req->r_tid,
-                    rc, req->r_request);
-               /*
-                * we were interrupted.
-                *
-                * mark req aborted _before_ revoking pages, so that
-                * if a racing kick_request _does_ dup the page vec
-                * pointer, it will definitely then see the aborted
-                * flag and not send the request.
-                */
-               req->r_aborted = 1;
-               msg = req->r_request;
-               mutex_lock(&msg->page_mutex);
-               msg->pages = NULL;
-               mutex_unlock(&msg->page_mutex);
-               if (req->r_reply) {
-                       mutex_lock(&req->r_reply->page_mutex);
-                       req->r_reply->pages = NULL;
-                       mutex_unlock(&req->r_reply->page_mutex);
-               }
+               ceph_osdc_abort_request(osdc, req);
                return rc;
        }
 
@@ -732,12 +733,42 @@ static int do_sync_request(struct ceph_osd_client *osdc,
        replyhead = req->r_reply->front.iov_base;
        rc = le32_to_cpu(replyhead->result);
        bytes = le32_to_cpu(req->r_reply->hdr.data_len);
-       dout(10, "do_sync_request tid %llu result %d, %d bytes\n",
+       dout(10, "wait_request tid %llu result %d, %d bytes\n",
             req->r_tid, rc, bytes);
        if (rc < 0)
                return rc;
        return bytes;
 }
+
+/*
+ * To abort an in-progress request, take pages away from outgoing or
+ * incoming message.
+ */
+void ceph_osdc_abort_request(struct ceph_osd_client *osdc,
+                            struct ceph_osd_request *req)
+{
+       struct ceph_msg *msg;
+
+       dout(0, "abort_request tid %llu, revoking %p pages\n", req->r_tid,
+            req->r_request);
+       /*
+        * mark req aborted _before_ revoking pages, so that
+        * if a racing kick_request _does_ dup the page vec
+        * pointer, it will definitely then see the aborted
+        * flag and not send the request.
+        */
+       req->r_aborted = 1;
+       msg = req->r_request;
+       mutex_lock(&msg->page_mutex);
+       msg->pages = NULL;
+       mutex_unlock(&msg->page_mutex);
+       if (req->r_reply) {
+               mutex_lock(&req->r_reply->page_mutex);
+               req->r_reply->pages = NULL;
+               mutex_unlock(&req->r_reply->page_mutex);
+       }
+}
+
 /*
  * init, shutdown
  */
@@ -796,7 +827,10 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 
        dout(10, "readpages final extent is %llu~%llu (%d pages)\n",
             off, len, req->r_num_pages);
-       rc = do_sync_request(osdc, req);
+
+       rc = ceph_osdc_start_request(osdc, req);
+       if (!rc)
+               rc = ceph_osdc_wait_request(osdc, req);
 
        if (rc >= 0) {
                read = rc;
@@ -849,7 +883,6 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         struct page **pages, int num_pages,
                         int flags, int do_sync)
 {
-       struct ceph_msg *reqm;
        struct ceph_osd_request *req;
        int rc = 0;
 
@@ -869,14 +902,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
        dout(10, "writepages %llu~%llu (%d pages)\n", off, len,
             req->r_num_pages);
 
-       /* set up data payload */
-       reqm = req->r_request;
-       reqm->pages = pages;
-       reqm->nr_pages = req->r_num_pages;
-       reqm->hdr.data_len = cpu_to_le32(len);
-       reqm->hdr.data_off = cpu_to_le16(off);
+       rc = ceph_osdc_start_request(osdc, req);
+       if (!rc)
+               rc = ceph_osdc_wait_request(osdc, req);
 
-       rc = do_sync_request(osdc, req);
        ceph_osdc_put_request(req);
        if (rc == 0)
                rc = len;
@@ -884,37 +913,3 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
        return rc;
 }
 
-/*
- * start an async write
- */
-int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
-                              struct ceph_osd_request *req,
-                              u64 len, int num_pages)
-{
-       struct ceph_msg *reqm = req->r_request;
-       struct ceph_osd_request_head *reqhead = reqm->front.iov_base;
-       struct ceph_osd_op *op = (void *)(reqhead + 1);
-       u64 off = le64_to_cpu(op->offset);
-       int rc;
-       int flags;
-
-       dout(10, "writepages_start %llu~%llu, %d pages\n", off, len, num_pages);
-
-       flags = CEPH_OSD_OP_MODIFY;
-       if (osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK)
-               flags |= CEPH_OSD_OP_ACK;
-       else
-               flags |= CEPH_OSD_OP_ONDISK;
-       reqhead->flags = cpu_to_le32(flags);
-       op->length = cpu_to_le64(len);
-
-       /* reference pages in message */
-       reqm->pages = req->r_pages;
-       reqm->nr_pages = req->r_num_pages = num_pages;
-       reqm->hdr.data_len = cpu_to_le32(len);
-       reqm->hdr.data_off = cpu_to_le16(off);
-
-       rc = start_request(osdc, req);
-       return rc;
-}
-
diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h
index ea66bd021d408e43a8214b04d53b13c08fc7ac61..1ffebab69b0b96c2d629c552cac0991d2bb67741 100644
@@ -56,12 +56,15 @@ struct ceph_osd_request {
        int               r_aborted;   /* set if we cancel this request */
 
        atomic_t          r_ref;
-       struct completion r_completion;       /* on completion, or... */
-       ceph_osdc_callback_t r_callback;      /* ...async callback. */
-       struct inode *r_inode;                /* needed for async write */
-       struct writeback_control *r_wbc;
+       struct completion r_completion, r_safe_completion;
+       ceph_osdc_callback_t r_callback, r_safe_callback;
+       struct ceph_eversion r_reassert_version;
+       struct list_head  r_unsafe_item;
 
-       int               r_last_osd;   /* pg osds */
+       struct inode *r_inode;                /* for use by callbacks */
+       struct writeback_control *r_wbc;      /* ditto */
+
+       int               r_last_osd;         /* pg osds */
        struct ceph_entity_addr r_last_osd_addr;
        unsigned long     r_timeout_stamp;
 
@@ -69,6 +72,7 @@ struct ceph_osd_request {
        struct ceph_snap_context *r_snapc;    /* snap context for writes */
        unsigned          r_num_pages;        /* size of page array (follows) */
        struct page     **r_pages;            /* pages for data payload */
+       int               r_own_pages;        /* if true, i own page list */
 };
 
 struct ceph_osd_client {
@@ -111,8 +115,21 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
                                      struct ceph_snap_context *snapc,
                                      int do_sync, u32 truncate_seq,
                                      u64 truncate_size);
+
+static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
+{
+       atomic_inc(&req->r_ref);
+}
 extern void ceph_osdc_put_request(struct ceph_osd_request *req);
 
+extern int ceph_osdc_start_request(struct ceph_osd_client *osdc,
+                                  struct ceph_osd_request *req);
+extern int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
+                                 struct ceph_osd_request *req);
+extern void ceph_osdc_abort_request(struct ceph_osd_client *osdc,
+                                   struct ceph_osd_request *req);
+
+
 extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                               struct ceph_vino vino,
                               struct ceph_file_layout *layout,
@@ -128,10 +145,6 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                u32 truncate_seq, u64 truncate_size,
                                struct page **pages, int nr_pages,
                                int flags, int do_sync);
-extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
-                                     struct ceph_osd_request *req,
-                                     u64 len,
-                                     int nr_pages);
 
 #endif
 
diff --git a/src/kernel/super.h b/src/kernel/super.h
index 10444a8861b10d6b44807f532dc3ded7e70a4fbd..5caeae807d931a04326142c9bc6894301d8436d4 100644
@@ -281,6 +281,7 @@ struct ceph_inode_info {
                                   If it's non-zero, we _may_ have cached
                                   pages. */
        u32 i_rdcache_revoking; /* RDCACHE gen to async invalidate, if any */
+       struct list_head i_unsafe_writes; /* uncommitted sync writes */
 
        struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */
        struct list_head i_snap_realm_item;
@@ -722,6 +723,7 @@ extern int ceph_add_cap(struct inode *inode,
                        struct ceph_cap *new_cap);
 extern void ceph_remove_cap(struct ceph_cap *cap);
 extern int ceph_get_cap_mds(struct inode *inode);
+extern void ceph_get_more_cap_refs(struct ceph_inode_info *ci, int caps);
 extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
 extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                                       struct ceph_snap_context *snapc);
@@ -759,7 +761,7 @@ extern struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry,
                                       struct nameidata *nd, int mode,
                                       int locked_dir);
 extern int ceph_release(struct inode *inode, struct file *filp);
-
+extern void ceph_release_page_vector(struct page **pages, int num_pages);
 
 /* dir.c */
 extern const struct file_operations ceph_dir_fops;