}
-/*
- * initialize a new connection
- */
-void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con,
- struct ceph_entity_addr *addr)
-{
- con->msgr = msgr;
- atomic_set(&con->nref, 1);
- INIT_LIST_HEAD(&con->list_all);
- INIT_LIST_HEAD(&con->list_bucket);
- spin_lock_init(&con->out_queue_lock);
- INIT_LIST_HEAD(&con->out_queue);
- INIT_LIST_HEAD(&con->out_sent);
- INIT_DELAYED_WORK(&con->work, con_work);
- con->peer_addr = *addr;
-}
-
/*
* The con_tree radix_tree has an unsigned long key and void * value.
* Since ceph_entity_addr is bigger than that, we use a trivial hash
return NULL;
yes:
- atomic_inc(&con->nref);
+ con->get(con);
dout("get_connection %p nref = %d -> %d\n", con,
atomic_read(&con->nref) - 1, atomic_read(&con->nref));
return con;
}
/*
- * drop a reference
+ * clean up connection state
+ */
+static void ceph_con_destroy(struct ceph_connection *con)
+{
+ dout("con_destroy %p destroying\n", con);
+ ceph_msg_put_list(&con->out_queue);
+ ceph_msg_put_list(&con->out_sent);
+ set_bit(CLOSED, &con->state);
+ con_close_socket(con); /* silently ignore errors */
+}
+
+/*
+ * generic get/put
*/
+static void get_connection(struct ceph_connection *con)
+{
+ dout("get_connection %p nref = %d -> %d\n", con,
+ atomic_read(&con->nref), atomic_read(&con->nref) + 1);
+ atomic_inc(&con->nref);
+}
+
static void put_connection(struct ceph_connection *con)
{
dout("put_connection %p nref = %d -> %d\n", con,
atomic_read(&con->nref), atomic_read(&con->nref) - 1);
BUG_ON(atomic_read(&con->nref) == 0);
if (atomic_dec_and_test(&con->nref)) {
- dout("put_connection %p destroying\n", con);
- ceph_msg_put_list(&con->out_queue);
- ceph_msg_put_list(&con->out_sent);
- set_bit(CLOSED, &con->state);
- con_close_socket(con); /* silently ignore possible errors */
+ ceph_con_destroy(con);
kfree(con);
}
}
+/*
+ * initialize a new connection.
+ *
+ * NOTE: assumes struct is initially zeroed!
+ */
+void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con,
+ struct ceph_entity_addr *addr)
+{
+ atomic_set(&con->nref, 1);
+ con->msgr = msgr;
+ INIT_LIST_HEAD(&con->list_all);
+ INIT_LIST_HEAD(&con->list_bucket);
+ spin_lock_init(&con->out_queue_lock);
+ INIT_LIST_HEAD(&con->out_queue);
+ INIT_LIST_HEAD(&con->out_sent);
+ INIT_DELAYED_WORK(&con->work, con_work);
+ con->peer_addr = *addr;
+
+ con->private = NULL;
+ con->get = get_connection;
+ con->put = put_connection;
+}
+
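For context, a sketch of what the new get/put hooks enable (hypothetical: "my_session" and its helpers are not from the ceph tree, and the sketch assumes it lives in messenger.c so it can reuse the static ceph_con_destroy() added above). A caller can embed a ceph_connection in a larger object and, after ceph_con_init() installs the defaults, point con->private and con->get/con->put at that object's own reference count:

/* Illustrative only -- hypothetical embedding structure and helpers. */
struct my_session {
	struct ceph_connection con;
	atomic_t ref;
};

static void my_session_con_get(struct ceph_connection *con)
{
	struct my_session *s = container_of(con, struct my_session, con);

	atomic_inc(&s->ref);
}

static void my_session_con_put(struct ceph_connection *con)
{
	struct my_session *s = container_of(con, struct my_session, con);

	if (atomic_dec_and_test(&s->ref)) {
		ceph_con_destroy(&s->con);	/* release queued msgs, socket */
		kfree(s);
	}
}

static void my_session_con_init(struct ceph_messenger *msgr,
				struct my_session *s,
				struct ceph_entity_addr *addr)
{
	atomic_set(&s->ref, 1);
	ceph_con_init(msgr, &s->con, addr);
	/* replace the defaults installed by ceph_con_init() */
	s->con.private = s;
	s->con.get = my_session_con_get;
	s->con.put = my_session_con_put;
}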
/*
* add a connection to the con_tree.
*
dout("register_connection %p %d -> %d\n", con,
atomic_read(&con->nref), atomic_read(&con->nref) + 1);
- atomic_inc(&con->nref);
+ con->get(con);
list_add(&con->list_all, &msgr->con_all);
rc = radix_tree_insert(&msgr->con_tree, key, &con->list_bucket);
if (rc < 0) {
list_del(&con->list_all);
- put_connection(con);
+ con->put(con);
return rc;
}
}
list_del_init(&con->list_bucket);
}
}
- put_connection(con);
+ con->put(con);
}
static void remove_connection(struct ceph_messenger *msgr,
return;
}
- atomic_inc(&con->nref);
- dout("ceph_queue_con %p %d -> %d\n", con,
- atomic_read(&con->nref) - 1, atomic_read(&con->nref));
+ con->get(con);
set_bit(QUEUED, &con->state);
if (test_bit(BUSY, &con->state) ||
!queue_work(ceph_msgr_wq, &con->work.work)) {
dout("ceph_queue_con %p - already BUSY or queued\n", con);
- put_connection(con);
+ con->put(con);
+ } else {
+ dout("ceph_queue_con %p\n", con);
}
}
dout("con_work %p done\n", con);
out:
- put_connection(con);
+ con->put(con);
}
dout("fault queueing %p %d -> %d delay %lu\n", con,
atomic_read(&con->nref), atomic_read(&con->nref) + 1,
con->delay);
- atomic_inc(&con->nref);
+ con->get(con);
queue_delayed_work(ceph_msgr_wq, &con->work,
round_jiffies_relative(con->delay));
}
if (myaddr) {
msgr->inst.addr = *myaddr;
} else {
- dout("create my ip not specified, binding to INADDR_ANY\n");
+ dout("create ip not specified, initially INADDR_ANY\n");
msgr->inst.addr.ipaddr.sin_addr.s_addr = htonl(INADDR_ANY);
msgr->inst.addr.ipaddr.sin_port = htons(0); /* any port */
}
list_all);
dout("destroy removing connection %p\n", con);
set_bit(CLOSED, &con->state);
- atomic_inc(&con->nref);
- dout(" get %p %d -> %d\n", con,
- atomic_read(&con->nref) - 1, atomic_read(&con->nref));
+ con->get(con);
__remove_connection(msgr, con);
/* in case there's queued work. drop a reference if
* we successfully cancel work. */
spin_unlock(&msgr->con_lock);
if (cancel_delayed_work_sync(&con->work))
- put_connection(con);
- put_connection(con);
+ con->put(con);
+ con->put(con);
dout("destroy removed connection %p\n", con);
spin_lock(&msgr->con_lock);
/*
+ * close a connection: mark it CLOSED so any queued work on it bails out
+ */
+void ceph_con_close(struct ceph_connection *con)
+{
+ dout("close %p peer %u.%u.%u.%u:%u\n", con,
+ IPQUADPORT(con->peer_addr.ipaddr));
+ set_bit(CLOSED, &con->state); /* in case there's queued work */
+}
+
+/*
* mark a peer down. drop any open connections.
*/
void ceph_messenger_mark_down(struct ceph_messenger *msgr,
struct ceph_entity_addr *addr)
{
}
spin_unlock(&msgr->con_lock);
if (con)
- put_connection(con);
+ con->put(con);
}
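Also hypothetical (not part of the patch): a caller that still holds its own reference could combine the new ceph_con_close() with the put hook to tear a connection down; the helper name below is made up.

static void my_drop_connection(struct ceph_connection *con)
{
	ceph_con_close(con);	/* flag CLOSED so any queued con_work bails out */
	con->put(con);		/* drop the caller's reference */
}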
/*
* A single ceph_msg can't be queued for send twice, unless it's
* already been delivered (i.e. we have the only remaining reference),
spin_lock(&msgr->con_lock);
con = __get_connection(msgr, &msg->hdr.dst.addr);
if (con) {
- put_connection(newcon);
+ newcon->put(newcon);
dout("ceph_msg_send (lost race and) had connection "
"%p to peer %u.%u.%u.%u:%u\n", con,
IPQUADPORT(msg->hdr.dst.addr.ipaddr));
if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
ceph_queue_con(con);
- put_connection(con);
+ con->put(con);
dout("ceph_msg_send done\n");
return ret;
}
/* set source */
msg->hdr.src = con->msgr->inst;
msg->hdr.orig_src = con->msgr->inst;
+ msg->hdr.dst.addr = con->peer_addr;
+ msg->hdr.dst.name = con->peer_name;
/* queue */
spin_lock(&con->out_queue_lock);