#include <linux/namei.h>
-static ssize_t ceph_sync_write(struct file *file, const char __user *data,
- size_t count, loff_t *offset);
/*
* if err==0, caller is responsible for a put_session on *psession
};
+
+/*
+ * completely synchronous read and write methods. direct from __user
+ * buffer to osd.
+ */
+static ssize_t ceph_sync_read(struct file *file, char __user *data,
+ size_t count, loff_t *offset)
+{
+ struct inode *inode = file->f_dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_client *client = ceph_inode_to_client(inode);
+ int ret = 0;
+ off_t pos = *offset;
+
+ dout(10, "sync_read on file %p %lld~%u\n", file, *offset,
+ (unsigned)count);
+
+ ret = ceph_osdc_sync_read(&client->osdc, ceph_ino(inode),
+ &ci->i_layout,
+ pos, count, data);
+ /* advance the file position on success, as do_sync_read would and
+  * as ceph_sync_write below does; otherwise a looping reader sees
+  * the same data forever */
+ if (ret > 0)
+ *offset = pos + ret;
+ return ret;
+}
+
+static ssize_t ceph_sync_write(struct file *file, const char __user *data,
+ size_t count, loff_t *offset)
+{
+ struct inode *inode = file->f_dentry->d_inode;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_client *client = ceph_inode_to_client(inode);
+ int ret = 0;
+ off_t pos = *offset;
+
+ dout(10, "sync_write on file %p %lld~%u\n", file, *offset,
+ (unsigned)count);
+
+ /* NOTE(review): i_size is sampled without i_lock here; a concurrent
+  * extender could race the O_APPEND position -- confirm callers hold
+  * the needed caps/locks */
+ if (file->f_flags & O_APPEND)
+ pos = i_size_read(inode);
+
+ ret = ceph_osdc_sync_write(&client->osdc, ceph_ino(inode),
+ &ci->i_layout,
+ pos, count, data);
+ if (ret <= 0)
+ return ret;
+ pos += ret;
+ *offset = pos;
+
+ /* extend i_size/i_blocks if we wrote past eof; i_lock guards the
+  * read-modify-write of the size fields */
+ spin_lock(&inode->i_lock);
+ if (pos > inode->i_size) {
+ inode->i_size = pos;
+ inode->i_blocks = (inode->i_size + 512 - 1) >> 9;
+ dout(10, "extending file size to %d\n", (int)inode->i_size);
+ }
+ spin_unlock(&inode->i_lock);
+
+ /* NOTE(review): the old implementation invalidated the page cache
+  * (invalidate_inode_pages2) after a sync write; verify stale cached
+  * pages cannot be observed by readers holding RDCACHE */
+ return ret;
+}
+
/*
* wrap do_sync_read and friends with checks for cap bits on the inode.
* atomically grab references, so that those bits are released mid-read.
&got));
if (ret < 0)
goto out;
- dout(10, "read %llx %llu~%u got cap refs on %d\n",
+ dout(10, "read %llx %llu~%u got cap refs %d\n",
ceph_ino(inode), *ppos, (unsigned)len, got);
- //if (got & CEPH_CAP_RDCACHE) {
- ret = do_sync_read(filp, buf, len, ppos);
+ if ((got & CEPH_CAP_RDCACHE) == 0 ||
+ (inode->i_sb->s_flags & MS_SYNCHRONOUS))
+ ret = ceph_sync_read(filp, buf, len, ppos);
+ else
+ ret = do_sync_read(filp, buf, len, ppos);
out:
dout(10, "read %llx dropping cap refs on %d\n", ceph_ino(inode), got);
{
struct inode *inode = filp->f_dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_client *client = ceph_inode_to_client(inode);
ssize_t ret;
int got = 0;
goto out;
dout(10, "write got cap refs on %d\n", got);
- if ((got & CEPH_CAP_RDCACHE) && !client->mount_args.sync)
- ret = do_sync_write(filp, buf, len, ppos);
- else
+ if ((got & CEPH_CAP_WRBUFFER) == 0 ||
+ (inode->i_sb->s_flags & MS_SYNCHRONOUS))
ret = ceph_sync_write(filp, buf, len, ppos);
+ else
+ ret = do_sync_write(filp, buf, len, ppos);
out:
dout(10, "write dropping cap refs on %d\n", got);
-/*
- * totally naive write. just to get things sort of working.
- * ugly hack!
- */
-static ssize_t ceph_sync_write(struct file *file, const char __user *data,
- size_t count, loff_t *offset)
-{
- struct inode *inode = file->f_dentry->d_inode;
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
- int ret = 0;
- off_t pos = *offset;
-
- dout(10, "sync_write on file %p %lld~%u\n", file, *offset,
- (unsigned)count);
-
- if (file->f_flags & O_APPEND)
- pos = i_size_read(inode);
-
- ret = ceph_osdc_sync_write(osdc, ceph_ino(inode),
- &ci->i_layout,
- pos, count, data);
- if (ret <= 0)
- return ret;
- pos += ret;
-
- spin_lock(&inode->i_lock);
- if (pos > inode->i_size) {
- inode->i_size = pos;
- inode->i_blocks = (inode->i_size + 512 - 1) >> 9;
- dout(10, "extending file size to %d\n", (int)inode->i_size);
- }
- spin_unlock(&inode->i_lock);
- invalidate_inode_pages2(inode->i_mapping);
-
- *offset = pos;
-
- return ret;
-}
-
const struct file_operations ceph_file_fops = {
.open = ceph_open,
.release = ceph_release,
return req;
}
+/*
+ * allocate a request, with room for nr_pages page pointers in the
+ * trailing r_pages[] array.  msg is stored in r_request; on error the
+ * caller remains responsible for dropping msg (see call sites, which
+ * do ceph_msg_put on failure).
+ */
-static struct ceph_osd_request *alloc_request(int nr_pages)
+static struct ceph_osd_request *alloc_request(int nr_pages,
+ struct ceph_msg *msg)
{
struct ceph_osd_request *req;
req = kmalloc(sizeof(*req) + nr_pages*sizeof(void*), GFP_KERNEL);
if (req == NULL)
return ERR_PTR(-ENOMEM);
-
+ req->r_request = msg;
+ req->r_nr_pages = nr_pages;
return req;
}
-struct ceph_osd_request *register_request(struct ceph_osd_client *osdc,
- struct ceph_msg *msg,
- int nr_pages,
- struct ceph_osd_request *req)
+static int register_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
{
- struct ceph_osd_request_head *head = msg->front.iov_base;
+ struct ceph_osd_request_head *head = req->r_request->front.iov_base;
req->r_tid = head->tid = ++osdc->last_tid;
req->r_flags = 0;
- req->r_request = msg;
req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid);
req->r_reply = 0;
req->r_result = 0;
atomic_set(&req->r_ref, 2); /* one for request_tree, one for caller */
init_completion(&req->r_completion);
- req->r_nr_pages = nr_pages;
- dout(30, "register_request %p tid %lld with %d pages\n", req, req->r_tid, nr_pages);
- radix_tree_insert(&osdc->request_tree, req->r_tid, (void*)req);
- return req;
+ dout(30, "register_request %p tid %lld\n", req, req->r_tid);
+ return radix_tree_insert(&osdc->request_tree, req->r_tid, (void*)req);
}
-static void send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
+static void send_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
{
int ruleno;
int osds[10];
return ret;
}
+
+/*
+ * register and send a request, then wait (uninterruptibly) for the
+ * reply.  returns the reply's data byte count, or a negative error
+ * from registration or from the osd's result code.
+ *
+ * the caller keeps its own ref on req and must put_request() it
+ * afterwards, on both success and error paths (all call sites do).
+ */
+int do_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
+{
+ struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
+ struct ceph_osd_reply_head *replyhead;
+ __s32 rc;
+ int bytes;
+
+ /* register+send request; osdc->lock guards last_tid, the
+  * request_tree, and the osdmap epoch we stamp into the header */
+ spin_lock(&osdc->lock);
+ rc = register_request(osdc, req);
+ if (rc < 0) {
+ spin_unlock(&osdc->lock);
+ return rc;
+ }
+ reqhead->osdmap_epoch = osdc->osdmap->epoch;
+ send_request(osdc, req);
+ spin_unlock(&osdc->lock);
+
+ /* wait; r_completion is presumably completed by the reply handler
+  * once r_reply is attached -- not visible in this file chunk */
+ dout(10, "do_request tid %llu waiting on %p\n", req->r_tid, req);
+ wait_for_completion(&req->r_completion);
+ dout(10, "do_request tid %llu got reply on %p\n", req->r_tid, req);
+
+ spin_lock(&osdc->lock);
+ unregister_request(osdc, req);
+ spin_unlock(&osdc->lock);
+
+ /* parse reply: negative result is the osd's error code, otherwise
+  * report how many data bytes came back */
+ replyhead = req->r_reply->front.iov_base;
+ rc = le32_to_cpu(replyhead->result);
+ bytes = le32_to_cpu(req->r_reply->hdr.data_len);
+ dout(10, "do_request result %d, %d bytes\n", rc, bytes);
+ if (rc < 0)
+ return rc;
+ return bytes;
+}
+
+
+/*
+ * synchronous read: read the extent into temporary kernel pages, then
+ * copy the result out to the user buffer.  returns bytes read or a
+ * negative error.
+ */
+int ceph_osdc_sync_read(struct ceph_osd_client *osdc, ceph_ino_t ino,
+ struct ceph_file_layout *layout,
+ __u64 off, __u64 len,
+ char __user *data)
+{
+ struct ceph_msg *reqm;
+ struct ceph_osd_request_head *reqhead;
+ struct ceph_osd_request *req;
+ __u64 toff = off, tlen = len;
+ int nr_pages, i, po, l, left;
+ __s32 rc;
+
+ dout(10, "sync_read on ino %llx at %llu~%llu\n", ino, off, len);
+
+ /* request msg */
+ reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
+ if (IS_ERR(reqm))
+ return PTR_ERR(reqm);
+ reqhead = reqm->front.iov_base;
+ reqhead->oid.ino = ino;
+ reqhead->oid.rev = 0;
+ reqhead->flags = 0;
+ /* map the file extent onto one object; an extent spanning an
+  * object boundary is truncated to the first object (short read) */
+ calc_file_object_mapping(layout, &toff, &tlen, &reqhead->oid,
+ &reqhead->offset, &reqhead->length);
+ if (tlen != 0) {
+ dout(10, " skipping last %llu, reading %llu~%llu\n",
+ tlen, off, len);
+ len -= tlen;
+ }
+ calc_object_layout(&reqhead->layout, &reqhead->oid, layout,
+ osdc->osdmap);
+ dout(10, "sync_read object block %u on %llu~%llu\n",
+ reqhead->oid.bno, reqhead->offset, reqhead->length);
+
+ /* how many pages? */
+ nr_pages = calc_pages_for(len, off);
+
+ dout(10, "sync_read %llu~%llu -> %d pages\n", off, len, nr_pages);
+
+ req = alloc_request(nr_pages, reqm);
+ if (IS_ERR(req)) {
+ ceph_msg_put(reqm); /* don't leak the request msg */
+ return PTR_ERR(req);
+ }
+
+ /* allocate temp pages to hold the reply data */
+ for (i=0; i<nr_pages; i++) {
+ req->r_pages[i] = alloc_page(GFP_KERNEL);
+ if (req->r_pages[i] == NULL) {
+ req->r_nr_pages = i; /* only i pages were allocated;
+ slot i is NULL */
+ put_request(req);
+ return -ENOMEM;
+ }
+ }
+ reqm->nr_pages = nr_pages;
+ reqm->pages = req->r_pages;
+ reqm->hdr.data_len = cpu_to_le32(len);
+ reqm->hdr.data_off = cpu_to_le32(off);
+
+ rc = do_request(osdc, req);
+ if (rc > 0) {
+ /* copy the data we read out to userspace; NOTE(review):
+  * assumes the messenger lands reply data in r_pages --
+  * verify against the reply path */
+ left = rc;
+ po = off & ~PAGE_MASK;
+ i = 0;
+ while (left > 0) {
+ l = min_t(int, left, PAGE_SIZE-po);
+ if (copy_to_user(data,
+ page_address(req->r_pages[i]) + po, l)) {
+ rc = -EFAULT;
+ break;
+ }
+ data += l;
+ left -= l;
+ po = 0;
+ i++;
+ }
+ }
+ put_request(req);
+ dout(10, "sync_read result %d\n", rc);
+ return rc;
+}
+
+
/*
* read a single page.
*/
loff_t off, loff_t len,
struct page *page)
{
- struct ceph_msg *reqm, *reply;
+ struct ceph_msg *reqm;
struct ceph_osd_request_head *reqhead;
struct ceph_osd_request *req;
- struct ceph_osd_reply_head *replyhead;
__s32 rc;
dout(10, "readpage on ino %llx at %lld~%lld\n", ino, off, len);
dout(10, "readpage object block %u on %llu~%llu\n",
reqhead->oid.bno, reqhead->offset, reqhead->length);
- req = alloc_request(1);
+ req = alloc_request(1, reqm);
if (IS_ERR(req)) {
ceph_msg_put(reqm);
return PTR_ERR(req);
}
req->r_pages[0] = page;
-
- /* register+send request */
- spin_lock(&osdc->lock);
- req = register_request(osdc, reqm, 1, req);
- if (IS_ERR(req)) {
- spin_unlock(&osdc->lock);
- ceph_msg_put(reqm);
- return PTR_ERR(req);
- }
-
- reqhead->osdmap_epoch = osdc->osdmap->epoch;
-
- send_request(osdc, req);
- spin_unlock(&osdc->lock);
- /* wait */
- dout(10, "readpage tid %llu waiting on %p\n", req->r_tid, req);
- wait_for_completion(&req->r_completion);
- dout(10, "readpage tid %llu got reply on %p\n", req->r_tid, req);
-
- spin_lock(&osdc->lock);
- unregister_request(osdc, req);
- spin_unlock(&osdc->lock);
-
- reply = req->r_reply;
- replyhead = reply->front.iov_base;
- rc = le32_to_cpu(replyhead->result);
- dout(10, "readpage result %d, read %d bytes\n", rc,
- le32_to_cpu(reply->hdr.data_len));
- if (rc < 0)
- return rc;
- return le32_to_cpu(reply->hdr.data_len);
+ rc = do_request(osdc, req);
+ put_request(req);
+ dout(10, "readpage result %d\n", rc);
+ return rc;
}
/*
__u64 off, __u64 len,
struct list_head *page_list, int nr_pages)
{
- struct ceph_msg *reqm, *reply;
+ struct ceph_msg *reqm;
struct ceph_osd_request *req;
struct ceph_osd_request_head *reqhead;
- struct ceph_osd_reply_head *replyhead;
struct page *page;
pgoff_t next_index;
int contig_pages;
dout(10, "readpages on ino %llx on %llu~%llu\n", ino, off, len);
/* alloc request, w/ page vector */
- req = alloc_request(nr_pages);
- if (req == 0)
+ reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
+ if (IS_ERR(reqm))
+ return PTR_ERR(reqm);
+ req = alloc_request(nr_pages, reqm);
+ if (req == 0) {
+ ceph_msg_put(reqm);
return -ENOMEM;
+ }
/* find adjacent pages */
next_index = list_entry(page_list->prev, struct page, lru)->index;
break;
}
dout(10, "readpages found %d/%d contig\n", contig_pages, nr_pages);
- if (contig_pages == 0)
+ if (contig_pages == 0) {
+ put_request(req);
return 0;
+ }
len = min((contig_pages << PAGE_CACHE_SHIFT) - (off & ~PAGE_CACHE_MASK),
len);
dout(10, "readpages final extent is %llu~%llu\n", off, len);
/* request msg */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
- if (IS_ERR(reqm)) {
- put_request(req);
- return PTR_ERR(reqm);
- }
reqhead = reqm->front.iov_base;
reqhead->oid.ino = ino;
reqhead->oid.rev = 0;
dout(10, "readpages object block %u of %llu~%llu\n", reqhead->oid.bno,
reqhead->offset, reqhead->length);
- /* register request */
- spin_lock(&osdc->lock);
- req = register_request(osdc, reqm, req->r_nr_pages, req);
- if (IS_ERR(req)) {
- ceph_msg_put(reqm);
- spin_unlock(&osdc->lock);
- return PTR_ERR(req);
- }
-
- /* send request */
- reqhead->osdmap_epoch = osdc->osdmap->epoch;
- send_request(osdc, req);
- spin_unlock(&osdc->lock);
-
- /* wait */
- dout(10, "readpages tid %llu waiting on %p\n", req->r_tid, req);
- wait_for_completion(&req->r_completion);
- dout(10, "readpages tid %llu got reply on %p\n", req->r_tid, req);
-
- spin_lock(&osdc->lock);
- unregister_request(osdc, req);
- spin_unlock(&osdc->lock);
-
- reply = req->r_reply;
- replyhead = reply->front.iov_base;
- rc = le32_to_cpu(replyhead->result);
- dout(10, "readpages result %d, read %d bytes\n", rc,
- le32_to_cpu(reply->hdr.data_len));
- if (rc < 0)
- return rc;
- return le32_to_cpu(reply->hdr.data_len);
+ rc = do_request(osdc, req);
+ put_request(req);
+ dout(10, "readpages result %d\n", rc);
+ return rc;
}
struct ceph_file_layout *layout,
__u64 off, __u64 len, const char __user *data)
{
- struct ceph_msg *reqm, *reply;
+ struct ceph_msg *reqm;
struct ceph_osd_request_head *reqhead;
struct ceph_osd_request *req;
- struct ceph_osd_reply_head *replyhead;
__u64 toff = off, tlen = len;
int nr_pages, i, po, l, left;
__s32 rc;
nr_pages = calc_pages_for(len, off);
dout(10, "sync_write %llu~%llu -> %d pages\n", off, len, nr_pages);
- req = alloc_request(nr_pages);
- if (IS_ERR(req))
+ req = alloc_request(nr_pages, reqm);
+ if (IS_ERR(req)) {
+ ceph_msg_put(reqm);
return PTR_ERR(req);
+ }
/* copy data into a set of pages */
- for (i=0; i<nr_pages; i++)
+ for (i=0; i<nr_pages; i++) {
req->r_pages[i] = alloc_page(GFP_KERNEL);
+ if (req->r_pages[i] == NULL) {
+ req->r_nr_pages = i+1;
+ put_request(req);
+ return -ENOMEM;
+ }
+ }
left = len;
po = off & ~PAGE_MASK;
for (i=0; i<nr_pages; i++) {
left -= l;
po = 0;
}
+ reqm->pages = req->r_pages;
+ reqm->nr_pages = nr_pages;
+ reqm->hdr.data_len = cpu_to_le32(len);
+ reqm->hdr.data_off = cpu_to_le32(off);
- /* register+send request */
- spin_lock(&osdc->lock);
- req = register_request(osdc, reqm, nr_pages, req);
- if (IS_ERR(req)) {
- ceph_msg_put(reqm);
- spin_unlock(&osdc->lock);
- return PTR_ERR(req);
- }
- req->r_request->pages = req->r_pages;
- req->r_request->nr_pages = nr_pages;
- req->r_request->hdr.data_len = cpu_to_le32(len);
- req->r_request->hdr.data_off = cpu_to_le32(off);
- reqhead->osdmap_epoch = osdc->osdmap->epoch;
- send_request(osdc, req);
- spin_unlock(&osdc->lock);
-
- /* wait */
- dout(10, "sync_write tid %llu waiting on %p\n", req->r_tid, req);
- wait_for_completion(&req->r_completion);
- dout(10, "sync_write tid %llu got reply on %p\n", req->r_tid, req);
-
- spin_lock(&osdc->lock);
- unregister_request(osdc, req);
- spin_unlock(&osdc->lock);
-
- reply = req->r_reply;
- replyhead = reply->front.iov_base;
- rc = le32_to_cpu(replyhead->result);
- dout(10, "sync_write result %d\n", rc);
+ rc = do_request(osdc, req);
put_request(req);
+ dout(10, "sync_write result %d\n", rc);
if (rc < 0)
return rc;
return len;
loff_t off, loff_t len,
struct page **pagevec, int nr_pages)
{
- struct ceph_msg *reqm, *reply;
+ struct ceph_msg *reqm;
struct ceph_osd_request_head *reqhead;
struct ceph_osd_request *req;
- struct ceph_osd_reply_head *replyhead;
__u64 toff = off, tlen = len;
- __s32 ret = 0;
+ int rc = 0;
/* request + msg */
reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE);
if (IS_ERR(reqm))
return PTR_ERR(reqm);
- req = alloc_request(nr_pages);
+ req = alloc_request(nr_pages, reqm);
if (IS_ERR(req)) {
ceph_msg_put(reqm);
return PTR_ERR(req);
dout(10, "writepages object block %u is %llu~%llu\n",
reqhead->oid.bno, reqhead->offset, reqhead->length);
- /* register+send request */
- spin_lock(&osdc->lock);
- req = register_request(osdc, reqm, nr_pages, req);
- if (IS_ERR(req)) {
- ceph_msg_put(reqm);
- put_request(req);
- spin_unlock(&osdc->lock);
- return PTR_ERR(req);
- }
-
- /* copy data into a page in a request message */
+ /* copy pagevec */
memcpy(req->r_pages, pagevec, nr_pages * sizeof(struct page *));
- req->r_request->pages = req->r_pages;
- req->r_request->nr_pages = req->r_nr_pages;
- req->r_request->hdr.data_len = len;
- req->r_request->hdr.data_off = off;
- reqhead->osdmap_epoch = osdc->osdmap->epoch;
-
- dout(10, "writepages sending request\n");
- send_request(osdc, req);
- spin_unlock(&osdc->lock);
-
- /* wait */
- dout(10, "writepages tid %llu waiting for reply on %p\n",
- req->r_tid, req);
- wait_for_completion(&req->r_completion);
- dout(10, "writepages tid %llu got reply on %p\n", req->r_tid, req);
-
- spin_lock(&osdc->lock);
- unregister_request(osdc, req);
- spin_unlock(&osdc->lock);
-
- reply = req->r_reply;
- replyhead = reply->front.iov_base;
- ret = le32_to_cpu(replyhead->result);
- dout(10, "writepages result %d\n", ret);
+ reqm->pages = req->r_pages;
+ reqm->nr_pages = req->r_nr_pages;
+ reqm->hdr.data_len = len;
+ reqm->hdr.data_off = off;
+
+ rc = do_request(osdc, req);
put_request(req);
-
- /* return error, or number of bytes written */
- if (ret < 0)
- return ret;
+ dout(10, "writepages result %d\n", rc);
+ if (rc < 0)
+ return rc;
return len;
+
}