static int ceph_readpage(struct file *filp, struct page *page)
{
struct inode *inode = filp->f_dentry->d_inode;
- struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
int err = 0;
dout(10, "ceph_readpage file %p page %p index %lu\n", filp,
page, page->index);
+ dout(10, " inode %p\n", inode);
err = ceph_osdc_readpage(osdc, inode->i_ino, &ci->i_layout,
page->index << PAGE_SHIFT, PAGE_SIZE, page);
if (err) goto out_unlock;
if ((err = ceph_mdsc_do_request(mdsc, req, &rinfo, -1)) < 0)
return ERR_PTR(err);
- ino = le64_to_cpu(rinfo.trace_in[rinfo.trace_nr-1].in->ino);
- dout(10, "got and parsed stat result, ino %lu\n", ino);
- inode = iget(dir->i_sb, ino);
- if (!inode)
- return ERR_PTR(-EACCES);
- if ((err = ceph_fill_inode(inode, rinfo.trace_in[rinfo.trace_nr-1].in)) < 0)
- return ERR_PTR(err);
- d_add(dentry, inode);
+ if (rinfo.trace_nr > 0) {
+ ino = le64_to_cpu(rinfo.trace_in[rinfo.trace_nr-1].in->ino);
+ dout(10, "got and parsed stat result, ino %lu\n", ino);
+ inode = iget(dir->i_sb, ino);
+ if (!inode)
+ return ERR_PTR(-EACCES);
+ if ((err = ceph_fill_inode(inode, rinfo.trace_in[rinfo.trace_nr-1].in)) < 0)
+ return ERR_PTR(err);
+ d_add(dentry, inode);
+ } else {
+ dout(10, "no trace in reply? wtf.\n");
+ }
return NULL;
}
const struct file_operations ceph_file_fops = {
.open = ceph_open,
.release = ceph_release,
-/* .llseek = generic_file_llseek,
- .read = ceph_file_read,
+ .llseek = generic_file_llseek,
+ .read = do_sync_read,
+ .write = do_sync_write,
+ .aio_read = generic_file_aio_read,
+ .aio_write = generic_file_aio_write,
+/* .read = ceph_file_read,
.write = ceph_file_write,
.open = ceph_file_open,
// .release = ceph_dir_release,
int i;
inode->i_ino = le64_to_cpu(info->ino);
- inode->i_mode = le32_to_cpu(info->mode) | S_IFDIR;
+ inode->i_mode = le32_to_cpu(info->mode);
inode->i_uid = le32_to_cpu(info->uid);
inode->i_gid = le32_to_cpu(info->gid);
inode->i_nlink = le32_to_cpu(info->nlink);
inode->i_rdev = le32_to_cpu(info->rdev);
inode->i_blocks = 1;
inode->i_rdev = 0;
- dout(30, "new_inode ino=%lx by %d.%d sz=%llu\n", inode->i_ino,
- inode->i_uid, inode->i_gid, inode->i_size);
+ dout(30, "new_inode ino=%lx by %d.%d sz=%llu mode %o\n", inode->i_ino,
+ inode->i_uid, inode->i_gid, inode->i_size, inode->i_mode);
ceph_decode_timespec(&inode->i_atime, &info->atime);
ceph_decode_timespec(&inode->i_mtime, &info->mtime);
/* ceph inode */
dout(30, "inode %p, ci %p\n", inode, ci);
- ci->i_layout = info->layout; //swab?
+ ci->i_layout = info->layout;
+ dout(30, "inode layout %p su %d\n", &ci->i_layout, ci->i_layout.fl_stripe_unit);
if (le32_to_cpu(info->fragtree.nsplits) > 0) {
//ci->i_fragtree = kmalloc(...);
ci->i_wr_mtime.tv_sec = 0;
ci->i_wr_mtime.tv_usec = 0;
- //inode->i_mapping->a_ops = &ceph_aops;
+ inode->i_mapping->a_ops = &ceph_aops;
switch (inode->i_mode & S_IFMT) {
case S_IFIFO:
req->r_resend_mds = -1; /* forget any specific mds hint */
req->r_attempts++;
rhead = req->r_request->front.iov_base;
- rhead->retry_attempt = cpu_to_le32(req->r_attempts);
+ rhead->retry_attempt = cpu_to_le32(req->r_attempts-1);
rhead->oldest_client_tid = cpu_to_le64(get_oldest_tid(mdsc));
send_msg_mds(mdsc, req->r_request, mds);
/* pages */
con->out_msg_pos.page = 0;
- con->out_msg_pos.page_pos = m->hdr.data_off & PAGE_MASK;
+ con->out_msg_pos.page_pos = m->hdr.data_off & ~PAGE_MASK;
con->out_msg_pos.data_pos = 0;
set_bit(WRITE_PENDING, &con->state);
}
}
-struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op, int nr_pages)
+struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op)
{
struct ceph_msg *req;
struct ceph_osd_request_head *head;
- int size = sizeof(struct ceph_osd_request_head) + nr_pages*(sizeof(void*));
- req = ceph_msg_new(CEPH_MSG_OSD_OP, size, 0, 0, 0);
+ req = ceph_msg_new(CEPH_MSG_OSD_OP, sizeof(struct ceph_osd_request_head), 0, 0, 0);
if (IS_ERR(req))
return req;
- req->nr_pages = nr_pages;
memset(req->front.iov_base, 0, req->front.iov_len);
head = req->front.iov_base;
}
struct ceph_osd_request *register_request(struct ceph_osd_client *osdc,
- struct ceph_msg *msg)
+ struct ceph_msg *msg,
+ int nr_pages)
{
struct ceph_osd_request *req;
struct ceph_osd_request_head *head = msg->front.iov_base;
- req = kmalloc(sizeof(*req), GFP_KERNEL);
+ req = kmalloc(sizeof(*req) + nr_pages*sizeof(void*), GFP_KERNEL);
if (req == NULL)
return ERR_PTR(-ENOMEM);
req->r_tid = head->tid = ++osdc->last_tid;
req->r_result = 0;
atomic_set(&req->r_ref, 2); /* one for request_tree, one for caller */
init_completion(&req->r_completion);
+ req->r_nr_pages = nr_pages;
dout(30, "register_request %p tid %lld\n", req, req->r_tid);
radix_tree_insert(&osdc->request_tree, req->r_tid, (void*)req);
struct ceph_osd_request *req;
struct ceph_osd_reply_head *replyhead;
- dout(10, "readpage on ino %llu at %lld~%lld\n", ino, off, len);
+ dout(10, "readpage on ino %llx at %lld~%lld\n", ino, off, len);
/* request msg */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 1);
+ reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
if (IS_ERR(reqm))
return PTR_ERR(reqm);
reqhead = reqm->front.iov_base;
calc_file_object_mapping(layout, &off, &len, &reqhead->oid,
&reqhead->offset, &reqhead->length);
BUG_ON(len != 0);
- reqm->pages[0] = page;
calc_object_layout(&reqhead->layout, &reqhead->oid, layout, osdc->osdmap);
+ dout(10, "readpage object block %u %llu~%llu\n", reqhead->oid.bno, reqhead->offset, reqhead->length);
- /* register request */
+ /* register+send request */
spin_lock(&osdc->lock);
- req = register_request(osdc, reqm);
+ req = register_request(osdc, reqm, 1);
if (IS_ERR(req)) {
ceph_msg_put(reqm);
spin_unlock(&osdc->lock);
return PTR_ERR(req);
}
+ req->r_pages[0] = page;
reqhead->osdmap_epoch = osdc->osdmap->epoch;
- dout(10, "readpage object block %u %llu~%llu\n", reqhead->oid.bno, reqhead->offset, reqhead->length);
-
- /* send */
send_request(osdc, req);
spin_unlock(&osdc->lock);
/* wait */
dout(10, "readpage waiting for reply on %p\n", req);
- while (!test_bit(REQUEST_DONE, &req->r_flags))
- wait_for_completion(&req->r_completion);
+ wait_for_completion(&req->r_completion);
dout(10, "readpage got reply on %p\n", req);
spin_lock(&osdc->lock);
{
struct ceph_object oid;
- BUG_ON(layout->fl_stripe_unit & PAGE_MASK);
+ BUG_ON(layout->fl_stripe_unit & ~PAGE_MASK);
/* map range onto objects */
oid.ino = ino;
unsigned first_oxlen;
loff_t t;
- BUG_ON(layout->fl_stripe_unit & PAGE_MASK);
+ BUG_ON((layout->fl_stripe_unit & ~PAGE_MASK) != 0);
su = *off / layout->fl_stripe_unit;
stripeno = su / layout->fl_stripe_count;
stripepos = su % layout->fl_stripe_count;
first_oxlen = min_t(loff_t, *len, layout->fl_stripe_unit);
*oxlen = first_oxlen;
- /* multiple stripe units in this object? */
+ /* multiple stripe units across this object? */
t = *len;
while (t > stripe_len && *oxoff + *oxlen < layout->fl_object_size) {
*oxlen += min_t(loff_t, layout->fl_stripe_unit, t);
len -= off;
}
nr += len >> PAGE_SHIFT;
- if (len & PAGE_MASK)
+ if (len & ~PAGE_MASK)
nr++;
return nr;
}
bufferlist data;
if (env.data_len) {
int left = env.data_len;
- if (env.data_off & PAGE_MASK) {
+ if (env.data_off & ~PAGE_MASK) {
// head
- int head = MIN(PAGE_SIZE - (env.data_off & PAGE_MASK),
+ int head = MIN(PAGE_SIZE - (env.data_off & ~PAGE_MASK),
(unsigned)left);
bp = buffer::create(head);
if (tcp_read( sd, bp.c_str(), head ) < 0)
}
// middle
- int middle = left & ~PAGE_MASK;
+ int middle = left & PAGE_MASK;
if (middle > 0) {
bp = buffer::create_page_aligned(middle);
if (tcp_read( sd, bp.c_str(), middle ) < 0)
op->get_offset(), op->get_length(),
bl);
reply->set_data(bl);
- reply->set_length(r);
- dout(15) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl;
+ if (r >= 0)
+ reply->set_length(r);
+ else
+ reply->set_length(0);
+ dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl;
}
osd->logger->inc("c_rd");
osd->logger->inc("c_rdb", op->get_length());