kernel/export.c\
kernel/file.c\
kernel/inode.c\
- kernel/ktcp.c\
- kernel/ktcp.h\
kernel/mds_client.c\
kernel/mds_client.h\
kernel/mdsmap.c\
obj-$(CONFIG_CEPH_FS) += ceph.o
ceph-objs := super.o inode.o dir.o file.o addr.o export.o \
- ktcp.o messenger.o \
+ messenger.o \
mds_client.o mdsmap.o \
mon_client.o \
osd_client.o osdmap.o crush/crush.o crush/mapper.o \
+++ /dev/null
-#include <linux/socket.h>
-#include <linux/net.h>
-#include <net/tcp.h>
-#include <linux/string.h>
-#include "messenger.h"
-#include "ktcp.h"
-
-int ceph_debug_tcp;
-#define DOUT_VAR ceph_debug_tcp
-#define DOUT_PREFIX "tcp: "
-#include "super.h"
-
-struct workqueue_struct *con_wq;
-
-
-/*
- * socket callback functions
- */
-
-/* listen socket received a connect */
-static void ceph_accept_ready(struct sock *sk, int count_unused)
-{
- struct ceph_messenger *msgr = (struct ceph_messenger *)sk->sk_user_data;
-
- dout(30, "ceph_accept_ready messenger %p sk_state = %u\n",
- msgr, sk->sk_state);
- if (sk->sk_state == TCP_LISTEN)
- queue_work(con_wq, &msgr->awork);
-}
-
-/* Data available on socket or listen socket received a connect */
-static void ceph_data_ready(struct sock *sk, int count_unused)
-{
- struct ceph_connection *con =
- (struct ceph_connection *)sk->sk_user_data;
- if (sk->sk_state != TCP_CLOSE_WAIT) {
- dout(30, "ceph_data_ready on %p state = %lu, queuing rwork\n",
- con, con->state);
- ceph_queue_con(con);
- }
-}
-
-/* socket has bufferspace for writing */
-static void ceph_write_space(struct sock *sk)
-{
- struct ceph_connection *con =
- (struct ceph_connection *)sk->sk_user_data;
-
- dout(30, "ceph_write_space %p state = %lu\n", con, con->state);
-
- /* only queue to workqueue if a WRITE is pending */
- if (test_bit(WRITE_PENDING, &con->state)) {
- dout(30, "ceph_write_space %p queuing write work\n", con);
- ceph_queue_con(con);
- }
-
- /* Since we have our own write_space, Clear the SOCK_NOSPACE flag */
- clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
-}
-
-/* sockets state has change */
-static void ceph_state_change(struct sock *sk)
-{
- struct ceph_connection *con =
- (struct ceph_connection *)sk->sk_user_data;
-
- dout(30, "ceph_state_change %p state = %lu sk_state = %u\n",
- con, con->state, sk->sk_state);
-
- switch (sk->sk_state) {
- case TCP_CLOSE:
- dout(30, "ceph_state_change TCP_CLOSE\n");
- case TCP_CLOSE_WAIT:
- dout(30, "ceph_state_change TCP_CLOSE_WAIT\n");
- set_bit(SOCK_CLOSE, &con->state);
- if (test_bit(CONNECTING, &con->state))
- con->error_msg = "connection refused";
- else
- con->error_msg = "socket closed";
- ceph_queue_con(con);
- break;
- case TCP_ESTABLISHED:
- dout(30, "ceph_state_change TCP_ESTABLISHED\n");
- ceph_write_space(sk);
- break;
- }
-}
-
-/* make a listening socket active by setting up the data ready call back */
-static void listen_sock_callbacks(struct socket *sock,
- struct ceph_messenger *msgr)
-{
- struct sock *sk = sock->sk;
- sk->sk_user_data = (void *)msgr;
- sk->sk_data_ready = ceph_accept_ready;
-}
-
-/* make a socket active by setting up the call back functions */
-static void set_sock_callbacks(struct socket *sock,
- struct ceph_connection *con)
-{
- struct sock *sk = sock->sk;
- sk->sk_user_data = (void *)con;
- sk->sk_data_ready = ceph_data_ready;
- sk->sk_write_space = ceph_write_space;
- sk->sk_state_change = ceph_state_change;
-}
-
-/*
- * initiate connection to a remote socket.
- */
-struct socket *ceph_tcp_connect(struct ceph_connection *con)
-{
- int ret;
- struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
- struct socket *sock;
-
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
- if (ret)
- return ERR_PTR(ret);
-
- con->sock = sock;
- sock->sk->sk_allocation = GFP_NOFS;
-
- set_sock_callbacks(sock, con);
-
- ret = sock->ops->connect(sock, paddr,
- sizeof(struct sockaddr_in), O_NONBLOCK);
- if (ret == -EINPROGRESS) {
- dout(20, "connect EINPROGRESS sk_state = = %u\n",
- sock->sk->sk_state);
- ret = 0;
- }
- if (ret < 0) {
- /* TBD check for fatal errors, retry if not fatal.. */
- derr(1, "connect %u.%u.%u.%u:%u error: %d\n",
- IPQUADPORT(*(struct sockaddr_in *)paddr), ret);
- sock_release(sock);
- con->sock = 0;
- }
-
- if (ret < 0)
- return ERR_PTR(ret);
- return sock;
-}
-
-/*
- * setup listening socket
- */
-int ceph_tcp_listen(struct ceph_messenger *msgr)
-{
- int ret;
- int optval = 1;
- struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr;
- int nlen;
- struct socket *sock;
-
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
- if (ret)
- return ret;
-
- sock->sk->sk_allocation = GFP_NOFS;
-
- ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
- (char *)&optval, sizeof(optval));
- if (ret < 0) {
- derr(0, "Failed to set SO_REUSEADDR: %d\n", ret);
- goto err;
- }
-
- ret = sock->ops->bind(sock, (struct sockaddr *)myaddr,
- sizeof(*myaddr));
- if (ret < 0) {
- derr(0, "Failed to bind: %d\n", ret);
- goto err;
- }
-
- /* what port did we bind to? */
- nlen = sizeof(*myaddr);
- ret = sock->ops->getname(sock, (struct sockaddr *)myaddr, &nlen,
- 0);
- if (ret < 0) {
- derr(0, "failed to getsockname: %d\n", ret);
- goto err;
- }
- dout(10, "listen on port %d\n", ntohs(myaddr->sin_port));
-
- ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
- (char *)&optval, sizeof(optval));
- if (ret < 0) {
- derr(0, "Failed to set SO_KEEPALIVE: %d\n", ret);
- goto err;
- }
-
- /* TBD: probaby want to tune the backlog queue .. */
- ret = sock->ops->listen(sock, NUM_BACKUP);
- if (ret < 0) {
- derr(0, "kernel_listen error: %d\n", ret);
- goto err;
- }
-
- /* ok! */
- msgr->listen_sock = sock;
- listen_sock_callbacks(sock, msgr);
- return ret;
-
-err:
- sock_release(sock);
- return ret;
-}
-
-/*
- * accept a connection
- */
-int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con)
-{
- int ret;
- struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
- int len;
- struct socket *sock;
-
- ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
- if (ret)
- return ret;
- con->sock = sock;
-
- sock->sk->sk_allocation = GFP_NOFS;
-
- ret = lsock->ops->accept(lsock, sock, O_NONBLOCK);
- if (ret < 0) {
- derr(0, "accept error: %d\n", ret);
- goto err;
- }
-
- sock->ops = lsock->ops;
- sock->type = lsock->type;
- ret = sock->ops->getname(sock, paddr, &len, 2);
- if (ret < 0) {
- derr(0, "getname error: %d\n", ret);
- goto err;
- }
-
- /* setup callbacks */
- set_sock_callbacks(sock, con);
-
- return ret;
-
-err:
- sock->ops->shutdown(sock, SHUT_RDWR);
- sock_release(sock);
- return ret;
-}
-
-/*
- * receive a message this may return after partial send
- */
-int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
-{
- struct kvec iov = {buf, len};
- struct msghdr msg = {.msg_flags = 0};
- int rlen = 0; /* length read */
-
- msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
- rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
- return(rlen);
-}
-
-
-/*
- * Send a message this may return after partial send
- */
-int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
- size_t kvlen, size_t len, int more)
-{
- struct msghdr msg = {.msg_flags = 0};
- int rlen = 0;
-
- msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
- if (more)
- msg.msg_flags |= MSG_MORE;
- else
- msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */
-
- /*printk(KERN_DEBUG "before sendmsg %d\n", len);*/
- rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
- /*printk(KERN_DEBUG "after sendmsg %d\n", rlen);*/
- return(rlen);
-}
-
-/*
- * workqueue initialization
- */
-
-int ceph_workqueue_init(void)
-{
- int ret = 0;
-
- con_wq = create_workqueue("ceph-net");
- if (IS_ERR(con_wq)) {
- derr(0, "net worker failed to start: %d\n", ret);
- destroy_workqueue(con_wq);
- ret = PTR_ERR(con_wq);
- con_wq = 0;
- return ret;
- }
-
- return(ret);
-}
-
-/*
- * workqueue shutdown
- */
-void ceph_workqueue_shutdown(void)
-{
- destroy_workqueue(con_wq);
-}
+++ /dev/null
-#ifndef _FS_CEPH_TCP_H
-#define _FS_CEPH_TCP_H
-
-extern struct workqueue_struct *con_wq; /* receive work queue */
-
-/* prototype definitions */
-struct socket *ceph_tcp_connect(struct ceph_connection *);
-int ceph_tcp_listen(struct ceph_messenger *);
-int ceph_tcp_accept(struct socket *, struct ceph_connection *);
-int ceph_tcp_recvmsg(struct socket *, void *, size_t );
-int ceph_tcp_sendmsg(struct socket *, struct kvec *, size_t, size_t, int more);
-int ceph_workqueue_init(void);
-void ceph_workqueue_shutdown(void);
-
-/* Max number of outstanding connections in listener queueu */
-#define NUM_BACKUP 10
-#endif
#include "ceph_fs.h"
#include "messenger.h"
-#include "ktcp.h"
int ceph_debug_msgr;
#define DOUT_VAR ceph_debug_msgr
static void con_work(struct work_struct *);
static void try_accept(struct work_struct *);
+
/*
- * connections
+ * workqueue
*/
+struct workqueue_struct *ceph_msgr_wq;
-static void con_close_socket(struct ceph_connection *con)
+int ceph_msgr_init(void)
{
- dout(10, "con_close_socket on %p sock %p\n", con, con->sock);
- if (!con->sock)
- return;
- con->sock->ops->shutdown(con->sock, SHUT_RDWR);
- sock_release(con->sock);
- con->sock = 0;
+ ceph_msgr_wq = create_workqueue("ceph-msgr");
+ if (IS_ERR(ceph_msgr_wq)) {
+ int ret = PTR_ERR(ceph_msgr_wq);
+ derr(0, "failed to create workqueue: %d\n", ret);
+ ceph_msgr_wq = 0;
+ return ret;
+ }
+ return 0;
+}
+
+void ceph_msgr_exit(void)
+{
+ destroy_workqueue(ceph_msgr_wq);
+}
+
+
+/*
+ * socket callback functions
+ */
+
+/* listen socket received a connect */
+static void ceph_accept_ready(struct sock *sk, int count_unused)
+{
+ struct ceph_messenger *msgr = (struct ceph_messenger *)sk->sk_user_data;
+
+ dout(30, "ceph_accept_ready messenger %p sk_state = %u\n",
+ msgr, sk->sk_state);
+ if (sk->sk_state == TCP_LISTEN)
+ queue_work(ceph_msgr_wq, &msgr->awork);
+}
+
+/* Data available on socket or listen socket received a connect */
+static void ceph_data_ready(struct sock *sk, int count_unused)
+{
+ struct ceph_connection *con =
+ (struct ceph_connection *)sk->sk_user_data;
+ if (sk->sk_state != TCP_CLOSE_WAIT) {
+ dout(30, "ceph_data_ready on %p state = %lu, queuing rwork\n",
+ con, con->state);
+ ceph_queue_con(con);
+ }
+}
+
+/* socket has bufferspace for writing */
+static void ceph_write_space(struct sock *sk)
+{
+ struct ceph_connection *con =
+ (struct ceph_connection *)sk->sk_user_data;
+
+ dout(30, "ceph_write_space %p state = %lu\n", con, con->state);
+
+ /* only queue to workqueue if a WRITE is pending */
+ if (test_bit(WRITE_PENDING, &con->state)) {
+ dout(30, "ceph_write_space %p queuing write work\n", con);
+ ceph_queue_con(con);
+ }
+
+ /* Since we have our own write_space, Clear the SOCK_NOSPACE flag */
+ clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+}
+
+/* sockets state has change */
+static void ceph_state_change(struct sock *sk)
+{
+ struct ceph_connection *con =
+ (struct ceph_connection *)sk->sk_user_data;
+
+ dout(30, "ceph_state_change %p state = %lu sk_state = %u\n",
+ con, con->state, sk->sk_state);
+
+ switch (sk->sk_state) {
+ case TCP_CLOSE:
+ dout(30, "ceph_state_change TCP_CLOSE\n");
+ case TCP_CLOSE_WAIT:
+ dout(30, "ceph_state_change TCP_CLOSE_WAIT\n");
+ set_bit(SOCK_CLOSE, &con->state);
+ if (test_bit(CONNECTING, &con->state))
+ con->error_msg = "connection refused";
+ else
+ con->error_msg = "socket closed";
+ ceph_queue_con(con);
+ break;
+ case TCP_ESTABLISHED:
+ dout(30, "ceph_state_change TCP_ESTABLISHED\n");
+ ceph_write_space(sk);
+ break;
+ }
+}
+
+/* make a listening socket active by setting up the data ready call back */
+static void listen_sock_callbacks(struct socket *sock,
+ struct ceph_messenger *msgr)
+{
+ struct sock *sk = sock->sk;
+ sk->sk_user_data = (void *)msgr;
+ sk->sk_data_ready = ceph_accept_ready;
+}
+
+/* make a socket active by setting up the call back functions */
+static void set_sock_callbacks(struct socket *sock,
+ struct ceph_connection *con)
+{
+ struct sock *sk = sock->sk;
+ sk->sk_user_data = (void *)con;
+ sk->sk_data_ready = ceph_data_ready;
+ sk->sk_write_space = ceph_write_space;
+ sk->sk_state_change = ceph_state_change;
+}
+
+
+/*
+ * socket helpers
+ */
+
+/*
+ * initiate connection to a remote socket.
+ */
+struct socket *ceph_tcp_connect(struct ceph_connection *con)
+{
+ int ret;
+ struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
+ struct socket *sock;
+
+ ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
+ if (ret)
+ return ERR_PTR(ret);
+
+ con->sock = sock;
+ sock->sk->sk_allocation = GFP_NOFS;
+
+ set_sock_callbacks(sock, con);
+
+ ret = sock->ops->connect(sock, paddr,
+ sizeof(struct sockaddr_in), O_NONBLOCK);
+ if (ret == -EINPROGRESS) {
+ dout(20, "connect EINPROGRESS sk_state = = %u\n",
+ sock->sk->sk_state);
+ ret = 0;
+ }
+ if (ret < 0) {
+ /* TBD check for fatal errors, retry if not fatal.. */
+ derr(1, "connect %u.%u.%u.%u:%u error: %d\n",
+ IPQUADPORT(*(struct sockaddr_in *)paddr), ret);
+ sock_release(sock);
+ con->sock = 0;
+ }
+
+ if (ret < 0)
+ return ERR_PTR(ret);
+ return sock;
+}
+
+/*
+ * setup listening socket
+ */
+int ceph_tcp_listen(struct ceph_messenger *msgr)
+{
+ int ret;
+ int optval = 1;
+ struct sockaddr_in *myaddr = &msgr->inst.addr.ipaddr;
+ int nlen;
+ struct socket *sock;
+
+ ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
+ if (ret)
+ return ret;
+
+ sock->sk->sk_allocation = GFP_NOFS;
+
+ ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+ (char *)&optval, sizeof(optval));
+ if (ret < 0) {
+ derr(0, "Failed to set SO_REUSEADDR: %d\n", ret);
+ goto err;
+ }
+
+ ret = sock->ops->bind(sock, (struct sockaddr *)myaddr,
+ sizeof(*myaddr));
+ if (ret < 0) {
+ derr(0, "Failed to bind: %d\n", ret);
+ goto err;
+ }
+
+ /* what port did we bind to? */
+ nlen = sizeof(*myaddr);
+ ret = sock->ops->getname(sock, (struct sockaddr *)myaddr, &nlen,
+ 0);
+ if (ret < 0) {
+ derr(0, "failed to getsockname: %d\n", ret);
+ goto err;
+ }
+ dout(10, "listen on port %d\n", ntohs(myaddr->sin_port));
+
+ ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
+ (char *)&optval, sizeof(optval));
+ if (ret < 0) {
+ derr(0, "Failed to set SO_KEEPALIVE: %d\n", ret);
+ goto err;
+ }
+
+ /* TBD: probaby want to tune the backlog queue .. */
+ ret = sock->ops->listen(sock, CEPH_MSGR_BACKUP);
+ if (ret < 0) {
+ derr(0, "kernel_listen error: %d\n", ret);
+ goto err;
+ }
+
+ /* ok! */
+ msgr->listen_sock = sock;
+ listen_sock_callbacks(sock, msgr);
+ return ret;
+
+err:
+ sock_release(sock);
+ return ret;
}
+/*
+ * accept a connection
+ */
+int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con)
+{
+ int ret;
+ struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
+ int len;
+ struct socket *sock;
+
+ ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
+ if (ret)
+ return ret;
+ con->sock = sock;
+
+ sock->sk->sk_allocation = GFP_NOFS;
+
+ ret = lsock->ops->accept(lsock, sock, O_NONBLOCK);
+ if (ret < 0) {
+ derr(0, "accept error: %d\n", ret);
+ goto err;
+ }
+
+ sock->ops = lsock->ops;
+ sock->type = lsock->type;
+ ret = sock->ops->getname(sock, paddr, &len, 2);
+ if (ret < 0) {
+ derr(0, "getname error: %d\n", ret);
+ goto err;
+ }
+
+ /* setup callbacks */
+ set_sock_callbacks(sock, con);
+
+ return ret;
+
+err:
+ sock->ops->shutdown(sock, SHUT_RDWR);
+ sock_release(sock);
+ return ret;
+}
+
+/*
+ * receive a message this may return after partial send
+ */
+int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
+{
+ struct kvec iov = {buf, len};
+ struct msghdr msg = {.msg_flags = 0};
+ int rlen = 0; /* length read */
+
+ msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
+ rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
+ return(rlen);
+}
+
+
+/*
+ * Send a message this may return after partial send
+ */
+int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
+ size_t kvlen, size_t len, int more)
+{
+ struct msghdr msg = {.msg_flags = 0};
+ int rlen = 0;
+
+ msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
+ if (more)
+ msg.msg_flags |= MSG_MORE;
+ else
+ msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */
+
+ /*printk(KERN_DEBUG "before sendmsg %d\n", len);*/
+ rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
+ /*printk(KERN_DEBUG "after sendmsg %d\n", rlen);*/
+ return(rlen);
+}
+
+
+
/*
* create a new connection. initial state is NEW.
*/
}
+/*
+ * close connection socket
+ */
+static void con_close_socket(struct ceph_connection *con)
+{
+ dout(10, "con_close_socket on %p sock %p\n", con, con->sock);
+ if (!con->sock)
+ return;
+ con->sock->ops->shutdown(con->sock, SHUT_RDWR);
+ sock_release(con->sock);
+ con->sock = 0;
+}
/*
* drop a reference
set_bit(QUEUED, &con->state);
if (test_bit(BUSY, &con->state) ||
- !queue_work(con_wq, &con->work.work)) {
- dout(40, "ceph_queue_write %p - already BUSY or queued\n", con);
+ !queue_work(ceph_msgr_wq, &con->work.work)) {
+ dout(40, "ceph_queue_con %p - already BUSY or queued\n", con);
put_connection(con);
}
}
atomic_read(&con->nref) - 1, atomic_read(&con->nref));
set_bit(QUEUED, &con->state);
if (test_bit(BUSY, &con->state) ||
- !queue_delayed_work(con_wq, &con->work,
+ !queue_delayed_work(ceph_msgr_wq, &con->work,
round_jiffies_relative(con->delay))) {
dout(40, "ceph_queue_con_delayed %p - already BUSY or queued\n",
con);
}
}
-
struct ceph_msg;
+extern struct workqueue_struct *ceph_msgr_wq; /* receive work queue */
+
typedef void (*ceph_msgr_dispatch_t) (void *p, struct ceph_msg *m);
typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_name *pn);
typedef int (*ceph_msgr_prepare_pages_t) (void *p, struct ceph_msg *m,
}
}
+#define CEPH_MSGR_BACKUP 10 /* backlogged incoming connections */
+
/* use format string %s%d */
#define ENTITY_NAME(n) \
ceph_name_type_str(le32_to_cpu((n).type)), \
unsigned long delay; /* delay interval */
};
+extern int ceph_msgr_init(void);
+extern void ceph_msgr_exit(void);
extern struct ceph_messenger *
ceph_messenger_create(struct ceph_entity_addr *myaddr);
static struct proc_dir_entry *proc_fs_ceph;
-void ceph_proc_init(void)
+int ceph_proc_init(void)
{
struct proc_dir_entry *pde;
proc_fs_ceph = proc_mkdir("ceph", proc_root_fs);
-
if (!proc_fs_ceph)
- return;
+ return -ENOMEM;
proc_fs_ceph->owner = THIS_MODULE;
pde = create_proc_read_entry("debug", 0,
if (pde)
pde->write_proc = ceph_debug_level_write;
+ return 0;
}
void ceph_proc_cleanup()
#define DOUT_VAR ceph_debug_super
#define DOUT_PREFIX "super: "
#include "super.h"
-#include "ktcp.h"
#include <linux/statfs.h>
#include "mon_client.h"
Opt_debug,
Opt_debug_console,
Opt_debug_msgr,
- Opt_debug_tcp,
Opt_debug_mdsc,
Opt_debug_osdc,
Opt_debug_addr,
{Opt_fsidminor, "fsidminor=%ld"},
{Opt_debug, "debug=%d"},
{Opt_debug_msgr, "debug_msgr=%d"},
- {Opt_debug_tcp, "debug_tcp=%d"},
{Opt_debug_mdsc, "debug_mdsc=%d"},
{Opt_debug_osdc, "debug_osdc=%d"},
{Opt_debug_addr, "debug_addr=%d"},
case Opt_debug_msgr:
ceph_debug_msgr = intval;
break;
- case Opt_debug_tcp:
- ceph_debug_tcp = intval;
- break;
case Opt_debug_mdsc:
ceph_debug_mdsc = intval;
break;
return 0;
}
-/*
- * share work queue between clients.
- */
-atomic_t ceph_num_clients = ATOMIC_INIT(0);
-
-static void get_client_counter(void)
-{
- if (atomic_add_return(1, &ceph_num_clients) == 1) {
- dout(10, "first client, setting up workqueues\n");
- ceph_workqueue_init();
- }
-}
-
-static void put_client_counter(void)
-{
- if (atomic_dec_and_test(&ceph_num_clients)) {
- dout(10, "last client, shutting down workqueues\n");
- ceph_workqueue_shutdown();
- }
-}
-
/*
* create a fresh client instance
*/
cl->msgr = 0;
- /* start work queues? */
- get_client_counter();
-
cl->wb_wq = create_workqueue("ceph-writeback");
if (cl->wb_wq == 0)
goto fail;
fail:
/* fixme: use type->init() eventually */
- put_client_counter();
return ERR_PTR(-ENOMEM);
}
destroy_workqueue(cl->trunc_wq);
if (cl->msgr)
ceph_messenger_destroy(cl->msgr);
- put_client_counter();
kfree(cl);
dout(10, "destroy_client %p done\n", cl);
}
dout(1, "init_ceph\n");
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,25)
+ ret = -ENOMEM;
ceph_kobj = kobject_create_and_add("ceph", fs_kobj);
if (!ceph_kobj)
- return -ENOMEM;
+ goto out;
#endif
- ceph_proc_init();
+
+ ret = ceph_proc_init();
+ if (ret < 0)
+ goto out_kobj;
+
+ ret = ceph_msgr_init();
+ if (ret < 0)
+ goto out_proc;
ret = init_inodecache();
if (ret)
- goto out;
+ goto out_msgr;
+
ret = register_filesystem(&ceph_fs_type);
if (ret)
- destroy_inodecache();
+ goto out_icache;
+ return 0;
+
+out_icache:
+ destroy_inodecache();
+out_msgr:
+ ceph_msgr_exit();
+out_proc:
+ ceph_proc_cleanup();
+out_kobj:
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,25)
+ kobject_put(ceph_kobj);
+ ceph_kobj = 0;
+#endif
out:
return ret;
}
static void __exit exit_ceph(void)
{
dout(1, "exit_ceph\n");
-
+ unregister_filesystem(&ceph_fs_type);
+ destroy_inodecache();
+ ceph_msgr_exit();
+ ceph_proc_cleanup();
#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,25)
kobject_put(ceph_kobj);
ceph_kobj = 0;
#endif
- ceph_proc_cleanup();
-
- unregister_filesystem(&ceph_fs_type);
- destroy_inodecache();
}
module_init(init_ceph);
extern int ceph_debug;
extern int ceph_debug_msgr;
extern int ceph_debug_super;
-extern int ceph_debug_tcp;
extern int ceph_debug_mdsc;
extern int ceph_debug_osdc;
extern int ceph_debug_addr;
extern const struct export_operations ceph_export_ops;
/* proc.c */
-extern void ceph_proc_init(void);
+extern int ceph_proc_init(void);
extern void ceph_proc_cleanup(void);
#endif /* _FS_CEPH_SUPER_H */