static void __wake_requests(struct ceph_mds_client *mdsc,
struct list_head *head);
+static const struct ceph_connection_operations mds_con_ops;
+
+
/*
* mds reply parsing
*/
return mdsc->sessions[mds];
}
-static void con_get(struct ceph_connection *con)
-{
- struct ceph_mds_session *s = con->private;
-
- ceph_get_mds_session(s);
-}
-
-static void con_put(struct ceph_connection *con)
-{
- struct ceph_mds_session *s = con->private;
-
- ceph_put_mds_session(s);
-}
-
-/*
- * if the client is unresponsive for long enough, the mds will kill
- * the session entirely.
- */
-static void peer_reset(void *p, struct ceph_connection *con)
-{
- struct ceph_mds_session *s = p;
-
- pr_err("ceph mds%d gave us the boot. IMPLEMENT RECONNECT.\n",
- s->s_mds);
-}
-
/*
* create+register a new session for given mds.
* called under mdsc->mutex.
struct ceph_mds_session *s;
s = kzalloc(sizeof(*s), GFP_NOFS);
+ s->s_mdsc = mdsc;
s->s_mds = mds;
s->s_state = CEPH_MDS_SESSION_NEW;
s->s_ttl = 0;
ceph_con_init(mdsc->client->msgr, &s->s_con,
ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
s->s_con.private = s;
- s->s_con.peer_reset = peer_reset;
- s->s_con.get = con_get;
- s->s_con.put = con_put;
+ s->s_con.ops = &mds_con_ops;
s->s_con.peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MDS);
s->s_con.peer_name.num = cpu_to_le32(mds);
* This preserves the logical ordering of replies, capabilities, etc., sent
* by the MDS as they are applied to our local cache.
*/
-void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
struct ceph_mds_request *req;
struct ceph_mds_reply_head *head = msg->front.iov_base;
/*
* handle mds notification that our request has been forwarded.
*/
-void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
- struct ceph_msg *msg)
+static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
struct ceph_mds_request *req;
u64 tid;
/*
* handle a mds session control message
*/
-void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
- struct ceph_msg *msg)
+static void handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
u32 op;
u64 seq;
di->lease_session = NULL;
}
-void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
struct super_block *sb = mdsc->client->sb;
struct inode *inode;
return;
}
+static void con_get(struct ceph_connection *con)
+{
+ struct ceph_mds_session *s = con->private;
+
+ ceph_get_mds_session(s);
+}
+
+static void con_put(struct ceph_connection *con)
+{
+ struct ceph_mds_session *s = con->private;
+
+ ceph_put_mds_session(s);
+}
+
+/*
+ * if the client is unresponsive for long enough, the mds will kill
+ * the session entirely.
+ */
+static void peer_reset(struct ceph_connection *con)
+{
+ struct ceph_mds_session *s = con->private;
+
+ pr_err("ceph mds%d gave us the boot. IMPLEMENT RECONNECT.\n",
+ s->s_mds);
+}
+
+static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
+{
+ struct ceph_mds_session *s = con->private;
+ struct ceph_mds_client *mdsc = s->s_mdsc;
+ int type = le16_to_cpu(msg->hdr.type);
+
+ switch (type) {
+ case CEPH_MSG_MDS_MAP:
+ ceph_mdsc_handle_map(mdsc, msg);
+ break;
+ case CEPH_MSG_CLIENT_SESSION:
+ handle_session(mdsc, msg);
+ break;
+ case CEPH_MSG_CLIENT_REPLY:
+ handle_reply(mdsc, msg);
+ break;
+ case CEPH_MSG_CLIENT_REQUEST_FORWARD:
+ handle_forward(mdsc, msg);
+ break;
+ case CEPH_MSG_CLIENT_CAPS:
+ ceph_handle_caps(mdsc, msg);
+ break;
+ case CEPH_MSG_CLIENT_SNAP:
+ ceph_handle_snap(mdsc, msg);
+ break;
+ case CEPH_MSG_CLIENT_LEASE:
+ handle_lease(mdsc, msg);
+ break;
+
+ default:
+ pr_err("ceph received unknown message type %d %s\n", type,
+ ceph_msg_type_name(type));
+ }
+ ceph_msg_put(msg);
+}
+
+static const struct ceph_connection_operations mds_con_ops = {
+ .get = con_get,
+ .put = con_put,
+ .dispatch = dispatch,
+ .peer_reset = peer_reset,
+ .alloc_msg = ceph_alloc_msg,
+ .alloc_middle = ceph_alloc_middle,
+};
+
+
+
/* eof */
};
struct ceph_mds_session {
+ struct ceph_mds_client *s_mdsc;
int s_mds;
int s_state;
unsigned long s_ttl; /* time until mds kills us */
extern void ceph_mdsc_sync(struct ceph_mds_client *mdsc);
-extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
- struct ceph_msg *msg);
-extern void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
- struct ceph_msg *msg);
-extern void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc,
- struct ceph_msg *msg);
-extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
- struct ceph_msg *msg);
-
-extern void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc,
- struct ceph_msg *msg);
-
extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc,
struct inode *inode,
struct dentry *dn, int mask);
extern void ceph_mdsc_put_request(struct ceph_mds_request *req);
extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
-extern void ceph_mdsc_handle_reset(struct ceph_mds_client *mdsc, int mds);
extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
int stop_on_nosnap);
struct dentry *dentry, char action,
u32 seq);
+extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg);
+
#endif
return NULL;
yes:
- con->get(con);
+ con->ops->get(con);
dout("get_connection %p nref = %d -> %d\n", con,
atomic_read(&con->nref) - 1, atomic_read(&con->nref));
return con;
/*
* generic get/put
*/
-static void get_connection(struct ceph_connection *con)
+void ceph_con_get(struct ceph_connection *con)
{
- dout("get_connection %p nref = %d -> %d\n", con,
+ dout("con_get %p nref = %d -> %d\n", con,
atomic_read(&con->nref), atomic_read(&con->nref) + 1);
atomic_inc(&con->nref);
}
-static void put_connection(struct ceph_connection *con)
+void ceph_con_put(struct ceph_connection *con)
{
- dout("put_connection %p nref = %d -> %d\n", con,
+ dout("con_put %p nref = %d -> %d\n", con,
atomic_read(&con->nref), atomic_read(&con->nref) - 1);
BUG_ON(atomic_read(&con->nref) == 0);
if (atomic_dec_and_test(&con->nref)) {
con->peer_addr = *addr;
con->private = NULL;
- con->get = get_connection;
- con->put = put_connection;
}
/*
dout("register_connection %p %d -> %d\n", con,
atomic_read(&con->nref), atomic_read(&con->nref) + 1);
- con->get(con);
+ con->ops->get(con);
list_add(&con->list_all, &msgr->con_all);
rc = radix_tree_insert(&msgr->con_tree, key, &con->list_bucket);
if (rc < 0) {
list_del(&con->list_all);
- con->put(con);
+ con->ops->put(con);
return rc;
}
}
list_del_init(&con->list_bucket);
}
}
- con->put(con);
+ con->ops->put(con);
}
static void remove_connection(struct ceph_messenger *msgr,
/* Tell ceph about it. */
pr_info("reset on %s%d\n", ENTITY_NAME(con->peer_name));
- con->peer_reset(con->private, con);
+ con->ops->peer_reset(con);
break;
case CEPH_MSGR_TAG_RETRY_SESSION:
if (!con->in_msg) {
dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
con->in_hdr.front_len, con->in_hdr.data_len);
- con->in_msg = con->msgr->alloc_msg(con->msgr->parent,
- &con->in_hdr);
+ con->in_msg = con->ops->alloc_msg(con->msgr->parent,
+ &con->in_hdr);
if (!con->in_msg) {
/* skip this message */
dout("alloc_msg returned NULL, skipping message\n");
while (middle_len > 0 && (!m->middle ||
m->middle->vec.iov_len < middle_len)) {
if (m->middle == NULL) {
- BUG_ON(!con->msgr->alloc_middle);
- ret = con->msgr->alloc_middle(con->msgr->parent, m);
+ BUG_ON(!con->ops->alloc_middle);
+ ret = con->ops->alloc_middle(con->msgr->parent, m);
if (ret < 0) {
dout("alloc_middle failed, skipping payload\n");
con->in_base_pos = -middle_len - data_len
/* find pages for data payload */
want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
ret = 0;
- BUG_ON(!con->msgr->prepare_pages);
- ret = con->msgr->prepare_pages(con->msgr->parent, m, want);
+ BUG_ON(!con->ops->prepare_pages);
+ ret = con->ops->prepare_pages(con->msgr->parent, m, want);
if (ret < 0) {
dout("%p prepare_pages failed, skipping payload\n", m);
con->in_base_pos = -data_len - sizeof(m->footer);
le32_to_cpu(con->in_msg->hdr.front_len),
le32_to_cpu(con->in_msg->hdr.data_len),
con->in_front_crc, con->in_middle_crc, con->in_data_crc);
- con->msgr->dispatch(con->msgr->parent, con->in_msg);
+ con->ops->dispatch(con, con->in_msg);
con->in_msg = NULL;
prepare_read_tag(con);
}
return;
}
- con->get(con);
+ con->ops->get(con);
set_bit(QUEUED, &con->state);
if (test_bit(BUSY, &con->state) ||
!queue_work(ceph_msgr_wq, &con->work.work)) {
dout("ceph_queue_con %p - already BUSY or queued\n", con);
- con->put(con);
+ con->ops->put(con);
} else {
dout("ceph_queue_con %p\n", con);
}
dout("con_work %p done\n", con);
out:
- con->put(con);
+ con->ops->put(con);
}
dout("fault queueing %p %d -> %d delay %lu\n", con,
atomic_read(&con->nref), atomic_read(&con->nref) + 1,
con->delay);
- con->get(con);
+ con->ops->get(con);
queue_delayed_work(ceph_msgr_wq, &con->work,
round_jiffies_relative(con->delay));
}
list_all);
dout("destroy removing connection %p\n", con);
set_bit(CLOSED, &con->state);
- con->get(con);
+ con->ops->get(con);
__remove_connection(msgr, con);
/* in case there's queued work. drop a reference if
* we successfully cancel work. */
spin_unlock(&msgr->con_lock);
if (cancel_delayed_work_sync(&con->work))
- con->put(con);
- con->put(con);
+ con->ops->put(con);
+ con->ops->put(con);
dout("destroy removed connection %p\n", con);
spin_lock(&msgr->con_lock);
}
spin_unlock(&msgr->con_lock);
if (con)
- con->put(con);
+ con->ops->put(con);
}
/*
spin_lock(&msgr->con_lock);
con = __get_connection(msgr, &msg->hdr.dst.addr);
if (con) {
- con->put(newcon);
+ con->ops->put(newcon);
dout("ceph_msg_send (lost race and) had connection "
"%p to peer %u.%u.%u.%u:%u\n", con,
IPQUADPORT(msg->hdr.dst.addr.ipaddr));
if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
ceph_queue_con(con);
- con->put(con);
+ con->ops->put(con);
dout("ceph_msg_send done\n");
return ret;
}
le32_to_cpu(msg->hdr.front_len),
le32_to_cpu(msg->hdr.middle_len),
le32_to_cpu(msg->hdr.data_len));
- dout("ceph_con_send %p %p seq %llu for %s%d on %p pgs %d\n",
- con, msg, le64_to_cpu(msg->hdr.seq),
- ENTITY_NAME(msg->hdr.dst.name), con, msg->nr_pages);
+ dout("ceph_con_send %p %s%d %p seq %llu pgs %d\n",
+ con, ENTITY_NAME(msg->hdr.dst.name), msg,
+ le64_to_cpu(msg->hdr.seq), msg->nr_pages);
list_add_tail(&msg->list_head, &con->out_queue);
spin_unlock(&con->out_queue_lock);
return ERR_PTR(-ENOMEM);
}
+/*
+ * Generic message allocator, for incoming messages.
+ */
+struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
+ struct ceph_msg_header *hdr)
+{
+ int type = le32_to_cpu(hdr->type);
+ int front_len = le32_to_cpu(hdr->front_len);
+ struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL);
+
+ if (!msg) {
+ pr_err("ceph: unable to allocate msg type %d len %d\n",
+ type, front_len);
+ return ERR_PTR(-ENOMEM);
+ }
+ return msg;
+}
+
+/*
+ * Allocate "middle" portion of a message, if it is needed and wasn't
+ * allocated by alloc_msg. This allows us to read a small fixed-size
+ * per-type header in the front and then gracefully fail (i.e.,
+ * propagate the error to the caller based on info in the front) when
+ * the middle is too large.
+ */
+int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
+{
+ int type = le32_to_cpu(msg->hdr.type);
+ int middle_len = le32_to_cpu(msg->hdr.middle_len);
+
+ dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
+ ceph_msg_type_name(type), middle_len);
+ BUG_ON(!middle_len);
+ BUG_ON(msg->middle);
+
+ msg->middle = ceph_buffer_new_alloc(middle_len, GFP_NOFS);
+ if (!msg->middle)
+ return -ENOMEM;
+ return 0;
+}
+
+
/*
* Free a generically kmalloc'd message.
*/
extern struct workqueue_struct *ceph_msgr_wq; /* receive work queue */
/*
- * Ceph defines these callbacks for handling events:
+ * Ceph defines these callbacks for handling connection events.
*/
-/* handle an incoming message. */
-typedef void (*ceph_msgr_dispatch_t) (void *p, struct ceph_msg *m);
-/* an incoming message has a data payload; tell me what pages I
- * should read the data into. */
-typedef int (*ceph_msgr_prepare_pages_t) (void *p, struct ceph_msg *m,
- int want);
-/* a remote host as terminated a message exchange session, and messages
- * we sent (or they tried to send us) may be lost. */
-typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_addr *addr,
- struct ceph_entity_name *pn);
-
-typedef struct ceph_msg * (*ceph_msgr_alloc_msg_t) (void *p,
- struct ceph_msg_header *hdr);
-typedef int (*ceph_msgr_alloc_middle_t) (void *p, struct ceph_msg *msg);
-
-typedef void (*ceph_con_get_t)(struct ceph_connection *);
-typedef void (*ceph_con_put_t)(struct ceph_connection *);
+struct ceph_connection_operations {
+ void (*get)(struct ceph_connection *);
+ void (*put)(struct ceph_connection *);
+
+ /* handle an incoming message. */
+ void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m);
+
+	/* a remote host has terminated a message exchange session, and messages
+ * we sent (or they tried to send us) may be lost. */
+ void (*peer_reset) (struct ceph_connection *con);
+
+ struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
+ struct ceph_msg_header *hdr);
+ int (*alloc_middle) (struct ceph_connection *con,
+ struct ceph_msg *msg);
+ /* an incoming message has a data payload; tell me what pages I
+ * should read the data into. */
+ int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m,
+ int want);
+};
static inline const char *ceph_name_type_str(int t)
{
struct ceph_messenger {
void *parent; /* normally struct ceph_client * */
- ceph_msgr_dispatch_t dispatch;
- ceph_msgr_prepare_pages_t prepare_pages;
- ceph_msgr_alloc_msg_t alloc_msg;
- ceph_msgr_alloc_middle_t alloc_middle;
struct ceph_entity_inst inst; /* my name+address */
*/
struct ceph_connection {
void *private;
- ceph_con_get_t get;
- ceph_con_put_t put;
atomic_t nref;
- ceph_msgr_peer_reset_t peer_reset;
+ const struct ceph_connection_operations *ops;
struct ceph_messenger *msgr;
struct socket *sock;
extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
extern void ceph_con_keepalive(struct ceph_connection *con);
extern void ceph_con_close(struct ceph_connection *con);
+extern void ceph_con_get(struct ceph_connection *con);
+extern void ceph_con_put(struct ceph_connection *con);
+
+extern struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
+ struct ceph_msg_header *hdr);
+extern int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg);
extern void ceph_messenger_mark_down(struct ceph_messenger *msgr,
struct ceph_entity_addr *addr);
* the cluster to try.
*/
+static const struct ceph_connection_operations mon_con_ops;
+
/*
* Decode a monmap blob (e.g., during mount).
*/
if (monc->con) {
dout("open_session closing mon%d\n", monc->last_mon);
- monc->con->put(monc->con);
+ monc->con->ops->put(monc->con);
}
get_random_bytes(&r, 1);
dout("open_session mon%d opened\n", monc->last_mon);
ceph_con_init(monc->client->msgr, monc->con,
&monc->monmap->mon_inst[monc->last_mon].addr);
+ monc->con->private = monc;
+ monc->con->ops = &mon_con_ops;
monc->con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MON);
monc->con->peer_name.num = cpu_to_le32(monc->last_mon);
return 0;
* The monitor responds with mount ack indicate mount success. The
* included client ticket allows the client to talk to MDSs and OSDs.
*/
-void ceph_handle_mount_ack(struct ceph_client *client, struct ceph_msg *msg)
+static void handle_mount_ack(struct ceph_mon_client *monc, struct ceph_msg *msg)
{
- struct ceph_mon_client *monc = &client->monc;
- struct ceph_monmap *monmap = NULL, *old = client->monc.monmap;
+ struct ceph_client *client = monc->client;
+ struct ceph_monmap *monmap = NULL, *old = monc->monmap;
void *p, *end;
s32 result;
u32 len;
mutex_unlock(&monc->req_mutex);
}
-void ceph_monc_handle_umount(struct ceph_mon_client *monc,
- struct ceph_msg *msg)
+static void handle_umount(struct ceph_mon_client *monc, struct ceph_msg *msg)
{
dout("handle_umount\n");
mutex_lock(&monc->req_mutex);
/*
* statfs
*/
-void ceph_monc_handle_statfs_reply(struct ceph_mon_client *monc,
- struct ceph_msg *msg)
+static void handle_statfs_reply(struct ceph_mon_client *monc,
+ struct ceph_msg *msg)
{
struct ceph_mon_statfs_request *req;
struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
cancel_timeout(&monc->umountreq);
cancel_delayed_work_sync(&monc->statfs_delayed_work);
if (monc->con) {
- monc->con->put(monc->con);
+ monc->con->ops->put(monc->con);
monc->con = NULL;
}
kfree(monc->monmap);
}
+
+
+/*
+ * handle incoming message
+ */
+static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
+{
+ struct ceph_mon_client *monc = con->private;
+ int type = le16_to_cpu(msg->hdr.type);
+
+ switch (type) {
+ case CEPH_MSG_CLIENT_MOUNT_ACK:
+ handle_mount_ack(monc, msg);
+ break;
+ case CEPH_MSG_STATFS_REPLY:
+ handle_statfs_reply(monc, msg);
+ break;
+ case CEPH_MSG_CLIENT_UNMOUNT:
+ handle_umount(monc, msg);
+ break;
+
+ case CEPH_MSG_MDS_MAP:
+ ceph_mdsc_handle_map(&monc->client->mdsc, msg);
+ break;
+
+ case CEPH_MSG_OSD_MAP:
+ ceph_osdc_handle_map(&monc->client->osdc, msg);
+ break;
+
+ default:
+ pr_err("ceph received unknown message type %d %s\n", type,
+ ceph_msg_type_name(type));
+ }
+ ceph_msg_put(msg);
+}
+
+static const struct ceph_connection_operations mon_con_ops = {
+ .get = ceph_con_get,
+ .put = ceph_con_put,
+ .dispatch = dispatch,
+ .alloc_msg = ceph_alloc_msg,
+ .alloc_middle = ceph_alloc_middle,
+};
extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have);
extern void ceph_monc_request_mount(struct ceph_mon_client *monc);
-extern void ceph_handle_mount_ack(struct ceph_client *client,
- struct ceph_msg *msg);
extern void ceph_monc_request_umount(struct ceph_mon_client *monc);
extern int ceph_monc_do_statfs(struct ceph_mon_client *monc,
struct ceph_statfs *buf);
-extern void ceph_monc_handle_statfs_reply(struct ceph_mon_client *monc,
- struct ceph_msg *msg);
-extern void ceph_monc_request_umount(struct ceph_mon_client *monc);
-extern void ceph_monc_handle_umount(struct ceph_mon_client *monc,
- struct ceph_msg *msg);
+
#endif
#include "messenger.h"
#include "decode.h"
+static const struct ceph_connection_operations osd_con_ops;
static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
* reply (because it does not get notification when clients, mds' leave
* the cluster).
*/
-static void peer_reset(void *p, struct ceph_connection *con)
+static void peer_reset(struct ceph_connection *con)
{
- struct ceph_osd *osd = p;
+ struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc = osd->o_osdc;
down_read(&osdc->map_sem);
ceph_con_init(osdc->client->msgr, osd->o_con,
&osdc->osdmap->osd_addr[o]);
osd->o_con->private = osd;
- osd->o_con->peer_reset = peer_reset;
+ osd->o_con->ops = &osd_con_ops;
osd->o_con->peer_name.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
osd->o_con->peer_name.num = cpu_to_le32(o);
return 0;
{
dout("destroy_osd %p\n", osd);
rb_erase(&osd->o_node, &osdc->osds);
- osd->o_con->put(osd->o_con);
+ osd->o_con->ops->put(osd->o_con);
}
static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
* handle osd op reply. either call the callback if it is specified,
* or do the completion to wake up the waiting thread.
*/
-void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
struct ceph_osd_reply_head *rhead = msg->front.iov_base;
struct ceph_osd_request *req;
mutex_init(&osdc->request_mutex);
osdc->timeout_tid = 0;
osdc->last_tid = 0;
+ osdc->osds = RB_ROOT;
osdc->requests = RB_ROOT;
osdc->num_requests = 0;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
return rc;
}
+/*
+ * handle incoming message
+ */
+static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
+{
+ struct ceph_osd *osd = con->private;
+ struct ceph_osd_client *osdc = osd->o_osdc;
+ int type = le16_to_cpu(msg->hdr.type);
+
+ switch (type) {
+ case CEPH_MSG_OSD_MAP:
+ ceph_osdc_handle_map(osdc, msg);
+ break;
+ case CEPH_MSG_OSD_OPREPLY:
+ handle_reply(osdc, msg);
+ break;
+
+ default:
+ pr_err("ceph received unknown message type %d %s\n", type,
+ ceph_msg_type_name(type));
+ }
+ ceph_msg_put(msg);
+}
+
+static const struct ceph_connection_operations osd_con_ops = {
+ .get = ceph_con_get,
+ .put = ceph_con_put,
+ .dispatch = dispatch,
+ .peer_reset = peer_reset,
+ .alloc_msg = ceph_alloc_msg,
+ .alloc_middle = ceph_alloc_middle,
+};
+
+
/*
* Ceph superblock operations
*
- * Handle the basics of mounting, unmounting. Also dispatch message
- * types to appropriate handlers and subsystems.
+ * Handle the basics of mounting, unmounting.
*/
-static void ceph_dispatch(void *p, struct ceph_msg *msg);
-static struct ceph_msg *ceph_alloc_msg(void *p, struct ceph_msg_header *hdr);
-static int ceph_alloc_middle(void *p, struct ceph_msg *msg);
/*
* find filename portion of a path (/foo/bar/baz -> baz)
goto out;
}
client->msgr->parent = client;
- client->msgr->dispatch = ceph_dispatch;
- client->msgr->prepare_pages = ceph_osdc_prepare_pages;
- client->msgr->alloc_msg = ceph_alloc_msg;
- client->msgr->alloc_middle = ceph_alloc_middle;
}
/* send mount request, and wait for mon, mds, and osd maps */
return err;
}
+#if 0
static struct ceph_msg_pool *get_pool(struct ceph_client *client, int type)
{
switch (type) {
* Allocate incoming message. Return message, or NULL to ignore message,
* or error to fault connection.
*/
-static struct ceph_msg *ceph_alloc_msg(void *p, struct ceph_msg_header *hdr)
+struct ceph_msg *my_ceph_alloc_msg(struct ceph_connection *con,
+ struct ceph_msg_header *hdr)
{
struct ceph_client *client = p;
int type = le32_to_cpu(hdr->type);
msg->front.iov_len = front_len;
return msg;
}
-
-/*
- * Allocate "middle" portion of a message, if it is needed and wasn't
- * allocated by alloc_msg. This allows us to read a small fixed-size
- * per-type header in the front and then gracefully fail (i.e.,
- * propagate the error to the caller based on info in the front) when
- * the middle is too large.
- */
-static int ceph_alloc_middle(void *p, struct ceph_msg *msg)
-{
- int type = le32_to_cpu(msg->hdr.type);
- int middle_len = le32_to_cpu(msg->hdr.middle_len);
-
- dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
- ceph_msg_type_name(type), middle_len);
- BUG_ON(!middle_len);
- BUG_ON(msg->middle);
-
- msg->middle = ceph_buffer_new_alloc(middle_len, GFP_NOFS);
- if (!msg->middle)
- return -ENOMEM;
- return 0;
-}
-
-
-/*
- * Process an incoming message.
- *
- * This should be relatively fast and must not do any work that waits
- * on other messages to be received.
- */
-void ceph_dispatch(void *p, struct ceph_msg *msg)
-{
- struct ceph_client *client = p;
- int type = le16_to_cpu(msg->hdr.type);
-
- switch (type) {
- case CEPH_MSG_CLIENT_MOUNT_ACK:
- ceph_handle_mount_ack(client, msg);
- break;
-
- /* mon client */
- case CEPH_MSG_STATFS_REPLY:
- ceph_monc_handle_statfs_reply(&client->monc, msg);
- break;
- case CEPH_MSG_CLIENT_UNMOUNT:
- ceph_monc_handle_umount(&client->monc, msg);
- break;
-
- /* mds client */
- case CEPH_MSG_MDS_MAP:
- ceph_mdsc_handle_map(&client->mdsc, msg);
- break;
- case CEPH_MSG_CLIENT_SESSION:
- ceph_mdsc_handle_session(&client->mdsc, msg);
- break;
- case CEPH_MSG_CLIENT_REPLY:
- ceph_mdsc_handle_reply(&client->mdsc, msg);
- break;
- case CEPH_MSG_CLIENT_REQUEST_FORWARD:
- ceph_mdsc_handle_forward(&client->mdsc, msg);
- break;
- case CEPH_MSG_CLIENT_CAPS:
- ceph_handle_caps(&client->mdsc, msg);
- break;
- case CEPH_MSG_CLIENT_SNAP:
- ceph_handle_snap(&client->mdsc, msg);
- break;
- case CEPH_MSG_CLIENT_LEASE:
- ceph_mdsc_handle_lease(&client->mdsc, msg);
- break;
-
- /* osd client */
- case CEPH_MSG_OSD_MAP:
- ceph_osdc_handle_map(&client->osdc, msg);
- break;
- case CEPH_MSG_OSD_OPREPLY:
- ceph_osdc_handle_reply(&client->osdc, msg);
- break;
-
- default:
- pr_err("ceph received unknown message type %d %s\n", type,
- ceph_msg_type_name(type));
- }
-
- ceph_msg_put(msg);
-}
-
+#endif
static int ceph_set_super(struct super_block *s, void *data)
{
put_unaligned_le64(val, &fsid->fsid[0]);
}
+/*
+extern int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg);
+extern struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
+ struct ceph_msg_header *hdr);
+*/
+
/* inode.c */
extern const struct inode_operations ceph_file_iops;