/*
* Track open sessions with osds.
*/
-static int open_osd_session(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
{
- int o = osd->o_osd;
+ struct ceph_osd *osd;
- osd->o_con = kzalloc(sizeof(*osd->o_con), GFP_NOFS);
- if (!osd->o_con)
- return -ENOMEM;
- ceph_con_init(osdc->client->msgr, osd->o_con);
- osd->o_con->private = osd;
- osd->o_con->ops = &osd_con_ops;
- osd->o_con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
- osd->o_con->peer_name.num = cpu_to_le32(o);
- ceph_con_open(osd->o_con, &osdc->osdmap->osd_addr[o]);
- return 0;
-}
+ osd = kzalloc(sizeof(*osd), GFP_NOFS);
+ if (!osd)
+ return NULL;
-static void init_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd, int o)
-{
- dout("init_osd %p osd%d\n", osd, o);
+ atomic_set(&osd->o_ref, 1);
osd->o_osdc = osdc;
- osd->o_osd = o;
INIT_LIST_HEAD(&osd->o_requests);
- osd->o_con = NULL;
+
+ ceph_con_init(osdc->client->msgr, &osd->o_con);
+ osd->o_con.private = osd;
+ osd->o_con.ops = &osd_con_ops;
+ osd->o_con.peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
+ return osd;
+}
+
+static struct ceph_osd *get_osd(struct ceph_osd *osd)
+{
+ if (atomic_inc_not_zero(&osd->o_ref)) {
+ dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
+ atomic_read(&osd->o_ref));
+ return osd;
+ } else {
+ dout("get_osd %p FAIL\n", osd);
+ return NULL;
+ }
+}
+
+static void put_osd(struct ceph_osd *osd)
+{
+ dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
+ atomic_read(&osd->o_ref) - 1);
+ if (atomic_dec_and_test(&osd->o_ref)) {
+ ceph_con_shutdown(&osd->o_con);
+ kfree(osd);
+ }
}
-static void destroy_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+/*
+ * remove an osd from our map
+ */
+static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
- dout("destroy_osd %p\n", osd);
+ dout("remove_osd %p\n", osd);
BUG_ON(!list_empty(&osd->o_requests));
rb_erase(&osd->o_node, &osdc->osds);
- osd->o_con->private = NULL;
- ceph_con_close(osd->o_con);
- osd->o_con->ops->put(osd->o_con);
- osd->o_con = NULL;
+ ceph_con_close(&osd->o_con);
+ put_osd(osd);
}
+/*
+ * reset osd connect
+ */
static int reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
int ret = 0;
dout("reset_osd %p osd%d\n", osd, osd->o_osd);
if (list_empty(&osd->o_requests)) {
- destroy_osd(osdc, osd);
- kfree(osd);
+ remove_osd(osdc, osd);
} else {
- ceph_con_close(osd->o_con);
- osd->o_con->ops->put(osd->o_con);
- osd->o_con = NULL;
- ret = open_osd_session(osdc, osd);
+ ceph_con_close(&osd->o_con);
+ ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
}
return ret;
}
}
-
-
/*
* Register request, assign tid. If this is the first request, set up
* the timeout event.
osdc->num_requests--;
list_del_init(&req->r_osd_item);
- if (list_empty(&req->r_osd->o_requests)) {
- destroy_osd(osdc, req->r_osd);
- kfree(req->r_osd);
- }
+ if (list_empty(&req->r_osd->o_requests))
+ remove_osd(osdc, req->r_osd);
req->r_osd = NULL;
ceph_osdc_put_request(req);
list_del_init(&req->r_osd_item);
if (list_empty(&req->r_osd->o_requests)) {
/* try to re-use r_osd if possible */
- newosd = req->r_osd;
- destroy_osd(osdc, newosd);
+ newosd = get_osd(req->r_osd);
+ remove_osd(osdc, newosd);
}
req->r_osd = NULL;
}
newosd = NULL;
} else {
err = -ENOMEM;
- req->r_osd = kmalloc(sizeof(*req->r_osd), GFP_NOFS);
+ req->r_osd = create_osd(osdc);
if (!req->r_osd)
goto out;
}
- init_osd(osdc, req->r_osd, o);
+
+ dout("map_osds osd %p is osd%d\n", req->r_osd, o);
+ req->r_osd->o_osd = o;
+ req->r_osd->o_con.peer_name.num = cpu_to_le32(o);
__insert_osd(osdc, req->r_osd);
+
+ ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
}
if (req->r_osd)
err = 1; /* osd changed */
out:
- kfree(newosd);
+ if (newosd)
+ put_osd(newosd);
return err;
}
dout("send_request %p tid %llu to osd%d flags %d\n",
req, req->r_tid, req->r_osd->o_osd, req->r_flags);
- if (req->r_osd->o_con == NULL) {
- err = open_osd_session(osdc, req->r_osd);
- if (err < 0)
- return err;
- }
-
reqhead = req->r_request->front.iov_base;
reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
req->r_timeout_stamp = jiffies+osdc->client->mount_args.osd_timeout*HZ;
ceph_msg_get(req->r_request); /* send consumes a ref */
- ceph_con_send(req->r_osd->o_con, req->r_request);
+ ceph_con_send(&req->r_osd->o_con, req->r_request);
return 0;
}
dout(" tid %llu (at least) timed out on osd%d\n",
req->r_tid, osd->o_osd);
req->r_timeout_stamp = next_timeout;
- ceph_con_keepalive(osd->o_con);
+ ceph_con_keepalive(&osd->o_con);
}
if (osdc->timeout_tid)
n = rb_next(p);
if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
- !ceph_entity_addr_equal(&osd->o_con->peer_addr,
+ !ceph_entity_addr_equal(&osd->o_con.peer_addr,
ceph_osd_addr(osdc->osdmap,
osd->o_osd)))
reset_osd(osdc, osd);
ceph_msg_put(msg);
}
+/*
+ * Wrappers to refcount containing ceph_osd struct
+ */
+static struct ceph_connection *get_osd_con(struct ceph_connection *con)
+{
+ struct ceph_osd *osd = con->private;
+ if (get_osd(osd))
+ return con;
+ return NULL;
+}
+
+static void put_osd_con(struct ceph_connection *con)
+{
+ struct ceph_osd *osd = con->private;
+ put_osd(osd);
+}
+
const static struct ceph_connection_operations osd_con_ops = {
- .get = ceph_con_get,
- .put = ceph_con_put,
+ .get = get_osd_con,
+ .put = put_osd_con,
.dispatch = dispatch,
.peer_reset = osd_reset,
.alloc_msg = ceph_alloc_msg,