struct page *page;
int want;
u64 offset, len;
+ struct ceph_osd_request_head *reqhead;
+ struct ceph_osd_op *op;
next = 0;
locked_pages = 0;
&ci->i_layout,
ceph_vino(inode),
offset, &len,
- CEPH_OSD_OP_WRITE, 0,
+ CEPH_OSD_OP_WRITE,
+ CEPH_OSD_OP_MODIFY |
+ CEPH_OSD_OP_ONDISK,
snapc, do_sync,
ci->i_truncate_seq,
ci->i_truncate_size);
(u64)locked_pages << PAGE_CACHE_SHIFT);
dout(10, "writepages got %d pages at %llu~%llu\n",
locked_pages, offset, len);
- rc = ceph_osdc_writepages_start(&client->osdc, req,
- len, locked_pages);
+
+ /* revise final length, page count */
+ req->r_num_pages = locked_pages;
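+	/* the first op struct sits just past the request head in the
+	 * message front */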
+ reqhead = req->r_request->front.iov_base;
+ op = (void *)(reqhead + 1);
+ op->length = cpu_to_le64(len);
+ req->r_request->hdr.data_len = cpu_to_le32(len);
+
+ rc = ceph_osdc_start_request(&client->osdc, req);
req = NULL;
/*
-	 * FIXME: if writepages_start fails (ENOMEM?) we should
+	 * FIXME: if start_request fails (ENOMEM?) we should
ceph_get_cap_refs(ci, need, want, got, endoff));
}
+/*
+ * Take cap refs.  Caller must already know we hold at least one ref on
+ * the caps in question, or we cannot know this is safe.
+ */
+void ceph_get_more_cap_refs(struct ceph_inode_info *ci, int caps)
+{
+ spin_lock(&ci->vfs_inode.i_lock);
+ __take_cap_refs(ci, caps);
+ spin_unlock(&ci->vfs_inode.i_lock);
+}
+
/*
* Release cap refs.
*
kfree(pages);
}
-static void release_page_vector(struct page **pages, int num_pages)
+void ceph_release_page_vector(struct page **pages, int num_pages)
{
int i;
for (i = 0; i < num_pages; i++) {
pages[i] = alloc_page(GFP_NOFS);
if (pages[i] == NULL) {
- release_page_vector(pages, i);
+ ceph_release_page_vector(pages, i);
return ERR_PTR(-ENOMEM);
}
}
if (file->f_flags & O_DIRECT)
put_page_vector(pages, num_pages);
else
- release_page_vector(pages, num_pages);
+ ceph_release_page_vector(pages, num_pages);
return ret;
}
+/*
+ * Write commit callback, called if we requested both an ACK and
+ * ONDISK commit reply from the OSD.
+ */
+static void sync_write_commit(struct ceph_osd_request *req)
+{
+ struct ceph_inode_info *ci = ceph_inode(req->r_inode);
+
+ dout(10, "sync_write_commit %p tid %llu\n", req, req->r_tid);
+ spin_lock(&ci->i_listener_lock);
+ list_del_init(&req->r_unsafe_item);
+ spin_unlock(&ci->i_listener_lock);
+ ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
+}
+
+/*
+ * Wait on any unsafe replies for the given inode. First wait on the
+ * newest request, and make that the upper bound. Then, if there are
+ * more requests, keep waiting on the oldest as long as it is still older
+ * than the original request.
+ */
+static void sync_write_wait(struct inode *inode)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct list_head *head = &ci->i_unsafe_writes;
+ struct ceph_osd_request *req;
+ u64 last_tid;
+
+ spin_lock(&ci->i_listener_lock);
+ if (list_empty(head))
+ goto out;
+
+ /* set upper bound as _last_ entry in chain */
+ req = list_entry(head->prev, struct ceph_osd_request,
+ r_unsafe_item);
+ last_tid = req->r_tid;
+
+ do {
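+		/* take a ref so the request can't be freed while we
+		 * sleep outside the lock */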
+ ceph_osdc_get_request(req);
+ spin_unlock(&ci->i_listener_lock);
+ dout(10, "sync_write_wait on tid %llu (until %llu)\n",
+ req->r_tid, last_tid);
+ wait_for_completion(&req->r_safe_completion);
+ spin_lock(&ci->i_listener_lock);
+ ceph_osdc_put_request(req);
+
+ /*
+ * from here on look at first entry in chain, since we
+ * only want to wait for anything older than last_tid
+ */
+		/* the list may have drained while we slept */
+		if (list_empty(head))
+			break;
+		req = list_entry(head->next, struct ceph_osd_request,
+				 r_unsafe_item);
+	} while (req->r_tid <= last_tid);
+out:
+ spin_unlock(&ci->i_listener_lock);
+}
+
/*
* synchronous write. from userspace.
*
struct inode *inode = file->f_dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_client *client = ceph_inode_to_client(inode);
- struct page **pages, **page_pos;
- int num_pages, pages_left;
+ struct ceph_osd_request *req;
+ struct page **pages;
+ int num_pages;
long long unsigned pos;
+ u64 len;
int written = 0;
int flags;
int do_sync = 0;
pos = i_size_read(inode);
else
pos = *offset;
- num_pages = calc_pages_for(pos, left);
+
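+	/* always ask for an ONDISK commit; for buffered (non-O_SYNC,
+	 * non-O_DIRECT) writes also request an early ACK so we can
+	 * return before the data is stable */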
+ flags = CEPH_OSD_OP_ORDERSNAP |
+ CEPH_OSD_OP_ONDISK |
+ CEPH_OSD_OP_MODIFY;
+ if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
+ flags |= CEPH_OSD_OP_ACK;
+ else
+ do_sync = 1;
+
+ /*
+ * we may need to do multiple writes here if we span an object
+ * boundary. this isn't atomic, unfortunately. :(
+ */
+more:
+ len = left;
+ req = ceph_osdc_new_request(&client->osdc, &ci->i_layout,
+ ceph_vino(inode), pos, &len,
+ CEPH_OSD_OP_WRITE, flags,
+ ci->i_snap_realm->cached_context,
+ do_sync,
+ ci->i_truncate_seq, ci->i_truncate_size);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
+
+ num_pages = calc_pages_for(pos, len);
if (file->f_flags & O_DIRECT) {
- pages = get_direct_page_vector(data, num_pages, pos, left);
- if (IS_ERR(pages))
- return PTR_ERR(pages);
+ pages = get_direct_page_vector(data, num_pages, pos, len);
+ if (IS_ERR(pages)) {
+ ret = PTR_ERR(pages);
+ goto out;
+ }
/*
* throw out any page cache pages in this range. this
* may block.
*/
- truncate_inode_pages_range(inode->i_mapping, pos, pos+left);
+ truncate_inode_pages_range(inode->i_mapping, pos, pos+len);
} else {
pages = alloc_page_vector(num_pages);
- if (IS_ERR(pages))
- return PTR_ERR(pages);
- ret = copy_user_to_page_vector(pages, data, pos, left);
- if (ret < 0)
+ if (IS_ERR(pages)) {
+ ret = PTR_ERR(pages);
goto out;
- }
-
- flags = CEPH_OSD_OP_ORDERSNAP;
- if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
- flags |= CEPH_OSD_OP_ACK;
- else
- do_sync = 1;
+ }
+ ret = copy_user_to_page_vector(pages, data, pos, len);
+ if (ret < 0) {
+ ceph_release_page_vector(pages, num_pages);
+ goto out;
+ }
- /*
- * we may need to do multiple writes here if we span an object
- * boundary. this isn't atomic, unfortunately. :(
- */
- page_pos = pages;
- pages_left = num_pages;
+ if ((file->f_flags & O_SYNC) == 0) {
+		/* hook a second callback for the ONDISK commit reply */
+ req->r_safe_callback = sync_write_commit;
+ req->r_own_pages = 1;
+ }
+ }
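+	/* attach the page vector; it becomes the request's data payload */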
+ req->r_pages = pages;
+ req->r_num_pages = num_pages;
+ req->r_inode = inode;
+
+ ret = ceph_osdc_start_request(&client->osdc, req);
+ if (!ret) {
+ if (req->r_safe_callback) {
+ /*
+ * Add to inode unsafe list only after we
+ * start_request so that a tid has been assigned.
+ */
+ spin_lock(&ci->i_listener_lock);
+			list_add_tail(&req->r_unsafe_item,
+				      &ci->i_unsafe_writes);
+ spin_unlock(&ci->i_listener_lock);
+ ceph_get_more_cap_refs(ci, CEPH_CAP_FILE_WR);
+ }
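+		/* wait for the ack (or, for sync writes, the commit) */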
+ ret = ceph_osdc_wait_request(&client->osdc, req);
+ }
-more:
- ret = ceph_osdc_writepages(&client->osdc, ceph_vino(inode),
- &ci->i_layout,
- ci->i_snap_realm->cached_context,
- pos, left, ci->i_truncate_seq,
- ci->i_truncate_size,
- page_pos, pages_left,
- flags, do_sync);
- if (ret > 0) {
- int didpages =
- ((pos & ~PAGE_CACHE_MASK) + ret) >> PAGE_CACHE_SHIFT;
+ if (file->f_flags & O_DIRECT)
+ put_page_vector(pages, num_pages);
+ else if (file->f_flags & O_SYNC)
+ ceph_release_page_vector(pages, num_pages);
- pos += ret;
- written += ret;
- left -= ret;
- if (left) {
- page_pos += didpages;
- pages_left -= didpages;
- BUG_ON(!pages_left);
+out:
+ ceph_osdc_put_request(req);
+ if (ret == 0) {
+ pos += len;
+ written += len;
+ left -= len;
+ if (left)
goto more;
- }
ret = written;
*offset = pos;
if (pos > i_size_read(inode))
ceph_inode_set_size(inode, pos);
}
-
-out:
- if (file->f_flags & O_DIRECT)
- put_page_vector(pages, num_pages);
- else
- release_page_vector(pages, num_pages);
return ret;
}
* Atomically grab references, so that those bits are not released
* back to the MDS mid-read.
*
- * Hmm, the sync reach case isn't actually async... should it be?
+ * Hmm, the sync read case isn't actually async... should it be?
*/
static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
unsigned long nr_segs, loff_t pos)
int ret;
dout(10, "fsync on inode %p\n", inode);
+ sync_write_wait(inode);
+
ret = filemap_write_and_wait(inode->i_mapping);
if (ret < 0)
return ret;
ci->i_wrbuffer_ref_head = 0;
ci->i_rdcache_gen = 0;
ci->i_rdcache_revoking = 0;
+ INIT_LIST_HEAD(&ci->i_unsafe_writes);
ci->i_snap_realm = NULL;
INIT_LIST_HEAD(&ci->i_snap_realm_item);
/*
* requests
*/
-static void get_request(struct ceph_osd_request *req)
-{
- atomic_inc(&req->r_ref);
-}
-
void ceph_osdc_put_request(struct ceph_osd_request *req)
{
dout(10, "put_request %p %d -> %d\n", req, atomic_read(&req->r_ref),
ceph_msg_put(req->r_request);
if (req->r_reply)
ceph_msg_put(req->r_reply);
+ if (req->r_own_pages)
+ ceph_release_page_vector(req->r_pages,
+ req->r_num_pages);
ceph_put_snap_context(req->r_snapc);
kfree(req);
}
if (req == NULL)
return ERR_PTR(-ENOMEM);
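+	/* initialize refcount and completions up front */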
+ atomic_set(&req->r_ref, 1);
+ init_completion(&req->r_completion);
+ init_completion(&req->r_safe_completion);
+ INIT_LIST_HEAD(&req->r_unsafe_item);
+
/* create message */
if (snapc)
msg_size += sizeof(u64) * snapc->num_snaps;
calc_layout(osdc, vino, layout, off, plen, req);
req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid);
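+	/* writes carry a data payload; stash its object offset and
+	 * length in the message header */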
+ if (flags & CEPH_OSD_OP_MODIFY) {
+ req->r_request->hdr.data_off = cpu_to_le16(off);
+ req->r_request->hdr.data_len = cpu_to_le32(*plen);
+ }
+
/* additional ops */
if (do_trunc) {
op++;
- op->op = cpu_to_le16(opcode == CEPH_OSD_OP_READ ?
+ op->op = cpu_to_le16(opcode == CEPH_OSD_OP_READ ?
CEPH_OSD_OP_MASKTRUNC : CEPH_OSD_OP_SETTRUNC);
op->truncate_seq = cpu_to_le32(truncate_seq);
prevofs = le64_to_cpu((op-1)->offset);
for (i = 0; i < snapc->num_snaps; i++)
snaps[i] = cpu_to_le64(snapc->snaps[i]);
}
-
- atomic_set(&req->r_ref, 1);
- init_completion(&req->r_completion);
return req;
}
-
/*
* Register request, assign tid. If this is the first request, set up
* the timeout event.
if (rc < 0)
goto out;
- get_request(req);
+ ceph_osdc_get_request(req);
osdc->num_requests++;
req->r_timeout_stamp =
container_of(work, struct ceph_osd_client, timeout_work.work);
struct ceph_osd_request *req;
unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ;
- unsigned long next_timeout = timeout + jiffies;
+ unsigned long next_timeout = timeout + jiffies;
RADIX_TREE(pings, GFP_NOFS); /* only send 1 ping per osd */
u64 next_tid = 0;
int got;
reqhead = req->r_request->front.iov_base;
reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
+ reqhead->reassert_version = req->r_reassert_version;
req->r_request->hdr.dst.name.type =
cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
struct ceph_osd_reply_head *rhead = msg->front.iov_base;
struct ceph_osd_request *req;
u64 tid;
- int numops;
+ int numops, flags;
if (msg->front.iov_len < sizeof(*rhead))
goto bad;
mutex_unlock(&osdc->request_mutex);
return;
}
- get_request(req);
- if (req->r_reply == NULL) {
- /* no data payload, or r_reply would have been set by
- prepare_pages. */
- ceph_msg_get(msg);
- req->r_reply = msg;
- } else if (req->r_reply == msg) {
- /* r_reply was set by prepare_pages; now it's fully read. */
- } else {
- dout(10, "handle_reply tid %llu already had reply?\n", tid);
+ ceph_osdc_get_request(req);
+ flags = le32_to_cpu(rhead->flags);
+
+ if (req->r_aborted) {
+ dout(10, "handle_reply tid %llu aborted\n", tid);
goto done;
}
- dout(10, "handle_reply tid %llu flags %d\n", tid,
- le32_to_cpu(rhead->flags));
- __unregister_request(osdc, req);
+
+ if (req->r_reassert_version.epoch == 0) {
+ /* first ack */
+ if (req->r_reply == NULL) {
+ /* no data payload, or r_reply would have been set by
+ prepare_pages. */
+ ceph_msg_get(msg);
+ req->r_reply = msg;
+ } else {
+ /* r_reply was set by prepare_pages */
+ BUG_ON(req->r_reply != msg);
+ }
+
+		/* remember the version in case we need to replay this op */
+ req->r_reassert_version = rhead->reassert_version;
+ } else if ((flags & CEPH_OSD_OP_ONDISK) == 0) {
+ dout(10, "handle_reply tid %llu dup ack\n", tid);
+ goto done;
+ }
+
+ dout(10, "handle_reply tid %llu flags %d\n", tid, flags);
+
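+	/* only the final (commit) reply unregisters the request; a bare
+	 * ack leaves it registered so the later commit reply can find it */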
+ if (flags & CEPH_OSD_OP_ONDISK)
+ __unregister_request(osdc, req);
+
mutex_unlock(&osdc->request_mutex);
if (req->r_callback)
req->r_callback(req);
else
- complete(&req->r_completion); /* see do_sync_request */
+ complete(&req->r_completion);
+
+ if ((flags & CEPH_OSD_OP_ONDISK) && req->r_safe_callback) {
+ req->r_safe_callback(req);
+ complete(&req->r_safe_completion); /* fsync waiter */
+ }
+
done:
ceph_osdc_put_request(req);
return;
dout(20, "kicking tid %llu osd%d\n", req->r_tid,
req->r_last_osd);
- get_request(req);
+ ceph_osdc_get_request(req);
mutex_unlock(&osdc->request_mutex);
req->r_request = ceph_msg_maybe_dup(req->r_request);
if (!req->r_aborted) {
/*
* Register request, send initial attempt.
*/
-static int start_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
+int ceph_osdc_start_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
{
int rc;
+ req->r_request->pages = req->r_pages;
+ req->r_request->nr_pages = req->r_num_pages;
+
rc = register_request(osdc, req);
if (rc < 0)
return rc;
+
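+	/* pin the current osdmap while we pick an osd and send */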
down_read(&osdc->map_sem);
rc = send_request(osdc, req);
up_read(&osdc->map_sem);
return rc;
}
-/*
- * synchronously do an osd request.
- *
- * If we are interrupted, take our pages away from any previous sent
- * request message that may still be being written to the socket.
- */
-static int do_sync_request(struct ceph_osd_client *osdc,
+int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
struct ceph_osd_reply_head *replyhead;
__s32 rc;
int bytes;
- rc = start_request(osdc, req); /* register+send request */
- if (rc)
- return rc;
-
rc = wait_for_completion_interruptible(&req->r_completion);
if (rc < 0) {
- struct ceph_msg *msg;
-
- dout(0, "tid %llu err %d, revoking %p pages\n", req->r_tid,
- rc, req->r_request);
- /*
- * we were interrupted.
- *
- * mark req aborted _before_ revoking pages, so that
- * if a racing kick_request _does_ dup the page vec
- * pointer, it will definitely then see the aborted
- * flag and not send the request.
- */
- req->r_aborted = 1;
- msg = req->r_request;
- mutex_lock(&msg->page_mutex);
- msg->pages = NULL;
- mutex_unlock(&msg->page_mutex);
- if (req->r_reply) {
- mutex_lock(&req->r_reply->page_mutex);
- req->r_reply->pages = NULL;
- mutex_unlock(&req->r_reply->page_mutex);
- }
+ ceph_osdc_abort_request(osdc, req);
return rc;
}
replyhead = req->r_reply->front.iov_base;
rc = le32_to_cpu(replyhead->result);
bytes = le32_to_cpu(req->r_reply->hdr.data_len);
- dout(10, "do_sync_request tid %llu result %d, %d bytes\n",
+ dout(10, "wait_request tid %llu result %d, %d bytes\n",
req->r_tid, rc, bytes);
if (rc < 0)
return rc;
return bytes;
}
+
+/*
+ * To abort an in-progress request, take pages away from outgoing or
+ * incoming message.
+ */
+void ceph_osdc_abort_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
+{
+ struct ceph_msg *msg;
+
+ dout(0, "abort_request tid %llu, revoking %p pages\n", req->r_tid,
+ req->r_request);
+ /*
+ * mark req aborted _before_ revoking pages, so that
+ * if a racing kick_request _does_ dup the page vec
+ * pointer, it will definitely then see the aborted
+ * flag and not send the request.
+ */
+ req->r_aborted = 1;
+ msg = req->r_request;
+ mutex_lock(&msg->page_mutex);
+ msg->pages = NULL;
+ mutex_unlock(&msg->page_mutex);
+ if (req->r_reply) {
+ mutex_lock(&req->r_reply->page_mutex);
+ req->r_reply->pages = NULL;
+ mutex_unlock(&req->r_reply->page_mutex);
+ }
+}
+
/*
* init, shutdown
*/
dout(10, "readpages final extent is %llu~%llu (%d pages)\n",
off, len, req->r_num_pages);
- rc = do_sync_request(osdc, req);
+
+ rc = ceph_osdc_start_request(osdc, req);
+ if (!rc)
+ rc = ceph_osdc_wait_request(osdc, req);
if (rc >= 0) {
read = rc;
struct page **pages, int num_pages,
int flags, int do_sync)
{
- struct ceph_msg *reqm;
struct ceph_osd_request *req;
int rc = 0;
dout(10, "writepages %llu~%llu (%d pages)\n", off, len,
req->r_num_pages);
- /* set up data payload */
- reqm = req->r_request;
- reqm->pages = pages;
- reqm->nr_pages = req->r_num_pages;
- reqm->hdr.data_len = cpu_to_le32(len);
- reqm->hdr.data_off = cpu_to_le16(off);
+ rc = ceph_osdc_start_request(osdc, req);
+ if (!rc)
+ rc = ceph_osdc_wait_request(osdc, req);
- rc = do_sync_request(osdc, req);
ceph_osdc_put_request(req);
if (rc == 0)
rc = len;
return rc;
}
-/*
- * start an async write
- */
-int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req,
- u64 len, int num_pages)
-{
- struct ceph_msg *reqm = req->r_request;
- struct ceph_osd_request_head *reqhead = reqm->front.iov_base;
- struct ceph_osd_op *op = (void *)(reqhead + 1);
- u64 off = le64_to_cpu(op->offset);
- int rc;
- int flags;
-
- dout(10, "writepages_start %llu~%llu, %d pages\n", off, len, num_pages);
-
- flags = CEPH_OSD_OP_MODIFY;
- if (osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK)
- flags |= CEPH_OSD_OP_ACK;
- else
- flags |= CEPH_OSD_OP_ONDISK;
- reqhead->flags = cpu_to_le32(flags);
- op->length = cpu_to_le64(len);
-
- /* reference pages in message */
- reqm->pages = req->r_pages;
- reqm->nr_pages = req->r_num_pages = num_pages;
- reqm->hdr.data_len = cpu_to_le32(len);
- reqm->hdr.data_off = cpu_to_le16(off);
-
- rc = start_request(osdc, req);
- return rc;
-}
-
int r_aborted; /* set if we cancel this request */
atomic_t r_ref;
- struct completion r_completion; /* on completion, or... */
- ceph_osdc_callback_t r_callback; /* ...async callback. */
- struct inode *r_inode; /* needed for async write */
- struct writeback_control *r_wbc;
+ struct completion r_completion, r_safe_completion;
+ ceph_osdc_callback_t r_callback, r_safe_callback;
+ struct ceph_eversion r_reassert_version;
+ struct list_head r_unsafe_item;
- int r_last_osd; /* pg osds */
+ struct inode *r_inode; /* for use by callbacks */
+ struct writeback_control *r_wbc; /* ditto */
+
+ int r_last_osd; /* pg osds */
struct ceph_entity_addr r_last_osd_addr;
unsigned long r_timeout_stamp;
struct ceph_snap_context *r_snapc; /* snap context for writes */
unsigned r_num_pages; /* size of page array (follows) */
struct page **r_pages; /* pages for data payload */
+ int r_own_pages; /* if true, i own page list */
};
struct ceph_osd_client {
struct ceph_snap_context *snapc,
int do_sync, u32 truncate_seq,
u64 truncate_size);
+
+static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
+{
+ atomic_inc(&req->r_ref);
+}
extern void ceph_osdc_put_request(struct ceph_osd_request *req);
+extern int ceph_osdc_start_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req);
+extern int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req);
+extern void ceph_osdc_abort_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req);
+
extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
u32 truncate_seq, u64 truncate_size,
struct page **pages, int nr_pages,
int flags, int do_sync);
-extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req,
- u64 len,
- int nr_pages);
#endif
If it's non-zero, we _may_ have cached
pages. */
u32 i_rdcache_revoking; /* RDCACHE gen to async invalidate, if any */
+ struct list_head i_unsafe_writes; /* uncommitted sync writes */
struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */
struct list_head i_snap_realm_item;
struct ceph_cap *new_cap);
extern void ceph_remove_cap(struct ceph_cap *cap);
extern int ceph_get_cap_mds(struct inode *inode);
+extern void ceph_get_more_cap_refs(struct ceph_inode_info *ci, int caps);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
struct ceph_snap_context *snapc);
struct nameidata *nd, int mode,
int locked_dir);
extern int ceph_release(struct inode *inode, struct file *filp);
-
+extern void ceph_release_page_vector(struct page **pages, int num_pages);
/* dir.c */
extern const struct file_operations ceph_dir_fops;