From 8ff832e44ec87fb9c0e30168420d0b8aeee7677a Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 2 Apr 2008 07:01:23 -0700 Subject: [PATCH] kclient: some io refactoring, still a bit busted tho --- src/kernel/file.c | 116 ++++++++------ src/kernel/osd_client.c | 332 ++++++++++++++++++++-------------------- src/kernel/osd_client.h | 4 + src/kernel/super.c | 5 - src/kernel/super.h | 1 - 5 files changed, 238 insertions(+), 220 deletions(-) diff --git a/src/kernel/file.c b/src/kernel/file.c index 3ca89088ab05f..80de0aeeab335 100644 --- a/src/kernel/file.c +++ b/src/kernel/file.c @@ -11,8 +11,6 @@ int ceph_debug_file = 50; #include -static ssize_t ceph_sync_write(struct file *file, const char __user *data, - size_t count, loff_t *offset); /* * if err==0, caller is responsible for a put_session on *psession @@ -172,6 +170,63 @@ const struct inode_operations ceph_file_iops = { }; + +/* + * completely synchronous read and write methods. direct from __user + * buffer to osd. + */ +static ssize_t ceph_sync_read(struct file *file, char __user *data, + size_t count, loff_t *offset) +{ + struct inode *inode = file->f_dentry->d_inode; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_client *client = ceph_inode_to_client(inode); + int ret = 0; + off_t pos = *offset; + + dout(10, "sync_read on file %p %lld~%u\n", file, *offset, + (unsigned)count); + + ret = ceph_osdc_sync_read(&client->osdc, ceph_ino(inode), + &ci->i_layout, + pos, count, data); + return ret; +} + +static ssize_t ceph_sync_write(struct file *file, const char __user *data, + size_t count, loff_t *offset) +{ + struct inode *inode = file->f_dentry->d_inode; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_client *client = ceph_inode_to_client(inode); + int ret = 0; + off_t pos = *offset; + + dout(10, "sync_write on file %p %lld~%u\n", file, *offset, + (unsigned)count); + + if (file->f_flags & O_APPEND) + pos = i_size_read(inode); + + ret = ceph_osdc_sync_write(&client->osdc, 
ceph_ino(inode), + &ci->i_layout, + pos, count, data); + if (ret <= 0) + return ret; + pos += ret; + *offset = pos; + + spin_lock(&inode->i_lock); + if (pos > inode->i_size) { + inode->i_size = pos; + inode->i_blocks = (inode->i_size + 512 - 1) >> 9; + dout(10, "extending file size to %d\n", (int)inode->i_size); + } + spin_unlock(&inode->i_lock); + + return ret; +} + /* * wrap do_sync_read and friends with checks for cap bits on the inode. * atomically grab references, so that those bits are released mid-read. @@ -191,11 +246,14 @@ ssize_t ceph_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos) &got)); if (ret < 0) goto out; - dout(10, "read %llx %llu~%u got cap refs on %d\n", + dout(10, "read %llx %llu~%u got cap refs %d\n", ceph_ino(inode), *ppos, (unsigned)len, got); - //if (got & CEPH_CAP_RDCACHE) { - ret = do_sync_read(filp, buf, len, ppos); + if ((got & CEPH_CAP_RDCACHE) == 0 || + (inode->i_sb->s_flags & MS_SYNCHRONOUS)) + ret = ceph_sync_read(filp, buf, len, ppos); + else + ret = do_sync_read(filp, buf, len, ppos); out: dout(10, "read %llx dropping cap refs on %d\n", ceph_ino(inode), got); @@ -211,7 +269,6 @@ ssize_t ceph_write(struct file *filp, const char __user *buf, { struct inode *inode = filp->f_dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_client *client = ceph_inode_to_client(inode); ssize_t ret; int got = 0; @@ -224,10 +281,11 @@ ssize_t ceph_write(struct file *filp, const char __user *buf, goto out; dout(10, "write got cap refs on %d\n", got); - if ((got & CEPH_CAP_RDCACHE) && !client->mount_args.sync) - ret = do_sync_write(filp, buf, len, ppos); - else + if ((got & CEPH_CAP_WRBUFFER) == 0 || + (inode->i_sb->s_flags & MS_SYNCHRONOUS)) ret = ceph_sync_write(filp, buf, len, ppos); + else + ret = do_sync_write(filp, buf, len, ppos); out: dout(10, "write dropping cap refs on %d\n", got); @@ -237,46 +295,6 @@ out: -/* - * totally naive write. just to get things sort of working. - * ugly hack! 
- */ -static ssize_t ceph_sync_write(struct file *file, const char __user *data, - size_t count, loff_t *offset) -{ - struct inode *inode = file->f_dentry->d_inode; - struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc; - int ret = 0; - off_t pos = *offset; - - dout(10, "sync_write on file %p %lld~%u\n", file, *offset, - (unsigned)count); - - if (file->f_flags & O_APPEND) - pos = i_size_read(inode); - - ret = ceph_osdc_sync_write(osdc, ceph_ino(inode), - &ci->i_layout, - pos, count, data); - if (ret <= 0) - return ret; - pos += ret; - - spin_lock(&inode->i_lock); - if (pos > inode->i_size) { - inode->i_size = pos; - inode->i_blocks = (inode->i_size + 512 - 1) >> 9; - dout(10, "extending file size to %d\n", (int)inode->i_size); - } - spin_unlock(&inode->i_lock); - invalidate_inode_pages2(inode->i_mapping); - - *offset = pos; - - return ret; -} - const struct file_operations ceph_file_fops = { .open = ceph_open, .release = ceph_release, diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index 5469338e39180..636563bf3b692 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -161,40 +161,38 @@ struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op) return req; } -static struct ceph_osd_request *alloc_request(int nr_pages) +static struct ceph_osd_request *alloc_request(int nr_pages, + struct ceph_msg *msg) { struct ceph_osd_request *req; req = kmalloc(sizeof(*req) + nr_pages*sizeof(void*), GFP_KERNEL); if (req == NULL) return ERR_PTR(-ENOMEM); - + req->r_request = msg; + req->r_nr_pages = nr_pages; return req; } -struct ceph_osd_request *register_request(struct ceph_osd_client *osdc, - struct ceph_msg *msg, - int nr_pages, - struct ceph_osd_request *req) +static int register_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) { - struct ceph_osd_request_head *head = msg->front.iov_base; + struct ceph_osd_request_head *head = 
req->r_request->front.iov_base; req->r_tid = head->tid = ++osdc->last_tid; req->r_flags = 0; - req->r_request = msg; req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid); req->r_reply = 0; req->r_result = 0; atomic_set(&req->r_ref, 2); /* one for request_tree, one for caller */ init_completion(&req->r_completion); - req->r_nr_pages = nr_pages; - dout(30, "register_request %p tid %lld with %d pages\n", req, req->r_tid, nr_pages); - radix_tree_insert(&osdc->request_tree, req->r_tid, (void*)req); - return req; + dout(30, "register_request %p tid %lld\n", req, req->r_tid); + return radix_tree_insert(&osdc->request_tree, req->r_tid, (void*)req); } -static void send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) +static void send_request(struct ceph_osd_client *osdc, + struct ceph_osd_request *req) { int ruleno; int osds[10]; @@ -316,6 +314,109 @@ out: return ret; } + +int do_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) +{ + struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; + struct ceph_osd_reply_head *replyhead; + __s32 rc; + int bytes; + + /* register+send request */ + spin_lock(&osdc->lock); + rc = register_request(osdc, req); + if (rc < 0) { + spin_unlock(&osdc->lock); + return rc; + } + reqhead->osdmap_epoch = osdc->osdmap->epoch; + send_request(osdc, req); + spin_unlock(&osdc->lock); + + /* wait */ + dout(10, "do_request tid %llu waiting on %p\n", req->r_tid, req); + wait_for_completion(&req->r_completion); + dout(10, "do_request tid %llu got reply on %p\n", req->r_tid, req); + + spin_lock(&osdc->lock); + unregister_request(osdc, req); + spin_unlock(&osdc->lock); + + /* parse reply */ + replyhead = req->r_reply->front.iov_base; + rc = le32_to_cpu(replyhead->result); + bytes = le32_to_cpu(req->r_reply->hdr.data_len); + dout(10, "do_request result %d, %d bytes\n", rc, bytes); + if (rc < 0) + return rc; + return bytes; +} + + +int ceph_osdc_sync_read(struct ceph_osd_client *osdc, ceph_ino_t ino, + 
struct ceph_file_layout *layout, + __u64 off, __u64 len, + char __user *data) +{ + struct ceph_msg *reqm; + struct ceph_osd_request_head *reqhead; + struct ceph_osd_request *req; + __u64 toff = off, tlen = len; + int nr_pages, i; + __s32 rc; + + dout(10, "sync_read on ino %llx at %llu~%llu\n", ino, off, len); + + /* request msg */ + reqm = new_request_msg(osdc, CEPH_OSD_OP_READ); + if (IS_ERR(reqm)) + return PTR_ERR(reqm); + reqhead = reqm->front.iov_base; + reqhead->oid.ino = ino; + reqhead->oid.rev = 0; + reqhead->flags = 0; + calc_file_object_mapping(layout, &toff, &tlen, &reqhead->oid, + &reqhead->offset, &reqhead->length); + if (tlen != 0) { + dout(10, " skipping last %llu, writing %llu~%llu\n", + tlen, off, len); + len -= tlen; + } + calc_object_layout(&reqhead->layout, &reqhead->oid, layout, + osdc->osdmap); + dout(10, "sync_read object block %u on %llu~%llu\n", + reqhead->oid.bno, reqhead->offset, reqhead->length); + + /* how many pages? */ + nr_pages = calc_pages_for(len, off); + + dout(10, "sync_write %llu~%llu -> %d pages\n", off, len, nr_pages); + + req = alloc_request(nr_pages, reqm); + if (IS_ERR(req)) + return PTR_ERR(req); + + /* allocate temp pages to hold data */ + for (i=0; i<nr_pages; i++) { + req->r_pages[i] = alloc_page(GFP_KERNEL); + if (req->r_pages[i] == NULL) { + req->r_nr_pages = i+1; + put_request(req); + return -ENOMEM; + } + } + reqm->nr_pages = nr_pages; + reqm->pages = req->r_pages; + reqm->hdr.data_len = cpu_to_le32(len); + reqm->hdr.data_off = cpu_to_le32(off); + + rc = do_request(osdc, req); + put_request(req); + dout(10, "sync_read result %d\n", rc); + return rc; +} + + /* * read a single page. 
*/ @@ -324,10 +425,9 @@ int ceph_osdc_readpage(struct ceph_osd_client *osdc, ceph_ino_t ino, loff_t off, loff_t len, struct page *page) { - struct ceph_msg *reqm, *reply; + struct ceph_msg *reqm; struct ceph_osd_request_head *reqhead; struct ceph_osd_request *req; - struct ceph_osd_reply_head *replyhead; __s32 rc; dout(10, "readpage on ino %llx at %lld~%lld\n", ino, off, len); @@ -349,44 +449,17 @@ int ceph_osdc_readpage(struct ceph_osd_client *osdc, ceph_ino_t ino, dout(10, "readpage object block %u on %llu~%llu\n", reqhead->oid.bno, reqhead->offset, reqhead->length); - req = alloc_request(1); + req = alloc_request(1, reqm); if (IS_ERR(req)) { ceph_msg_put(reqm); return PTR_ERR(req); } req->r_pages[0] = page; - - /* register+send request */ - spin_lock(&osdc->lock); - req = register_request(osdc, reqm, 1, req); - if (IS_ERR(req)) { - spin_unlock(&osdc->lock); - ceph_msg_put(reqm); - return PTR_ERR(req); - } - - reqhead->osdmap_epoch = osdc->osdmap->epoch; - - send_request(osdc, req); - spin_unlock(&osdc->lock); - /* wait */ - dout(10, "readpage tid %llu waiting on %p\n", req->r_tid, req); - wait_for_completion(&req->r_completion); - dout(10, "readpage tid %llu got reply on %p\n", req->r_tid, req); - - spin_lock(&osdc->lock); - unregister_request(osdc, req); - spin_unlock(&osdc->lock); - - reply = req->r_reply; - replyhead = reply->front.iov_base; - rc = le32_to_cpu(replyhead->result); - dout(10, "readpage result %d, read %d bytes\n", rc, - le32_to_cpu(reply->hdr.data_len)); - if (rc < 0) - return rc; - return le32_to_cpu(reply->hdr.data_len); + rc = do_request(osdc, req); + put_request(req); + dout(10, "readpage result %d\n", rc); + return rc; } /* @@ -399,10 +472,9 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, __u64 off, __u64 len, struct list_head *page_list, int nr_pages) { - struct ceph_msg *reqm, *reply; + struct ceph_msg *reqm; struct ceph_osd_request *req; struct ceph_osd_request_head *reqhead; - struct ceph_osd_reply_head *replyhead; struct page 
*page; pgoff_t next_index; int contig_pages; @@ -418,9 +490,14 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, dout(10, "readpages on ino %llx on %llu~%llu\n", ino, off, len); /* alloc request, w/ page vector */ - req = alloc_request(nr_pages); - if (req == 0) + reqm = new_request_msg(osdc, CEPH_OSD_OP_READ); + if (IS_ERR(reqm)) + return PTR_ERR(reqm); + req = alloc_request(nr_pages, reqm); + if (req == 0) { + ceph_msg_put(reqm); return -ENOMEM; + } /* find adjacent pages */ next_index = list_entry(page_list->prev, struct page, lru)->index; @@ -434,18 +511,15 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, break; } dout(10, "readpages found %d/%d contig\n", contig_pages, nr_pages); - if (contig_pages == 0) + if (contig_pages == 0) { + put_request(req); return 0; + } len = min((contig_pages << PAGE_CACHE_SHIFT) - (off & ~PAGE_CACHE_MASK), len); dout(10, "readpages final extent is %llu~%llu\n", off, len); /* request msg */ - reqm = new_request_msg(osdc, CEPH_OSD_OP_READ); - if (IS_ERR(reqm)) { - put_request(req); - return PTR_ERR(reqm); - } reqhead = reqm->front.iov_base; reqhead->oid.ino = ino; reqhead->oid.rev = 0; @@ -460,37 +534,10 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, dout(10, "readpages object block %u of %llu~%llu\n", reqhead->oid.bno, reqhead->offset, reqhead->length); - /* register request */ - spin_lock(&osdc->lock); - req = register_request(osdc, reqm, req->r_nr_pages, req); - if (IS_ERR(req)) { - ceph_msg_put(reqm); - spin_unlock(&osdc->lock); - return PTR_ERR(req); - } - - /* send request */ - reqhead->osdmap_epoch = osdc->osdmap->epoch; - send_request(osdc, req); - spin_unlock(&osdc->lock); - - /* wait */ - dout(10, "readpages tid %llu waiting on %p\n", req->r_tid, req); - wait_for_completion(&req->r_completion); - dout(10, "readpages tid %llu got reply on %p\n", req->r_tid, req); - - spin_lock(&osdc->lock); - unregister_request(osdc, req); - spin_unlock(&osdc->lock); - - reply = req->r_reply; - replyhead = 
reply->front.iov_base; - rc = le32_to_cpu(replyhead->result); - dout(10, "readpages result %d, read %d bytes\n", rc, - le32_to_cpu(reply->hdr.data_len)); - if (rc < 0) - return rc; - return le32_to_cpu(reply->hdr.data_len); + rc = do_request(osdc, req); + put_request(req); + dout(10, "readpages result %d\n", rc); + return rc; } @@ -501,10 +548,9 @@ int ceph_osdc_sync_write(struct ceph_osd_client *osdc, ceph_ino_t ino, struct ceph_file_layout *layout, __u64 off, __u64 len, const char __user *data) { - struct ceph_msg *reqm, *reply; + struct ceph_msg *reqm; struct ceph_osd_request_head *reqhead; struct ceph_osd_request *req; - struct ceph_osd_reply_head *replyhead; __u64 toff = off, tlen = len; int nr_pages, i, po, l, left; __s32 rc; @@ -535,13 +581,21 @@ int ceph_osdc_sync_write(struct ceph_osd_client *osdc, ceph_ino_t ino, nr_pages = calc_pages_for(len, off); dout(10, "sync_write %llu~%llu -> %d pages\n", off, len, nr_pages); - req = alloc_request(nr_pages); - if (IS_ERR(req)) + req = alloc_request(nr_pages, reqm); + if (IS_ERR(req)) { + ceph_msg_put(reqm); return PTR_ERR(req); + } /* copy data into a set of pages */ - for (i=0; ir_pages[i] = alloc_page(GFP_KERNEL); + if (req->r_pages[i] == NULL) { + req->r_nr_pages = i+1; + put_request(req); + return -ENOMEM; + } + } left = len; po = off & ~PAGE_MASK; for (i=0; ipages = req->r_pages; + reqm->nr_pages = nr_pages; + reqm->hdr.data_len = cpu_to_le32(len); + reqm->hdr.data_off = cpu_to_le32(off); - /* register+send request */ - spin_lock(&osdc->lock); - req = register_request(osdc, reqm, nr_pages, req); - if (IS_ERR(req)) { - ceph_msg_put(reqm); - spin_unlock(&osdc->lock); - return PTR_ERR(req); - } - req->r_request->pages = req->r_pages; - req->r_request->nr_pages = nr_pages; - req->r_request->hdr.data_len = cpu_to_le32(len); - req->r_request->hdr.data_off = cpu_to_le32(off); - reqhead->osdmap_epoch = osdc->osdmap->epoch; - send_request(osdc, req); - spin_unlock(&osdc->lock); - - /* wait */ - dout(10, "sync_write tid 
%llu waiting on %p\n", req->r_tid, req); - wait_for_completion(&req->r_completion); - dout(10, "sync_write tid %llu got reply on %p\n", req->r_tid, req); - - spin_lock(&osdc->lock); - unregister_request(osdc, req); - spin_unlock(&osdc->lock); - - reply = req->r_reply; - replyhead = reply->front.iov_base; - rc = le32_to_cpu(replyhead->result); - dout(10, "sync_write result %d\n", rc); + rc = do_request(osdc, req); put_request(req); + dout(10, "sync_write result %d\n", rc); if (rc < 0) return rc; return len; @@ -595,18 +626,17 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, ceph_ino_t ino, loff_t off, loff_t len, struct page **pagevec, int nr_pages) { - struct ceph_msg *reqm, *reply; + struct ceph_msg *reqm; struct ceph_osd_request_head *reqhead; struct ceph_osd_request *req; - struct ceph_osd_reply_head *replyhead; __u64 toff = off, tlen = len; - __s32 ret = 0; + int rc = 0; /* request + msg */ reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE); if (IS_ERR(reqm)) return PTR_ERR(reqm); - req = alloc_request(nr_pages); + req = alloc_request(nr_pages, reqm); if (IS_ERR(req)) { ceph_msg_put(reqm); return PTR_ERR(req); @@ -631,48 +661,20 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, ceph_ino_t ino, dout(10, "writepages object block %u is %llu~%llu\n", reqhead->oid.bno, reqhead->offset, reqhead->length); - /* register+send request */ - spin_lock(&osdc->lock); - req = register_request(osdc, reqm, nr_pages, req); - if (IS_ERR(req)) { - ceph_msg_put(reqm); - put_request(req); - spin_unlock(&osdc->lock); - return PTR_ERR(req); - } - - /* copy data into a page in a request message */ + /* copy pagevec */ memcpy(req->r_pages, pagevec, nr_pages * sizeof(struct page *)); - req->r_request->pages = req->r_pages; - req->r_request->nr_pages = req->r_nr_pages; - req->r_request->hdr.data_len = len; - req->r_request->hdr.data_off = off; - reqhead->osdmap_epoch = osdc->osdmap->epoch; - - dout(10, "writepages sending request\n"); - send_request(osdc, req); - 
spin_unlock(&osdc->lock); - - /* wait */ - dout(10, "writepages tid %llu waiting for reply on %p\n", - req->r_tid, req); - wait_for_completion(&req->r_completion); - dout(10, "writepages tid %llu got reply on %p\n", req->r_tid, req); - - spin_lock(&osdc->lock); - unregister_request(osdc, req); - spin_unlock(&osdc->lock); - - reply = req->r_reply; - replyhead = reply->front.iov_base; - ret = le32_to_cpu(replyhead->result); - dout(10, "writepages result %d\n", ret); + reqm->pages = req->r_pages; + reqm->nr_pages = req->r_nr_pages; + reqm->hdr.data_len = len; + reqm->hdr.data_off = off; + + rc = do_request(osdc, req); put_request(req); - - /* return error, or number of bytes written */ - if (ret < 0) - return ret; + dout(10, "writepages result %d\n", rc); + if (rc < 0) + return rc; return len; + } diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index 87c4ac8cb7ffe..48bb3294d3c8e 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -79,6 +79,10 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, ceph_ino_t ino, loff_t off, loff_t len, struct page **pagevec, int nr_pages); +extern int ceph_osdc_sync_read(struct ceph_osd_client *osdc, ceph_ino_t ino, + struct ceph_file_layout *layout, + __u64 off, __u64 len, + char __user *data); extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc, ceph_ino_t ino, struct ceph_file_layout *layout, __u64 off, __u64 len, diff --git a/src/kernel/super.c b/src/kernel/super.c index ca42c99f9f058..782eafa6edc92 100644 --- a/src/kernel/super.c +++ b/src/kernel/super.c @@ -220,7 +220,6 @@ enum { Opt_wsize, /* int args above */ Opt_ip, - Opt_sync, }; static match_table_t arg_tokens = { @@ -236,7 +235,6 @@ static match_table_t arg_tokens = { {Opt_wsize, "wsize=%d"}, /* int args above */ {Opt_ip, "ip=%s"}, - {Opt_sync, "sync"}, {-1, NULL} }; @@ -383,9 +381,6 @@ static int parse_mount_args(int flags, char *options, const char *dev_name, case Opt_wsize: args->wsize = intval; break; - case Opt_sync: 
- args->sync = 1; - break; default: BUG_ON(token); diff --git a/src/kernel/super.h b/src/kernel/super.h index 531853e746813..4d6a05413130e 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -63,7 +63,6 @@ struct ceph_mount_args { int num_mon; struct ceph_entity_addr mon_addr[5]; char path[100]; - int sync; int wsize; }; -- 2.39.5