struct ceph_socket *ceph_socket_create()
{
struct ceph_socket *s;
- int err;
-
+ int err = -ENOMEM;
+
s = kzalloc(sizeof(*s), GFP_NOFS);
- if (!s) {
- derr(10, "ENOMEM creating ceph_socket\n");
- return ERR_PTR(-ENOMEM);
- }
+ if (!s)
+ goto out;
err = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &s->sock);
- if (err) {
- derr(10, "sock_create_kern error %d\n", err);
- return ERR_PTR(err);
- }
+ if (err)
+ goto out_free;
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,25)
- kobject_init_and_add(&s->kobj, &ceph_socket_type,
- ceph_sockets_kobj,
- "socket %p", s);
+ err = kobject_init_and_add(&s->kobj, &ceph_socket_type,
+ ceph_sockets_kobj,
+ "socket %p", s);
+ if (err)
+ goto out_release;
#else
kobject_init(&s->kobj);
kobject_set_name(&s->kobj, "socket %p", s);
s->kobj.ktype = &ceph_socket_type;
#endif
return s;
+
+out_release:
+ sock_release(s->sock);
+out_free:
+ kfree(s);
+out:
+ return ERR_PTR(err);
}
void ceph_socket_get(struct ceph_socket *s)
BUG_ON(!r);
}
-void ceph_socket_put(struct ceph_socket *s, int die)
+void ceph_socket_put(struct ceph_socket *s)
{
dout(10, "socket_put %p\n", s);
if (!s)
return;
- if (die)
- ceph_cancel_sock_callbacks(s);
kobject_put(&s->kobj);
}
/* TBD check for fatal errors, retry if not fatal.. */
derr(1, "connect %u.%u.%u.%u:%u error: %d\n",
IPQUADPORT(*(struct sockaddr_in *)paddr), ret);
- ceph_socket_put(s, 1);
+ ceph_cancel_sock_callbacks(s);
+ ceph_socket_put(s);
con->s = 0;
}
return ret;
err:
- ceph_socket_put(s, 0);
+ ceph_socket_put(s);
return ret;
}
return ret;
err:
- ceph_socket_put(s, 0);
+ ceph_socket_put(s);
con->s = 0;
return ret;
}
/* existing? */
head = radix_tree_lookup(&msgr->con_tree, key);
if (head == NULL)
- goto out;
+ return NULL;
con = list_entry(head, struct ceph_connection, list_bucket);
- if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) {
- atomic_inc(&con->nref);
- goto out;
- }
+ if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0)
+ goto yes;
list_for_each(p, head) {
con = list_entry(p, struct ceph_connection, list_bucket);
- if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) {
- atomic_inc(&con->nref);
- goto out;
- }
+ if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0)
+ goto yes;
}
- con = NULL;
-out:
+ return NULL;
+
+yes:
+ atomic_inc(&con->nref);
+ dout(20, "get_connection %p nref = %d -> %d\n", con,
+ atomic_read(&con->nref) - 1, atomic_read(&con->nref));
return con;
}
*/
static void put_connection(struct ceph_connection *con)
{
- dout(20, "put_connection nref = %d\n", atomic_read(&con->nref));
+ dout(20, "put_connection %p nref = %d -> %d\n", con,
+ atomic_read(&con->nref), atomic_read(&con->nref) - 1);
if (atomic_dec_and_test(&con->nref)) {
- dout(20, "put_connection destroying %p\n", con);
+ dout(20, "put_connection %p destroying\n", con);
ceph_msg_put_list(&con->out_queue);
ceph_msg_put_list(&con->out_sent);
- ceph_socket_put(con->s, 1);
+ ceph_cancel_sock_callbacks(con->s);
+ ceph_socket_put(con->s);
kfree(con);
}
}
/* inc ref count */
atomic_inc(&con->nref);
+ dout(20, "add_connection %p %d -> %d\n", con,
+ atomic_read(&con->nref) - 1, atomic_read(&con->nref));
if (test_and_clear_bit(ACCEPTING, &con->state)) {
list_del_init(&con->list_bucket);
struct ceph_connection *con)
{
atomic_inc(&con->nref);
+ dout(20, "add_connection_accepting %p nref = %d -> %d\n", con,
+ atomic_read(&con->nref) - 1, atomic_read(&con->nref));
spin_lock(&msgr->con_lock);
list_add(&con->list_all, &msgr->con_all);
spin_unlock(&msgr->con_lock);
*/
void ceph_queue_write(struct ceph_connection *con)
{
- dout(40, "ceph_queue_write %p\n", con);
atomic_inc(&con->nref);
+ dout(40, "ceph_queue_write %p %d -> %d\n", con,
+ atomic_read(&con->nref) - 1, atomic_read(&con->nref));
if (!queue_work(send_wq, &con->swork.work)) {
dout(40, "ceph_queue_write %p - already queued\n", con);
put_connection(con);
void ceph_queue_delayed_write(struct ceph_connection *con)
{
- dout(40, "ceph_queue_delayed_write %p delay %lu\n", con, con->delay);
atomic_inc(&con->nref);
+ dout(40, "ceph_queue_delayed_write %p delay %lu %d -> %d\n", con,
+ con->delay,
+ atomic_read(&con->nref) - 1, atomic_read(&con->nref));
if (!queue_delayed_work(send_wq, &con->swork,
round_jiffies_relative(con->delay))) {
dout(40, "ceph_queue_delayed_write %p - already queued\n", con);
dout(40, "ceph_queue_read %p - already READABLE\n", con);
return;
}
- dout(40, "ceph_queue_read %p\n", con);
atomic_inc(&con->nref);
+ dout(40, "ceph_queue_read %p %d -> %d\n", con,
+ atomic_read(&con->nref) - 1, atomic_read(&con->nref));
queue_work(recv_wq, &con->rwork);
}
if (con->delay) {
dout(30, "fault tcp_close delay != 0\n");
- ceph_socket_put(con->s, 1);
+ ceph_cancel_sock_callbacks(con->s);
+ ceph_socket_put(con->s);
con->s = NULL;
set_bit(NEW, &con->state);
done:
dout(30, "try_write done on %p\n", con);
- ceph_socket_put(s, 0);
+ ceph_socket_put(s);
put_connection(con);
return;
}
case CEPH_MSGR_TAG_WAIT:
dout(10, "process_connect peer connecting WAIT\n");
set_bit(WAIT, &con->state);
- ceph_socket_put(con->s, 1);
+ ceph_cancel_sock_callbacks(con->s);
+ ceph_socket_put(con->s);
con->s = NULL;
break;
case CEPH_MSGR_TAG_READY:
out:
dout(20, "try_read done on %p\n", con);
- ceph_socket_put(s, 0);
+ ceph_socket_put(s);
put_connection(con);
return;
}
dout(2, "destroy %p\n", msgr);
/* stop listener */
- ceph_socket_put(msgr->listen_s, 1);
+ ceph_cancel_sock_callbacks(msgr->listen_s);
+ ceph_socket_put(msgr->listen_s);
cancel_work_sync(&msgr->awork);
/* kill off connections */
dout(10, "destroy removing connection %p\n", con);
set_bit(CLOSED, &con->state);
atomic_inc(&con->nref);
+ dout(40, " get %p %d -> %d\n", con,
+ atomic_read(&con->nref) - 1, atomic_read(&con->nref));
__remove_connection(msgr, con);
/* in case there's queued work... */
spin_unlock(&msgr->con_lock);
ceph_cancel_sock_callbacks(con->s);
- cancel_work_sync(&con->rwork);
- cancel_delayed_work_sync(&con->swork);
+ if (cancel_work_sync(&con->rwork))
+ put_connection(con);
+ if (cancel_delayed_work_sync(&con->swork))
+ put_connection(con);
put_connection(con);
dout(10, "destroy removed connection %p\n", con);