con->msgr = msgr;
spin_lock_init(&con->con_lock);
- /*INIT_WORK(&con->rwork, ceph_reader);*/ /* setup work structure */
+ INIT_WORK(&con->rwork, try_read); /* setup work structure */
+ INIT_WORK(&con->rwork, try_write); /* setup work structure */
atomic_inc(&con->nref);
return con;
}
-
-
-
/*
* Accepter thread
*/
-static int ceph_accepter(struct ceph_messenger *msgr)
+static int try_accept(struct work_struct *work)
{
struct socket *sd, *new_sd;
struct sockaddr saddr;
- struct ceph_connection *con = NULL;
-
- memset(&saddr, 0, sizeof(saddr));
-
- printk(KERN_INFO "starting kernel thread\n");
- set_current_state(TASK_INTERRUPTIBLE);
-
- /* TBD: if address specified by mount */
- /* make my address from user specified address, fill in saddr */
-
- sd = _klisten(&saddr);
-
- /* an endless loop in which we are accepting connections */
- while (!kthread_should_stop()) {
- /* TBD: should we schedule? or will accept take care of this? */
- new_sd = _kaccept(sd);
- printk(KERN_INFO "accepted connection \n");
- set_current_state(TASK_INTERRUPTIBLE);
- /* initialize the msgr connection */
- con = new_connection(msgr);
- if (con == NULL) {
- printk(KERN_INFO "malloc failure\n");
- sock_release(new_sd);
- break;
- }
- con->sock = new_sd;
- con->state = ACCEPTING;
- con->in_tag = CEPH_MSGR_TAG_READY;
+ struct ceph_connection *con;
+ struct ceph_connection *new_con = NULL;
+
+ con = container_of(work, struct ceph_connection, awork);
+
- prepare_write_accept_announce(msgr, con);
+ printk(KERN_INFO "Entered try_accept\n");
- add_connection_accepting(msgr, con);
- /* hand off to worker threads , read pending */
- /*?? queue_work(recv_wq, &con->rwork);*/
+ if(kernel_accept(sd, &new_sd, sd->file->f_flags) < 0) {
+ printk(KERN_INFO "error accepting connection \n");
+ goto done;
}
- set_current_state(TASK_RUNNING);
- printk(KERN_INFO "kernel thread exiting\n");
- sock_release(sd);
+ printk(KERN_INFO "accepted connection \n");
+
+ /* get the address at the other end */
+ memset(&saddr, 0, sizeof(saddr));
+ if (new_sd->ops->getname(new_sd, saddr, &len, 2)) {
+ printk(KERN_INFO "getname error connection aborted\n");
+ sock_release(new_sd);
+ goto done;
+ }
+
+ /* initialize the msgr connection */
+ new_con = new_connection(msgr);
+ if (new_con == NULL) {
+ printk(KERN_INFO "malloc failure\n");
+ sock_release(new_sd);
+ goto done;
+ }
+ new_con->sock = new_sd;
+ setbit(ACCEPTING, &con->state);
+ new_con->in_tag = CEPH_MSGR_TAG_READY;
+ new_con->peeraddr = saddr;
+/* TBD: may not use this.. */
+ new_sd->sk->sk_user_data = con;
+
+ prepare_write_accept_announce(msgr, con);
+
+ add_connection_accepting(msgr, con);
+
+/* add to poll list? or hand off to send workqueue? */
+
+ /* hand off to worker threads , send pending */
+ /*?? queue_work(send_wq, &new_con->swork);*/
+done:
return(0);
}
destroy_workqueue(send_wq);
return ret;
}
-/* TBD: need to do this during mount, one per kmsgr */
- athread = kthread_run(ceph_accepter, NULL, "ceph accepter thread");
return(ret);
}
destroy_workqueue(send_wq);
destroy_workqueue(recv_wq);
}
+
+struct ceph_connection *new_listener(struct ceph_messenger *msgr)
+{
+ struct ceph_connection *con;
+ struct sockaddr saddr;
+ memset(&saddr, 0, sizeof(saddr));
+
+ con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL);
+ if (con == NULL) return 0;
+ memset(&con, 0, sizeof(con));
+
+ /* create listener connection */
+ spin_lock_init(&con->con_lock);
+ INIT_WORK(&con->awork, try_accept); /* setup work structure */
+ con->msgr = msgr;
+ atomic_inc(&con->nref);
+
+ /* TBD: if address specified by mount */
+ /* make my address from user specified address, fill in saddr */
+
+ con->sock = _klisten(&saddr);
+ return(con);
+}
+
+/*
+ * create a new messenger
+ */
+static struct ceph_messenger *new_messenger()
+{
+ struct ceph_messenger *msgr;
+ struct ceph_connection *con;
+
+ msgr = kmalloc(sizeof(struct ceph_messenger), GFP_KERNEL);
+ if (msgr == NULL) return 0;
+ memset(&msgr, 0, sizeof(msgr));
+
+ con = kmalloc(sizeof(struct ceph_connection), GFP_KERNEL);
+ if (con == NULL) return 0;
+ memset(&con, 0, sizeof(con));
+
+ /* create listener connection */
+ spin_lock_init(&con->con_lock);
+ INIT_WORK(&con->awork, try_accept); /* setup work structure */
+ con->msgr = msgr;
+ atomic_inc(&con->nref);
+
+ /* start up poll thread */
+ athread = kthread_run(ceph_poll, NULL, "ceph poll thread"); */
+ return msgr;
+}
+
+struct ceph_messenger *init_messenger()
+{
+
+}