init_completion(&req->r_safe_completion);
INIT_LIST_HEAD(&req->r_unsafe_item);
req->r_flags = flags;
- req->r_last_osd = -1;
WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
return NULL;
}
+
+/*
+ * Track open sessions with osds.
+ */
+
+/*
+ * Prepare a freshly allocated ceph_osd for osd number @o: record the
+ * id, start with an empty request list and allocate + initialize the
+ * connection, addressed from the current osdmap's osd_addr[@o].
+ *
+ * Returns 0 on success or -ENOMEM if the connection cannot be
+ * allocated (osd->o_con is left NULL in that case).
+ */
+static int init_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd, int o)
+{
+	struct ceph_connection *con;
+
+	dout("init_osd %p osd%d\n", osd, o);
+
+	osd->o_osd = o;
+	INIT_LIST_HEAD(&osd->o_requests);
+
+	con = kzalloc(sizeof(*con), GFP_NOFS);
+	osd->o_con = con;	/* NULL when the allocation failed */
+	if (con == NULL)
+		return -ENOMEM;
+
+	ceph_con_init(osdc->client->msgr, con, &osdc->osdmap->osd_addr[o]);
+
+	/* the peer on this connection is osd number @o */
+	con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
+	con->peer_name.num = cpu_to_le32(o);
+	return 0;
+}
+
+/*
+ * Tear down an osd session: unlink @osd from the osdc->osds tree, drop
+ * our reference on its connection and free the ceph_osd itself.
+ *
+ * The callers visible here only invoke this once osd->o_requests has
+ * become empty and clear their own r_osd pointer right afterwards.
+ * @osd must not be touched after this returns.
+ */
+static void destroy_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+{
+	dout("destroy_osd %p\n", osd);
+	rb_erase(&osd->o_node, &osdc->osds);
+
+	/* o_con is NULL if init_osd() failed to allocate the connection */
+	if (osd->o_con)
+		osd->o_con->put(osd->o_con);
+
+	/*
+	 * The ceph_osd is kmalloc'ed when a request is first mapped to it;
+	 * without this it leaks, since the callers simply drop the pointer.
+	 */
+	kfree(osd);
+}
+
+/*
+ * Link @new into the osdc->osds rbtree, which is ordered by o_osd.
+ * Inserting an osd id that is already present is a bug.
+ */
+static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
+{
+	struct rb_node **link = &osdc->osds.rb_node;
+	struct rb_node *parent = NULL;
+
+	/* walk down to the empty slot where @new belongs */
+	while (*link) {
+		struct ceph_osd *cur;
+
+		parent = *link;
+		cur = rb_entry(parent, struct ceph_osd, o_node);
+
+		if (new->o_osd == cur->o_osd)
+			BUG();
+
+		if (new->o_osd < cur->o_osd)
+			link = &parent->rb_left;
+		else
+			link = &parent->rb_right;
+	}
+
+	rb_link_node(&new->o_node, parent, link);
+	rb_insert_color(&new->o_node, &osdc->osds);
+}
+
+/*
+ * Find the ceph_osd for osd number @o in the osdc->osds rbtree.
+ * Returns NULL if there is no entry for that osd.
+ */
+static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
+{
+	struct rb_node *node = osdc->osds.rb_node;
+
+	while (node != NULL) {
+		struct ceph_osd *cur = rb_entry(node, struct ceph_osd, o_node);
+
+		if (o == cur->o_osd)
+			return cur;
+
+		/* smaller ids live in the left subtree, larger in the right */
+		node = (o < cur->o_osd) ? node->rb_left : node->rb_right;
+	}
+	return NULL;
+}
+
+
+
+
/*
* Register request, assign tid. If this is the first request, set up
* the timeout event.
dout("__unregister_request %p tid %lld\n", req, req->r_tid);
rb_erase(&req->r_node, &osdc->requests);
osdc->num_requests--;
+
+ list_del_init(&req->r_osd_item);
+ if (list_empty(&req->r_osd->o_requests))
+ destroy_osd(osdc, req->r_osd);
+ req->r_osd = NULL;
+
ceph_osdc_put_request(req);
if (req->r_tid == osdc->timeout_tid) {
}
}
+
/*
* Pick an osd (the first 'up' osd in the pg), and put result in
* req->r_last_osd[_addr]. If none, set to -1.
{
struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
union ceph_pg pgid;
- int osd = -1;
+ int o = -1;
int err;
err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
if (err)
return err;
pgid.pg64 = le64_to_cpu(reqhead->layout.ol_pgid);
- osd = ceph_calc_pg_primary(osdc->osdmap, pgid);
- dout("map_osds tid %llu pgid %llx pool %d osd%d (was osd%d)\n",
- req->r_tid, pgid.pg64, pgid.pg.pool, osd, req->r_last_osd);
- if (req->r_last_osd == osd &&
- (osd < 0 || ceph_entity_addr_equal(&osdc->osdmap->osd_addr[osd],
- &req->r_last_osd_addr)))
+ o = ceph_calc_pg_primary(osdc->osdmap, pgid);
+
+ if (req->r_osd && req->r_osd->o_osd == o)
return 0;
- req->r_last_osd = osd;
- if (osd >= 0)
- req->r_last_osd_addr = osdc->osdmap->osd_addr[osd];
+
+ dout("map_osds tid %llu pgid %llx pool %d osd%d (was osd%d)\n",
+ req->r_tid, pgid.pg64, pgid.pg.pool, o,
+ req->r_osd ? req->r_osd->o_osd : -1);
+
+ /* XXX FIXME: try to reuse r_osd where possible */
+ if (req->r_osd) {
+ list_del_init(&req->r_osd_item);
+ if (list_empty(&req->r_osd->o_requests))
+ destroy_osd(osdc, req->r_osd);
+ req->r_osd = NULL;
+ }
+
+ req->r_osd = __lookup_osd(osdc, o);
+ if (!req->r_osd) {
+ req->r_osd = kmalloc(sizeof(*req->r_osd), GFP_NOFS);
+ if (!req->r_osd)
+ return -ENOMEM;
+ init_osd(osdc, req->r_osd, o);
+ __insert_osd(osdc, req->r_osd);
+ }
+ list_add(&req->r_osd_item, &req->r_osd->o_requests);
return 1;
}
struct ceph_osd_request *req)
{
struct ceph_osd_request_head *reqhead;
- int osd;
+ int err;
- map_osds(osdc, req);
- if (req->r_last_osd < 0) {
+ err = map_osds(osdc, req);
+ if (err == -ENOMEM)
+ return err;
+ if (req->r_osd == NULL) {
dout("send_request %p no up osds in pg\n", req);
ceph_monc_request_osdmap(&osdc->client->monc,
osdc->osdmap->epoch+1);
return 0;
}
- osd = req->r_last_osd;
dout("send_request %p tid %llu to osd%d flags %d\n",
- req, req->r_tid, osd, req->r_flags);
+ req, req->r_tid, req->r_osd->o_osd, req->r_flags);
reqhead = req->r_request->front.iov_base;
reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
reqhead->reassert_version = req->r_reassert_version;
- req->r_request->hdr.dst.name.type =
- cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
- req->r_request->hdr.dst.name.num = cpu_to_le32(osd);
- req->r_request->hdr.dst.addr = req->r_last_osd_addr;
-
req->r_timeout_stamp = jiffies+osdc->client->mount_args.osd_timeout*HZ;
ceph_msg_get(req->r_request); /* send consumes a ref */
- return ceph_msg_send(osdc->client->msgr, req->r_request,
- BASE_DELAY_INTERVAL);
+ ceph_con_send(req->r_osd->o_con, req->r_request);
+ return 0;
}
/*
*/
static void handle_timeout(struct work_struct *work)
{
+#if 0
struct ceph_osd_client *osdc =
container_of(work, struct ceph_osd_client, timeout_work.work);
struct ceph_osd_request *req;
mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
+#endif
}
/*
*
* If @who is specified, resubmit requests for that specific osd.
*
- * Caller should hold map_sem for read.
+ * Caller should hold map_sem for read and request_mutex.
*/
static void kick_requests(struct ceph_osd_client *osdc,
struct ceph_entity_addr *who)
{
+#if 0
struct ceph_osd_request *req;
struct rb_node *p;
int needmap = 0;
ceph_monc_request_osdmap(&osdc->client->monc,
osdc->osdmap->epoch+1);
}
+#endif
}
/*
#ifndef _FS_CEPH_OSD_CLIENT_H
#define _FS_CEPH_OSD_CLIENT_H
-#include <linux/radix-tree.h>
#include <linux/completion.h>
#include <linux/mempool.h>
+#include <linux/rbtree.h>
#include "types.h"
#include "osdmap.h"
struct ceph_snap_context;
struct ceph_osd_request;
struct ceph_osd_client;
+struct ceph_connection;
/*
* completion callback for async writepages
typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *,
struct ceph_msg *);
+/* a given osd we're communicating with; one entry per osd in ceph_osd_client.osds */
+struct ceph_osd {
+ int o_osd; /* osd number; key for the osds rbtree */
+ struct rb_node o_node; /* our node in ceph_osd_client.osds */
+ struct ceph_connection *o_con; /* connection to this osd, allocated by init_osd() */
+ struct list_head o_requests; /* requests mapped to this osd, linked via r_osd_item */
+};
+
/* an in-flight request */
struct ceph_osd_request {
u64 r_tid; /* unique for this client */
struct rb_node r_node;
+ struct list_head r_osd_item;
+ struct ceph_osd *r_osd;
struct ceph_msg *r_request, *r_reply;
int r_result;
char r_oid[40]; /* object name */
int r_oid_len;
- int r_last_osd; /* pg osds */
- struct ceph_entity_addr r_last_osd_addr;
unsigned long r_timeout_stamp;
bool r_resend; /* msg send failed, needs retry */
u64 last_requested_map;
struct mutex request_mutex;
+ struct rb_root osds; /* osds */
u64 timeout_tid; /* tid of timeout triggering rq */
u64 last_tid; /* tid of last request */
struct rb_root requests; /* pending requests */