]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-client.git/commitdiff
XXX: rest of David's changes
authorDavid Howells <dhowells@redhat.com>
Tue, 18 Jan 2022 15:04:52 +0000 (15:04 +0000)
committerJeff Layton <jlayton@kernel.org>
Thu, 19 May 2022 20:44:16 +0000 (16:44 -0400)
fs/ceph/addr.c
include/linux/ceph/libceph.h
net/ceph/messenger_v1.c
net/ceph/pagevec.c

index 938679a7a1e3d6db13d6741667ca1a88bc6bf21a..2ad545f734fb55579ff7552dbe60960179786411 100644 (file)
@@ -220,14 +220,8 @@ static void finish_netfs_read(struct ceph_osd_request *req)
                        __set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
                iov_iter_advance(&subreq->iter, err);
        }
-       if (!iov_iter_is_bvec(&subreq->iter))
-               ceph_put_page_vector(osd_data->pages,
-                                    calc_pages_for(osd_data->alignment,
-                                    osd_data->length),
-                                    false);
 
        netfs_subreq_terminated(subreq, err, true);
-       iput(req->r_inode);
 }
 
 static bool ceph_netfs_issue_op_inline(struct netfs_io_subrequest *subreq)
@@ -291,10 +285,7 @@ static void ceph_netfs_issue_read(struct netfs_io_subrequest *subreq)
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_osd_request *req;
        struct ceph_vino vino = ceph_vino(inode);
-       struct iov_iter *iter = &subreq->iter;
-       struct page **pages;
-       size_t page_off;
-       int err = 0;
+       int err;
        u64 len = subreq->len;
 
        if (ci->i_inline_version != CEPH_INLINE_NONE &&
@@ -315,38 +306,14 @@ static void ceph_netfs_issue_read(struct netfs_io_subrequest *subreq)
                __func__, subreq->start, subreq->len, len, rreq->debug_id,
                subreq->debug_index, iov_iter_count(&subreq->iter));
 
-       if (iov_iter_is_bvec(iter)) {
-               /*
-                * FIXME: remove force cast, ideally by plumbing an IOV_ITER osd_data
-                *        variant.
-                */
-               osd_req_op_extent_osd_data_bvecs(req, 0, (__force struct bio_vec *)iter->bvec,
-                               iter->nr_segs, len);
-               goto submit;
-       }
-
-       err = iov_iter_get_pages_alloc(&subreq->iter, &pages, len, &page_off);
-       if (err < len) {
-               if (err < 0) {
-                       dout("%s: iov_ter_get_pages_alloc returned %d\n", __func__, err);
-                       goto out;
-               }
-               len = err;
-               req->r_ops[0].extent.length = err;
-       }
-
-       osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
-submit:
+       osd_req_op_extent_osd_iter(req, 0, &subreq->iter);
        req->r_callback = finish_netfs_read;
        req->r_priv = subreq;
        req->r_inode = inode;
-       ihold(inode);
 
        err = ceph_osdc_start_request(req->r_osdc, req, false);
-       if (err)
-               iput(inode);
-out:
        ceph_osdc_put_request(req);
+out:
        if (err)
                netfs_subreq_terminated(subreq, err, false);
        dout("%s: result %d\n", __func__, err);
index 00af2c98da75a7f3f36094ceb193b9d11d3171f6..bc85560fd93b29d4026d104c1102d8e304410b29 100644 (file)
@@ -324,8 +324,6 @@ int ceph_wait_for_latest_osdmap(struct ceph_client *client,
 
 /* pagevec.c */
 extern void ceph_release_page_vector(struct page **pages, int num_pages);
-extern void ceph_put_page_vector(struct page **pages, int num_pages,
-                                bool dirty);
 extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags);
 extern int ceph_copy_user_to_page_vector(struct page **pages,
                                         const void __user *data,
index 6b014eca3a1305e48aeb62d533951ad98c68ad78..dd282fc1a23c139fa2d3a0cf7bfea2955c1aeff2 100644 (file)
@@ -18,6 +18,20 @@ static char tag_ack = CEPH_MSGR_TAG_ACK;
 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
 static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
 
+static int ceph_tcp_recv_iter(struct socket *sock, struct iov_iter *iter)
+{
+       struct msghdr msg = {
+               .msg_iter       = *iter,
+       };
+       unsigned int flags = MSG_DONTWAIT | MSG_NOSIGNAL;
+       int r;
+
+       r = sock_recvmsg(sock, &msg, flags);
+       if (r == -EAGAIN)
+               r = 0;
+       return r;
+}
+
 /*
  * If @buf is NULL, discard up to @len bytes.
  */
@@ -77,6 +91,30 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
        return r;
 }
 
+/*
+ * write something from an iterator.  @more is true if caller will be
+ * sending more data shortly.
+ */
+static int ceph_tcp_send_iter(struct socket *sock, struct iov_iter *iter,
+                             bool more)
+{
+       struct msghdr msg = {
+               .msg_flags      = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_ZEROCOPY,
+               .msg_iter       = *iter,
+       };
+       int r;
+
+       if (more)
+               msg.msg_flags |= MSG_MORE;
+       else
+               msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
+
+       r = sock_sendmsg(sock, &msg);
+       if (r == -EAGAIN)
+               r = 0;
+       return r;
+}
+
 /*
  * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
  */
@@ -455,6 +493,15 @@ out:
        return ret;  /* done! */
 }
 
+static ssize_t ceph_crc_scan(struct iov_iter *i, const void *p,
+                            size_t len, size_t off, void *_priv)
+{
+       u32 *crc = _priv;
+
+       *crc = crc32c(*crc, p, len);
+       return len;
+}
+
 /*
  * Write as much message data payload as we can.  If we finish, queue
  * up the footer.
@@ -467,7 +514,7 @@ static int write_partial_message_data(struct ceph_connection *con)
        struct ceph_msg *msg = con->out_msg;
        struct ceph_msg_data_cursor *cursor = &msg->cursor;
        bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
-       int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
+       int ret, more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
        u32 crc;
 
        dout("%s %p msg %p\n", __func__, con, msg);
@@ -484,31 +531,45 @@ static int write_partial_message_data(struct ceph_connection *con)
         * been revoked, so use the zero page.
         */
        crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
-       while (cursor->total_resid) {
-               struct page *page;
-               size_t page_offset;
-               size_t length;
-               int ret;
 
-               if (!cursor->resid) {
-                       ceph_msg_data_advance(cursor, 0);
-                       continue;
-               }
-
-               page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
-               if (length == cursor->total_resid)
-                       more = MSG_MORE;
-               ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
-                                       more);
-               if (ret <= 0) {
-                       if (do_datacrc)
-                               msg->footer.data_crc = cpu_to_le32(crc);
+       if (cursor->data->type == CEPH_MSG_DATA_ITER) {
+               struct ceph_msg_data *data = cursor->data;
 
+               ret = ceph_tcp_send_iter(con->sock, &data->iter, more);
+               if (ret <= 0)
                        return ret;
+               cursor->total_resid -= ret;
+               if (cursor->total_resid > 0)
+                       return 0;
+
+               //iov_iter_revert(&data->iter, data->iter_count);
+               iov_iter_scan(&data->iter, data->iter_count, ceph_crc_scan, &crc);
+       } else {
+               while (cursor->total_resid) {
+                       struct page *page;
+                       size_t page_offset;
+                       size_t length;
+
+                       if (!cursor->resid) {
+                               ceph_msg_data_advance(cursor, 0);
+                               continue;
+                       }
+
+                       page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
+                       if (length == cursor->total_resid)
+                               more = MSG_MORE;
+                       ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
+                                               more);
+                       if (ret <= 0) {
+                               if (do_datacrc)
+                                       msg->footer.data_crc = cpu_to_le32(crc);
+
+                               return ret;
+                       }
+                       if (do_datacrc && cursor->need_crc)
+                               crc = ceph_crc32c_page(crc, page, page_offset, length);
+                       ceph_msg_data_advance(cursor, (size_t)ret);
                }
-               if (do_datacrc && cursor->need_crc)
-                       crc = ceph_crc32c_page(crc, page, page_offset, length);
-               ceph_msg_data_advance(cursor, (size_t)ret);
        }
 
        dout("%s %p msg %p done\n", __func__, con, msg);
@@ -1002,24 +1063,39 @@ static int read_partial_msg_data(struct ceph_connection *con)
 
        if (do_datacrc)
                crc = con->in_data_crc;
-       while (cursor->total_resid) {
-               if (!cursor->resid) {
-                       ceph_msg_data_advance(cursor, 0);
-                       continue;
-               }
 
-               page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
-               ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
-               if (ret <= 0) {
-                       if (do_datacrc)
-                               con->in_data_crc = crc;
+       if (cursor->data->type == CEPH_MSG_DATA_ITER) {
+               struct ceph_msg_data *data = cursor->data;
 
+               ret = ceph_tcp_recv_iter(con->sock, &data->iter);
+               if (ret <= 0)
                        return ret;
-               }
+               cursor->total_resid -= ret;
+               if (cursor->total_resid > 0)
+                       return 0;
 
-               if (do_datacrc)
-                       crc = ceph_crc32c_page(crc, page, page_offset, ret);
-               ceph_msg_data_advance(cursor, (size_t)ret);
+               iov_iter_scan(&data->iter, data->iter_count, ceph_crc_scan, &crc);
+
+       } else {
+               while (cursor->total_resid) {
+                       if (!cursor->resid) {
+                               ceph_msg_data_advance(cursor, 0);
+                               continue;
+                       }
+
+                       page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
+                       ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
+                       if (ret <= 0) {
+                               if (do_datacrc)
+                                       con->in_data_crc = crc;
+
+                               return ret;
+                       }
+
+                       if (do_datacrc)
+                               crc = ceph_crc32c_page(crc, page, page_offset, ret);
+                       ceph_msg_data_advance(cursor, (size_t)ret);
+               }
        }
        if (do_datacrc)
                con->in_data_crc = crc;
index 64305e7056a1c738ef178d59544822af8ce49ea4..c3e349c3d54dc464d515ba93947e4d28e994c180 100644 (file)
 
 #include <linux/ceph/libceph.h>
 
-void ceph_put_page_vector(struct page **pages, int num_pages, bool dirty)
-{
-       int i;
-
-       for (i = 0; i < num_pages; i++) {
-               if (dirty)
-                       set_page_dirty_lock(pages[i]);
-               put_page(pages[i]);
-       }
-       kvfree(pages);
-}
-EXPORT_SYMBOL(ceph_put_page_vector);
-
 void ceph_release_page_vector(struct page **pages, int num_pages)
 {
        int i;