#include <linux/random.h>
#include "client.h"
#include "super.h"
+#include "ktcp.h"
/* debug level; defined in include/ceph_fs.h */
int ceph_debug = 20;
+/*
+ * directory of filesystems mounted by this host
+ *
+ * key: fsid.major ^ fsid.minor
+ * value: struct ceph_client.fsid_item
+ */
+static spinlock_t ceph_client_spinlock = SPIN_LOCK_UNLOCKED;
+static int ceph_num_clients = 0;
+
+RADIX_TREE(ceph_clients, GFP_KERNEL);
+
+static void get_client_counter(void)
+{
+ spin_lock(&ceph_client_spinlock);
+ if (ceph_num_clients == 0) {
+ dout(1, "first client, setting up workqueues\n");
+ ceph_workqueue_init();
+ }
+ ceph_num_clients++;
+ spin_unlock(&ceph_client_spinlock);
+}
+
+static void put_client_counter(void)
+{
+ spin_lock(&ceph_client_spinlock);
+ ceph_num_clients--;
+ if (ceph_num_clients == 0) {
+ dout(1, "last client, shutting down workqueues\n");
+ ceph_workqueue_shutdown();
+ }
+ spin_unlock(&ceph_client_spinlock);
+}
+
void ceph_dispatch(struct ceph_client *client, struct ceph_msg *msg);
atomic_set(&cl->nref, 0);
init_waitqueue_head(&cl->mount_wq);
spin_lock_init(&cl->sb_lock);
+ get_client_counter();
/* messenger */
cl->msgr = ceph_messenger_create();
return cl;
fail:
+ put_client_counter();
kfree(cl);
return ERR_PTR(err);
}
/*
- * directory of filesystems mounted by this host
- *
- * key: fsid.major ^ fsid.minor
- * value: struct ceph_client.fsid_item
- */
-
-/* ignore all this until later
-RADIX_TREE(ceph_clients, GFP_KERNEL);
-
static struct ceph_client *get_client_fsid(struct ceph_fsid *fsid)
{
/* unmount */
/* ... */
+ put_client_counter();
kfree(cl);
}
}
{
struct sock *sk = sock->sk;
sk->sk_user_data = user_data;
- printk(KERN_INFO "Entered set_sock_callbacks\n");
+ dout(20, "set_sock_callbacks\n");
/* Install callbacks */
sk->sk_data_ready = ceph_data_ready;
ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
if (ret < 0) {
- printk(KERN_INFO "sock_create_kern error: %d\n", ret);
+ derr(1, "ceph_tcp_connect sock_create_kern error: %d\n", ret);
goto done;
}
if (ret == -EINPROGRESS) return 0;
if (ret < 0) {
/* TBD check for fatal errors, retry if not fatal.. */
- printk(KERN_INFO "kernel_connect error: %d\n", ret);
+ derr(1, "ceph_tcp_connect kernel_connect error: %d\n", ret);
sock_release(con->sock);
con->sock = NULL;
}
*/
static struct ceph_connection *get_connection(struct ceph_messenger *msgr, struct ceph_entity_addr *addr)
{
- struct ceph_connection *con;
+ struct ceph_connection *con = NULL;
struct list_head *head, *p;
unsigned long key = hash_addr(addr);
/* existing? */
spin_lock(&msgr->con_lock);
head = radix_tree_lookup(&msgr->con_open, key);
- if (head) {
+ if (head == NULL)
+ goto out;
+ if (list_empty(head)) {
+ con = list_entry(head, struct ceph_connection, list_bucket);
+ if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) {
+ atomic_inc(&con->nref);
+ goto out;
+ }
+ } else {
list_for_each(p, head) {
con = list_entry(p, struct ceph_connection, list_bucket);
if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) {
static void put_connection(struct ceph_connection *con)
{
if (atomic_dec_and_test(&con->nref)) {
+ dout(20, "put_connection destroying %p\n", con);
sock_release(con->sock);
kfree(con);
}
spin_lock(&msgr->con_lock);
head = radix_tree_lookup(&msgr->con_open, key);
if (head) {
+ dout(20, "add_connection %p in existing bucket %lu\n", con, key);
list_add(&con->list_bucket, head);
} else {
+ dout(20, "add_connection %p in new bucket %lu\n", con, key);
INIT_LIST_HEAD(&con->list_bucket); /* empty */
radix_tree_insert(&msgr->con_open, key, &con->list_bucket);
}
/* remove from con_open too */
if (list_empty(&con->list_bucket)) {
/* last one */
+ dout(20, "remove_connection %p and removing bucket %lu\n", con, key);
radix_tree_delete(&msgr->con_open, key);
} else {
+ dout(20, "remove_connection %p from bucket %lu\n", con, key);
list_del(&con->list_bucket);
}
}
con = new_connection(msgr);
if (IS_ERR(con))
return PTR_ERR(con);
- dout(5, "opening new connection to peer %x:%d\n",
+ dout(5, "ceph_msg_send opening new connection %p to peer %x:%d\n", con,
ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
con->peer_addr = msg->hdr.dst.addr;
add_connection(msgr, con);
} else {
- dout(5, "had connection to peer %x:%d\n",
+ dout(5, "ceph_msg_send had connection %p to peer %x:%d\n", con,
ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
}
/* initiate connect? */
if (test_bit(NEW, &con->state)) {
+ dout(5, "ceph_msg_send initiating connect on %p\n", con);
ret = ceph_tcp_connect(con);
if (ret < 0){
derr(1, "connection failure to peer %x:%d\n",
ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
remove_connection(msgr, con);
- kfree(con);
+ put_connection(con);
return(ret);
-
}
}
/* queue */
- dout(1, "queuing outgoing message for %s%d\n",
- ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num);
+ dout(1, "ceph_msg_send queuing outgoing message for %s%d on %p\n",
+ ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num, con);
ceph_msg_get(msg);
list_add(&msg->list_head, &con->out_queue);