- msgr: unidirectional option
- what about mon -> mds/osd messages?
- kclient msgr fixups
- - private field in ceph_connection
- - kill radix_tree; allocate connections in callers (osdc, monc, mdsc)
- - simplify ceph_ping (should be a single byte)
+ - allow msg to be revoked from connection (per-msg queue_seq, mutex)
+ - kill maybe_dup_msg
bugs
return;
}
-static void con_get(struct ceph_connection *con)
+static struct ceph_connection *con_get(struct ceph_connection *con)
{
struct ceph_mds_session *s = con->private;
- ceph_get_mds_session(s);
+ if (ceph_get_mds_session(s))
+ return con;
+ return NULL;
}
static void con_put(struct ceph_connection *con)
/*
* generic get/put
*/
-void ceph_con_get(struct ceph_connection *con)
+struct ceph_connection *ceph_con_get(struct ceph_connection *con)
{
dout("con_get %p nref = %d -> %d\n", con,
atomic_read(&con->nref), atomic_read(&con->nref) + 1);
- atomic_inc(&con->nref);
+ if (atomic_inc_not_zero(&con->nref))
+ return con;
+ return NULL;
}
void ceph_con_put(struct ceph_connection *con)
return;
}
- con->ops->get(con);
+ if (!con->ops->get(con)) {
+ dout("ceph_queue_con %p ref count 0\n", con);
+ return;
+ }
set_bit(QUEUED, &con->state);
if (test_bit(BUSY, &con->state) ||
* Ceph defines these callbacks for handling connection events.
*/
struct ceph_connection_operations {
- void (*get)(struct ceph_connection *);
+ struct ceph_connection *(*get)(struct ceph_connection *);
void (*put)(struct ceph_connection *);
/* handle an incoming message. */
extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
extern void ceph_con_keepalive(struct ceph_connection *con);
extern void ceph_con_close(struct ceph_connection *con);
-extern void ceph_con_get(struct ceph_connection *con);
+extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
extern void ceph_con_put(struct ceph_connection *con);
extern struct ceph_msg *ceph_msg_new(int type, int front_len,