-#include <linux/slab.h>
-#include <linux/err.h>
#include <asm/uaccess.h>
+#include <linux/err.h>
+#include <linux/highmem.h>
#include <linux/mm.h>
-#include <linux/highmem.h> /* kmap, kunmap */
-#include <linux/pagemap.h> /* read_cache_pages */
+#include <linux/pagemap.h>
+#include <linux/slab.h>
#include "ceph_debug.h"
#include "crush/mapper.h"
#include "decode.h"
-struct ceph_readdesc {
- struct ceph_osd_client *osdc;
- struct ceph_file_layout *layout;
- loff_t off;
- loff_t len;
-};
+static void reschedule_timeout(struct ceph_osd_client *osdc);
/*
- * calculate the mapping of an extent onto an object, and fill out the
- * request accordingly. shorten extent as necessary if it hits an
+ * calculate the mapping of a file extent onto an object, and fill out the
+ * request accordingly. shorten extent as necessary if it crosses an
* object boundary.
*/
static void calc_layout(struct ceph_osd_client *osdc,
struct ceph_vino vino, struct ceph_file_layout *layout,
- __u64 off, __u64 *plen,
+ u64 off, u64 *plen,
struct ceph_osd_request *req)
{
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
- __u64 orig_len = *plen;
- __u64 objoff, objlen;
+ u64 orig_len = *plen;
+ u64 objoff, objlen; /* extent in object */
+	/* find the object, and the extent within it */
reqhead->oid.ino = cpu_to_le64(vino.ino);
reqhead->oid.snap = cpu_to_le64(vino.snap);
calc_file_object_mapping(layout, off, plen, &reqhead->oid,
&objoff, &objlen);
if (*plen < orig_len)
- dout(10, " skipping last %llu, writing %llu~%llu\n",
+ dout(10, " skipping last %llu, final file extent %llu~%llu\n",
orig_len - *plen, off, *plen);
reqhead->offset = cpu_to_le64(objoff);
reqhead->length = cpu_to_le64(objlen);
+ req->r_num_pages = calc_pages_for(off, *plen);
+	/* compute the placement group (pgid) for this object */
calc_object_layout(&reqhead->layout, &reqhead->oid, layout,
osdc->osdmap);
- req->r_num_pages = calc_pages_for(off, *plen);
- dout(10, "calc_layout %08llx.%08x %llu~%llu pgid %llx (%d pages)\n",
+ dout(10, "calc_layout %llx.%08x %llu~%llu pgid %llx (%d pages)\n",
le64_to_cpu(reqhead->oid.ino), le32_to_cpu(reqhead->oid.bno),
objoff, objlen, le64_to_cpu(reqhead->layout.ol_pgid),
req->r_num_pages);
/*
* requests
*/
-
static void get_request(struct ceph_osd_request *req)
{
atomic_inc(&req->r_ref);
}
}
-
-
+/*
+ * build osd request message only.
+ */
static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op,
- struct ceph_snap_context *snapc)
+ struct ceph_snap_context *snapc)
{
struct ceph_msg *req;
struct ceph_osd_request_head *head;
head->client_inc = cpu_to_le32(1); /* always, for now. */
head->flags = 0;
- /* snaps */
if (snapc) {
head->snap_seq = cpu_to_le64(snapc->seq);
head->num_snaps = cpu_to_le32(snapc->num_snaps);
memcpy(req->front.iov_base + sizeof(*head), snapc->snaps,
snapc->num_snaps*sizeof(u64));
- dout(10, "snapc seq %lld %d snaps\n", snapc->seq,
- snapc->num_snaps);
}
return req;
}
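+/*
+ * Illustrative diagram only: the message front, as filled in above,
+ * is a fixed head optionally followed by the snap list.
+ *
+ *	+-------------------------------+
+ *	| struct ceph_osd_request_head  |
+ *	+-------------------------------+
+ *	| u64 snaps[num_snaps]          |   (only if snapc != NULL)
+ *	+-------------------------------+
+ */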
+/*
+ * build new request AND message, calculate layout, and adjust file
+ * extent as needed.
+ */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
struct ceph_file_layout *layout,
struct ceph_vino vino,
- __u64 off, __u64 *plen, int op,
+ u64 off, u64 *plen, int op,
struct ceph_snap_context *snapc)
{
struct ceph_osd_request *req;
struct ceph_msg *msg;
int num_pages = calc_pages_for(off, *plen);
+ struct ceph_osd_request_head *head;
+ /* we may overallocate here, if our write extent is shortened below */
req = kzalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS);
if (req == NULL)
return ERR_PTR(-ENOMEM);
req->r_request = msg;
req->r_snapc = ceph_get_snap_context(snapc);
- /* calculate max write size */
+ /* calculate max write size, pgid */
calc_layout(osdc, vino, layout, off, plen, req);
+ head = msg->front.iov_base;
+ req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid);
+
atomic_set(&req->r_ref, 1);
+ init_completion(&req->r_completion);
return req;
}
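+/*
+ * calc_pages_for() is defined elsewhere in the tree; a minimal sketch
+ * of the behavior assumed above -- the number of page cache pages
+ * spanned by the byte extent off~len -- would be:
+ */
+static inline int calc_pages_for_sketch(u64 off, u64 len)
+{
+	return ((off + len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT) -
+		(off >> PAGE_CACHE_SHIFT);
+}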
-static void reschedule_timeout(struct ceph_osd_client *osdc)
-{
- int timeout = osdc->client->mount_args.osd_timeout;
- dout(10, "reschedule timeout (%d seconds)\n", timeout);
- schedule_delayed_work(&osdc->timeout_work,
- round_jiffies_relative(timeout*HZ));
-}
+/*
+ * register request, assign tid.
+ */
static int register_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
struct ceph_osd_request_head *head = req->r_request->front.iov_base;
int rc;
- rc = radix_tree_preload(GFP_NOFS);
- if (rc < 0) {
- derr(10, "ENOMEM in register_request\n");
- return rc;
- }
-
mutex_lock(&osdc->request_mutex);
req->r_tid = ++osdc->last_tid;
head->tid = cpu_to_le64(req->r_tid);
- req->r_flags = 0;
- req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid);
- req->r_reply = NULL;
- req->r_result = 0;
- init_completion(&req->r_completion);
dout(30, "register_request %p tid %lld\n", req, req->r_tid);
- get_request(req);
rc = radix_tree_insert(&osdc->request_tree, req->r_tid, (void *)req);
+ if (rc < 0)
+ goto out;
- if (osdc->nr_requests == 0)
+ get_request(req);
+ if (osdc->num_requests == 0)
reschedule_timeout(osdc);
- osdc->nr_requests++;
+ osdc->num_requests++;
+out:
mutex_unlock(&osdc->request_mutex);
- radix_tree_preload_end();
-
return rc;
}
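+/*
+ * Illustrative counterpart to register_request (the real lookups are
+ * done inline by handle_reply and friends): find a pending request by
+ * tid and take a reference before dropping the lock.
+ */
+static struct ceph_osd_request *lookup_request_sketch(
+	struct ceph_osd_client *osdc, u64 tid)
+{
+	struct ceph_osd_request *req;
+
+	mutex_lock(&osdc->request_mutex);
+	req = radix_tree_lookup(&osdc->request_tree, tid);
+	if (req)
+		get_request(req);
+	mutex_unlock(&osdc->request_mutex);
+	return req;
+}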
+/*
+ * called under osdc->request_mutex
+ */
static void __unregister_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
+ struct ceph_osd_request *req)
{
dout(30, "__unregister_request %p tid %lld\n", req, req->r_tid);
radix_tree_delete(&osdc->request_tree, req->r_tid);
- osdc->nr_requests--;
+ osdc->num_requests--;
cancel_delayed_work(&osdc->timeout_work);
- if (osdc->nr_requests)
+ if (osdc->num_requests)
reschedule_timeout(osdc);
ceph_osdc_put_request(req);
}
/*
- * pick an osd. the pg primary, namely.
+ * pick an osd: the first up osd in the pg, or -1 if none are up.
+ * caller should hold map_sem for read.
*/
static int pick_osd(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
int ruleno;
- int i;
- int pps; /* placement ps */
+ unsigned pps; /* placement ps */
int osds[10];
- int nr_osds;
+ int i, num;
ruleno = crush_find_rule(osdc->osdmap->crush, req->r_pgid.pg.pool,
req->r_pgid.pg.type, req->r_pgid.pg.size);
pps = ceph_stable_mod(req->r_pgid.pg.ps,
osdc->osdmap->pgp_num,
osdc->osdmap->pgp_num_mask);
- nr_osds = crush_do_rule(osdc->osdmap->crush, ruleno, pps, osds,
- req->r_pgid.pg.size, req->r_pgid.pg.preferred);
+ num = crush_do_rule(osdc->osdmap->crush, ruleno, pps, osds,
+ min_t(int, req->r_pgid.pg.size, ARRAY_SIZE(osds)),
+ req->r_pgid.pg.preferred);
- for (i = 0; i < nr_osds; i++)
+ for (i = 0; i < num; i++)
if (ceph_osd_is_up(osdc->osdmap, osds[i]))
return osds[i];
return -1;
* caller should hold map_sem (for read)
*/
static int send_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req,
- int osd)
+ struct ceph_osd_request *req)
{
struct ceph_osd_request_head *reqhead;
- int rc;
+ int osd;
- if (osd < 0)
- osd = pick_osd(osdc, req);
+ osd = pick_osd(osdc, req);
if (osd < 0) {
dout(10, "send_request %p no up osds in pg\n", req);
ceph_monc_request_osdmap(&osdc->client->monc,
reqhead = req->r_request->front.iov_base;
reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
+	reqhead->flags |= cpu_to_le32(req->r_flags);	/* e.g., RETRY */
req->r_request->hdr.dst.name.type =
cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
req->r_request->hdr.dst.name.num = cpu_to_le32(osd);
req->r_request->hdr.dst.addr = osdc->osdmap->osd_addr[osd];
+
req->r_last_osd = osd;
req->r_last_osd_addr = req->r_request->hdr.dst.addr;
req->r_last_stamp = jiffies;
ceph_msg_get(req->r_request); /* send consumes a ref */
- rc = ceph_msg_send(osdc->client->msgr, req->r_request,
- BASE_DELAY_INTERVAL);
-
- return rc;
+ return ceph_msg_send(osdc->client->msgr, req->r_request,
+ BASE_DELAY_INTERVAL);
}
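+/*
+ * For reference: ceph_stable_mod(), used by pick_osd above, is
+ * assumed to compute Ceph's "stable" modulo, which keeps most
+ * placements unchanged while pg counts grow between powers of two:
+ */
+static inline int ceph_stable_mod_sketch(int x, int b, int bmask)
+{
+	return ((x & bmask) < b) ? (x & bmask) : (x & (bmask >> 1));
+}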
/*
- * handle osd op reply
+ * handle osd op reply. either call the callback if it is specified,
+ * or do the completion to wake up the waiting thread.
*/
void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
return;
}
get_request(req);
-
if (req->r_reply == NULL) {
- dout(10, "handle_reply tid %llu saving reply\n", tid);
+		/*
+		 * no data payload, or r_reply would have been set by
+		 * prepare_pages.
+		 */
ceph_msg_get(msg);
req->r_reply = msg;
- } else if (req->r_reply == msg)
- dout(10, "handle_reply tid %llu already had this reply\n", tid);
- else {
- dout(10, "handle_reply tid %llu had OTHER reply?\n", tid);
+ } else if (req->r_reply == msg) {
+ /* r_reply was set by prepare_pages; now it's fully read. */
+ } else {
+ dout(10, "handle_reply tid %llu already had reply?\n", tid);
goto done;
}
- dout(10, "handle_reply tid %llu flags %d |= %d\n", tid, req->r_flags,
- rhead->flags);
- req->r_flags |= le32_to_cpu(rhead->flags);
+ dout(10, "handle_reply tid %llu flags %d\n", tid,
+ le32_to_cpu(rhead->flags));
__unregister_request(osdc, req);
mutex_unlock(&osdc->request_mutex);
/*
- * caller should hold read sem
+ * Resubmit osd requests whose osd or osd address has changed. Request
+ * a new osd map if osds are down, or we are otherwise unable to determine
+ * how to direct a request.
+ *
+ * If @who is specified, resubmit requests for that specific osd.
+ *
+ * Caller should hold map_sem for read.
*/
static void kick_requests(struct ceph_osd_client *osdc,
struct ceph_entity_addr *who)
{
- u64 next_tid = 0;
struct ceph_osd_request *req;
+ u64 next_tid = 0;
int got;
int osd;
int needmap = 0;
next_tid, 1);
if (got == 0)
break;
-
next_tid = req->r_tid + 1;
osd = pick_osd(osdc, req);
- if (osd < 0) {
- dout(20, "tid %llu maps to no osd\n",
- req->r_tid);
+ if (osd < 0 || osd >= osdc->osdmap->max_osd) {
+ dout(20, "tid %llu maps to no valid osd\n", req->r_tid);
needmap++; /* request a newer map */
req->r_last_osd = -1;
memset(&req->r_last_osd_addr, 0,
who))) {
dout(20, "kicking tid %llu osd%d\n", req->r_tid, osd);
get_request(req);
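+		/*
+		 * Drop request_mutex across the dup/send below; both may
+		 * block, and our reference keeps req alive meanwhile.
+		 */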
+ mutex_unlock(&osdc->request_mutex);
req->r_request = ceph_msg_maybe_dup(req->r_request);
if (!req->r_aborted) {
req->r_flags |= CEPH_OSD_OP_RETRY;
- send_request(osdc, req, osd);
+ send_request(osdc, req);
}
ceph_osdc_put_request(req);
+ mutex_lock(&osdc->request_mutex);
}
}
-
mutex_unlock(&osdc->request_mutex);
+
if (needmap) {
- dout(10, "%d requests pending on down osds, need new map\n",
- needmap);
+ dout(10, "%d requests for down osds, need new map\n", needmap);
ceph_monc_request_osdmap(&osdc->client->monc,
osdc->osdmap->epoch);
}
}
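+/*
+ * The tid walk above (and in handle_timeout below) uses the gang
+ * lookup idiom; a self-contained sketch, assuming only the radix
+ * tree API:
+ */
+static void for_each_request_sketch(struct ceph_osd_client *osdc,
+				    void (*fn)(struct ceph_osd_request *))
+{
+	struct ceph_osd_request *req;
+	u64 next_tid = 0;
+
+	mutex_lock(&osdc->request_mutex);
+	while (radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
+				      next_tid, 1) > 0) {
+		next_tid = req->r_tid + 1;
+		fn(req);
+	}
+	mutex_unlock(&osdc->request_mutex);
+}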
+/*
+ * Process updated osd map.
+ *
+ * The message contains any number of incremental and full maps.
+ */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
void *p, *end, *next;
__u32 nr_maps, maplen;
__u32 epoch;
- struct ceph_osdmap *newmap = NULL;
+ struct ceph_osdmap *newmap = NULL, *oldmap;
int err;
struct ceph_fsid fsid;
/* full maps */
ceph_decode_32_safe(&p, end, nr_maps, bad);
dout(30, " %d full maps\n", nr_maps);
- while (nr_maps > 1) {
+ while (nr_maps) {
ceph_decode_need(&p, end, 2*sizeof(__u32), bad);
ceph_decode_32(&p, epoch);
ceph_decode_32(&p, maplen);
ceph_decode_need(&p, end, maplen, bad);
- dout(5, "skipping non-latest full map %u len %d\n",
- epoch, maplen);
- p += maplen;
- nr_maps--;
- }
- if (nr_maps) {
- ceph_decode_need(&p, end, 2*sizeof(__u32), bad);
- ceph_decode_32(&p, epoch);
- ceph_decode_32(&p, maplen);
- ceph_decode_need(&p, end, maplen, bad);
- if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
+ if (nr_maps > 1) {
+ dout(5, "skipping non-latest full map %u len %d\n",
+ epoch, maplen);
+ } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
dout(10, "skipping full map %u len %d, "
"older than our %u\n", epoch, maplen,
osdc->osdmap->epoch);
- p += maplen;
} else {
dout(10, "taking full map %u len %d\n", epoch, maplen);
newmap = osdmap_decode(&p, p+maplen);
err = PTR_ERR(newmap);
goto bad;
}
- if (osdc->osdmap)
- osdmap_destroy(osdc->osdmap);
+ oldmap = osdc->osdmap;
osdc->osdmap = newmap;
+ if (oldmap)
+ osdmap_destroy(oldmap);
}
+ p += maplen;
+ nr_maps--;
}
done:
return;
}
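+/*
+ * The ceph_decode_* helpers above come from decode.h; they are
+ * assumed to bounds-check against `end`, copy out a little-endian
+ * value, and advance the cursor (the _safe variants jump to the
+ * given label on short input).  A sketch of the 32-bit case:
+ */
+static inline int decode_32_sketch(void **p, void *end, u32 *v)
+{
+	__le32 raw;
+
+	if (*p + sizeof(raw) > end)
+		return -ERANGE;		/* the "bad" path above */
+	memcpy(&raw, *p, sizeof(raw));
+	*p += sizeof(raw);
+	*v = le32_to_cpu(raw);
+	return 0;
+}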
-
+/*
+ * If we detect that a tcp connection to an osd resets, we need to
+ * resubmit all requests for that osd. That's because although we reliably
+ * deliver our requests, the osd doesn't not try as hard to deliver the
+ * reply (because it does not get notification when clients, mds' leave
+ * the cluster).
+ */
void ceph_osdc_handle_reset(struct ceph_osd_client *osdc,
struct ceph_entity_addr *addr)
{
/*
- * find pages for message payload to be read into.
+ * A read request prepares specific pages that data is to be read into.
+ * When a message is being read off the wire, we call prepare_pages to
+ * find those pages.
* 0 = success, -1 failure.
*/
int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want)
struct ceph_osd_client *osdc = &client->osdc;
struct ceph_osd_reply_head *rhead = m->front.iov_base;
struct ceph_osd_request *req;
- __u64 tid;
+ u64 tid;
int ret = -1;
int type = le16_to_cpu(m->hdr.type);
dout(10, "prepare_pages unknown tid %llu\n", tid);
goto out;
}
- dout(10, "prepare_pages tid %llu have %d pages, want %d\n",
+ dout(10, "prepare_pages tid %llu has %d pages, want %d\n",
tid, req->r_num_pages, want);
- if (likely(req->r_num_pages >= want &&
- req->r_reply == NULL)) {
+ if (likely(req->r_num_pages >= want && req->r_reply == NULL)) {
m->pages = req->r_pages;
m->nr_pages = req->r_num_pages;
ceph_msg_get(m);
return ret;
}
-
-static int start_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
+/*
+ * Register request, send initial attempt.
+ */
+static int start_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
{
int rc;
rc = register_request(osdc, req);
-
if (rc < 0)
return rc;
-
down_read(&osdc->map_sem);
- rc = send_request(osdc, req, -1);
+ rc = send_request(osdc, req);
up_read(&osdc->map_sem);
-
return rc;
}
/*
* synchronously do an osd request.
+ *
+ * If we are interrupted, take our pages away from any previously sent
+ * request message that may still be being written to the socket.
*/
-static int do_sync_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
+static int do_sync_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
{
struct ceph_osd_reply_head *replyhead;
__s32 rc;
int bytes;
rc = start_request(osdc, req); /* register+send request */
+ if (rc)
+ return rc;
- if (rc >= 0)
- rc = wait_for_completion_interruptible(&req->r_completion);
-
+ rc = wait_for_completion_interruptible(&req->r_completion);
if (rc < 0) {
struct ceph_msg *msg;
+
dout(0, "tid %llu err %d, revoking %p pages\n", req->r_tid,
rc, req->r_request);
/*
+ * we were interrupted.
+ *
* mark req aborted _before_ revoking pages, so that
* if a racing kick_request _does_ dup the page vec
* pointer, it will definitely then see the aborted
return bytes;
}
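+/*
+ * The abort/revoke ordering above, schematically (illustrative):
+ *
+ *	interrupted waiter                racing kick_requests
+ *	------------------                --------------------
+ *	req->r_aborted = 1;               msg = ceph_msg_maybe_dup(msg);
+ *	revoke pages from r_request;      if (!req->r_aborted)
+ *	                                          send_request(osdc, req);
+ *
+ * if the kick dups the page vector before we revoke it, the kick is
+ * still guaranteed to observe r_aborted afterwards and not send.
+ */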
+
+/*
+ * if one or more requests takes too long, a timeout expires.
+ *
+ * FIXME.
+ */
+static void reschedule_timeout(struct ceph_osd_client *osdc)
+{
+ int timeout = osdc->client->mount_args.osd_timeout;
+ dout(10, "reschedule timeout (%d seconds)\n", timeout);
+ schedule_delayed_work(&osdc->timeout_work,
+ round_jiffies_relative(timeout*HZ));
+}
+
static void handle_timeout(struct work_struct *work)
{
u64 next_tid = 0;
up_read(&osdc->map_sem);
}
+
+/*
+ * init, shutdown
+ */
void ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
dout(5, "init\n");
osdc->client = client;
-
osdc->osdmap = NULL;
init_rwsem(&osdc->map_sem);
init_completion(&osdc->map_waiters);
osdc->last_requested_map = 0;
-
mutex_init(&osdc->request_mutex);
osdc->last_tid = 0;
- osdc->nr_requests = 0;
INIT_RADIX_TREE(&osdc->request_tree, GFP_ATOMIC);
+ osdc->num_requests = 0;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
}
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
cancel_delayed_work_sync(&osdc->timeout_work);
-
if (osdc->osdmap) {
osdmap_destroy(osdc->osdmap);
osdc->osdmap = NULL;
/*
* synchronous read direct to user buffer.
*
- * if read spans object boundary, just do two separate reads. FIXME:
- * for a correct atomic read, we should take read locks on all
+ * if read spans object boundary, just do two separate reads.
+ *
+ * FIXME: for a correct atomic read, we should take read locks on all
* objects.
*/
int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino,
struct ceph_file_layout *layout,
- __u64 off, __u64 len,
+ u64 off, u64 len,
char __user *data)
{
struct ceph_osd_request *req;
int i, po, left, l;
- __s32 rc;
+ int rc;
int finalrc = 0;
dout(10, "sync_read on vino %llx.%llx at %llu~%llu\n", vino.ino,
while (left > 0) {
int bad;
l = min_t(int, left, PAGE_CACHE_SIZE-po);
- dout(20, "copy po %d left %d l %d page %d\n",
- po, left, l, i);
bad = copy_to_user(data,
page_address(req->r_pages[i]) + po,
l);
*/
int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino,
struct ceph_file_layout *layout,
- loff_t off, loff_t len,
+ u64 off, u64 len,
struct page *page)
{
struct ceph_osd_request *req;
- __s32 rc;
+ int rc;
dout(10, "readpage on ino %llx.%llx at %lld~%lld\n", vino.ino,
vino.snap, off, len);
-
- /* request msg */
- req = ceph_osdc_new_request(osdc, layout, vino, off, (__u64 *)&len,
+ req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
CEPH_OSD_OP_READ, NULL);
if (IS_ERR(req))
return PTR_ERR(req);
BUG_ON(len != PAGE_CACHE_SIZE);
- req->r_pages[0] = page;
+ req->r_pages[0] = page;
rc = do_sync_request(osdc, req);
ceph_osdc_put_request(req);
+
dout(10, "readpage result %d\n", rc);
if (rc == -ENOENT)
- rc = 0; /* object page dne; zero it */
-		rc = 0; /* object page dne; zero it */
+		rc = 0; /* object page doesn't exist; caller will zero it */
return rc;
}
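+/*
+ * A minimal hypothetical caller (the name and error handling are
+ * illustrative, not part of this file): read one page, zeroing
+ * whatever the osd did not return.
+ */
+static int example_readpage(struct ceph_osd_client *osdc,
+			    struct ceph_vino vino,
+			    struct ceph_file_layout *layout,
+			    struct page *page)
+{
+	u64 off = (u64)page->index << PAGE_CACHE_SHIFT;
+	int rc = ceph_osdc_readpage(osdc, vino, layout, off,
+				    PAGE_CACHE_SIZE, page);
+
+	if (rc < 0)
+		return rc;
+	if (rc < (int)PAGE_CACHE_SIZE) {
+		void *kaddr = kmap(page);	/* highmem-safe */
+		memset(kaddr + rc, 0, PAGE_CACHE_SIZE - rc);
+		kunmap(page);
+	}
+	return 0;
+}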
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct address_space *mapping,
struct ceph_vino vino, struct ceph_file_layout *layout,
- __u64 off, __u64 len,
+ u64 off, u64 len,
struct list_head *page_list, int num_pages)
{
struct ceph_osd_request *req;
out:
ceph_osdc_put_request(req);
dout(10, "readpages result %d\n", rc);
- if (rc < 0)
- dout(10, "hrm!\n");
return rc;
}
int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
struct ceph_file_layout *layout,
struct ceph_snap_context *snapc,
- __u64 off, __u64 len, const char __user *data)
+ u64 off, u64 len, const char __user *data)
{
struct ceph_msg *reqm;
struct ceph_osd_request_head *reqhead;
struct ceph_osd_request *req;
int i, po, l, left;
- __s32 rc;
+ int rc;
int finalrc = 0;
dout(10, "sync_write on ino %llx.%llx at %llu~%llu\n", vino.ino,
return PTR_ERR(req);
reqm = req->r_request;
reqhead = reqm->front.iov_base;
- reqhead->flags = cpu_to_le32(CEPH_OSD_OP_ACK | /* just ack for now... FIXME */
+ reqhead->flags = cpu_to_le32(CEPH_OSD_OP_ACK | /* ack for now, FIXME */
CEPH_OSD_OP_ORDERSNAP); /* get EOLDSNAPC if out of order */
dout(10, "sync_write %llu~%llu -> %d pages\n", off, len,
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
struct ceph_file_layout *layout,
struct ceph_snap_context *snapc,
- loff_t off, loff_t len,
+ u64 off, u64 len,
struct page **pages, int num_pages)
{
struct ceph_msg *reqm;
BUG_ON(vino.snap != CEPH_NOSNAP);
- /* request + msg */
- req = ceph_osdc_new_request(osdc, layout, vino, off, (__u64 *)&len,
+ req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
CEPH_OSD_OP_WRITE, snapc);
if (IS_ERR(req))
return PTR_ERR(req);
#ifndef _FS_CEPH_OSD_CLIENT_H
#define _FS_CEPH_OSD_CLIENT_H
-/* this will be equivalent to osdc/Objecter.h */
-
#include <linux/radix-tree.h>
#include <linux/completion.h>
struct ceph_msg;
struct ceph_snap_context;
+struct ceph_osd_request;
/*
- * pending request
+ * completion callback for async writepages
*/
-enum {
- REQUEST_ACK, /* write serialized */
- REQUEST_SAFE, /* write committed */
- REQUEST_DONE /* read/stat/whatever completed */
-};
-
-struct ceph_osd_request;
-
typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
+/* an in-flight request */
struct ceph_osd_request {
- __u64 r_tid;
- int r_aborted;
- int r_flags;
- struct ceph_snap_context *r_snapc;
- struct inode *r_inode;
- struct writeback_control *r_wbc;
+	u64 r_tid;              /* unique for this client */
struct ceph_msg *r_request;
- int r_last_osd; /* last osd we sent request to */
- struct ceph_entity_addr r_last_osd_addr;
- unsigned long r_last_stamp;
- union ceph_pg r_pgid;
struct ceph_msg *r_reply;
int r_result;
+ int r_flags; /* any additional flags for the osd */
+ int r_aborted; /* set if we cancel this request */
+
atomic_t r_ref;
- ceph_osdc_callback_t r_callback;
- struct completion r_completion; /* on ack or commit or read? */
- unsigned r_num_pages; /* size of page array (follows) */
- struct page *r_pages[0]; /* pages for data payload */
+ struct completion r_completion; /* on completion, or... */
+ ceph_osdc_callback_t r_callback; /* ...async callback. */
+ struct inode *r_inode; /* needed for async write */
+ struct writeback_control *r_wbc;
+
+ int r_last_osd; /* last osd we sent request to */
+ struct ceph_entity_addr r_last_osd_addr;
+ unsigned long r_last_stamp;
+
+ union ceph_pg r_pgid; /* placement group */
+ struct ceph_snap_context *r_snapc; /* snap context for writes */
+ unsigned r_num_pages; /* size of page array (follows) */
+ struct page *r_pages[0]; /* pages for data payload */
};
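+/*
+ * Reference counting sketch.  The real ceph_osdc_put_request lives in
+ * osd_client.c; exactly which resources it drops is an assumption
+ * here.  The request is freed when the last r_ref is put:
+ */
+static inline void put_request_sketch(struct ceph_osd_request *req)
+{
+	if (atomic_dec_and_test(&req->r_ref)) {
+		if (req->r_request)
+			ceph_msg_put(req->r_request);
+		if (req->r_reply)
+			ceph_msg_put(req->r_reply);
+		if (req->r_snapc)
+			ceph_put_snap_context(req->r_snapc);
+		kfree(req);
+	}
+}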
struct ceph_osd_client {
struct ceph_osdmap *osdmap; /* current map */
struct rw_semaphore map_sem;
struct completion map_waiters;
- __u64 last_requested_map;
+ u64 last_requested_map;
struct mutex request_mutex;
- __u64 last_tid; /* tid of last request */
+ u64 last_tid; /* tid of last request */
struct radix_tree_root request_tree; /* pending requests, by tid */
- int nr_requests;
+ int num_requests;
struct delayed_work timeout_work;
};
-static inline bool ceph_osdc_flag(struct ceph_osd_client *osdc, int flag)
-{
- return osdc->osdmap && (osdc->osdmap->flags & flag);
-}
-
extern void ceph_osdc_init(struct ceph_osd_client *osdc,
struct ceph_client *client);
extern void ceph_osdc_stop(struct ceph_osd_client *osdc);
struct ceph_msg *msg);
extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
struct ceph_msg *msg);
+
+/*
+ * incoming read messages use this to discover which pages to read
+ * the data payload into.
+ */
extern int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want);
extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
extern int ceph_osdc_readpage(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
- loff_t off, loff_t len,
+ u64 off, u64 len,
struct page *page);
extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct address_space *mapping,
struct ceph_vino vino,
struct ceph_file_layout *layout,
- __u64 off, __u64 len,
+ u64 off, u64 len,
struct list_head *page_list, int nr_pages);
extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
struct ceph_snap_context *sc,
- loff_t off, loff_t len,
+ u64 off, u64 len,
struct page **pagevec, int nr_pages);
extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
struct ceph_osd_request *req,
extern int ceph_osdc_sync_read(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
- __u64 off, __u64 len,
+ u64 off, u64 len,
char __user *data);
extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
struct ceph_snap_context *sc,
- __u64 off, __u64 len,
+ u64 off, u64 len,
const char __user *data);
#endif