#include "super.h"
struct workqueue_struct *con_wq;
-struct workqueue_struct *accept_wq; /* accept work queue */
/*
dout(30, "ceph_accept_ready messenger %p sk_state = %u\n",
msgr, sk->sk_state);
- if (msgr && (sk->sk_state == TCP_LISTEN))
- queue_work(accept_wq, &msgr->awork);
+ if (sk->sk_state == TCP_LISTEN)
+ queue_work(con_wq, &msgr->awork);
}
/* Data available on socket or listen socket received a connect */
{
struct ceph_connection *con =
(struct ceph_connection *)sk->sk_user_data;
- if (con &&
- !test_bit(NEW, &con->state) &&
- !test_bit(WAIT, &con->state) &&
- !test_bit(CLOSED, &con->state) &&
- (sk->sk_state != TCP_CLOSE_WAIT)) {
+ if (sk->sk_state != TCP_CLOSE_WAIT) {
dout(30, "ceph_data_ready on %p state = %lu, queuing rwork\n",
con, con->state);
ceph_queue_con(con);
dout(30, "ceph_write_space %p state = %lu\n", con, con->state);
/* only queue to workqueue if a WRITE is pending */
- if (con &&
- !test_bit(NEW, &con->state) &&
- !test_bit(WAIT, &con->state) &&
- !test_bit(CLOSED, &con->state) &&
- test_bit(WRITE_PENDING, &con->state)) {
+ if (test_bit(WRITE_PENDING, &con->state)) {
dout(30, "ceph_write_space %p queuing write work\n", con);
ceph_queue_con(con);
}
{
struct ceph_connection *con =
(struct ceph_connection *)sk->sk_user_data;
- if (con == NULL ||
- test_bit(NEW, &con->state) ||
- test_bit(CLOSED, &con->state) ||
- test_bit(WAIT, &con->state))
- return;
dout(30, "ceph_state_change %p state = %lu sk_state = %u\n",
con, con->state, sk->sk_state);
{
int ret = 0;
- dout(20, "entered work_init\n");
-
- /*
- * this needs to be a single threaded queue until we
- * have add a way to ensure that each connection is only
- * being processed by a single thread at a time.
- */
- con_wq = create_singlethread_workqueue("ceph-net");
+ con_wq = create_workqueue("ceph-net");
if (IS_ERR(con_wq)) {
derr(0, "net worker failed to start: %d\n", ret);
destroy_workqueue(con_wq);
return ret;
}
- accept_wq = create_singlethread_workqueue("ceph-accept");
- if (IS_ERR(accept_wq)) {
- derr(0, "net worker failed to start: %d\n", ret);
- destroy_workqueue(accept_wq);
- ret = PTR_ERR(accept_wq);
- accept_wq = 0;
- return ret;
- }
-
- dout(20, "successfully created wrkqueues\n");
-
return(ret);
}
void ceph_workqueue_shutdown(void)
{
destroy_workqueue(con_wq);
- destroy_workqueue(accept_wq);
}
*/
void ceph_queue_con(struct ceph_connection *con)
{
+ if (test_bit(WAIT, &con->state) ||
+ test_bit(CLOSED, &con->state)) {
+ dout(40, "ceph_queue_con %p ignoring: WAIT|CLOSED\n", con);
+ return;
+ }
+
atomic_inc(&con->nref);
dout(40, "ceph_queue_con %p %d -> %d\n", con,
atomic_read(&con->nref) - 1, atomic_read(&con->nref));
- if (!queue_work(con_wq, &con->work.work)) {
- dout(40, "ceph_queue_write %p - already queued\n", con);
+
+ set_bit(QUEUED, &con->state);
+ if (test_bit(BUSY, &con->state) ||
+ !queue_work(con_wq, &con->work.work)) {
+ dout(40, "ceph_queue_write %p - already BUSY or queued\n", con);
put_connection(con);
}
}
dout(40, "ceph_queue_con_delayed %p delay %lu %d -> %d\n", con,
con->delay,
atomic_read(&con->nref) - 1, atomic_read(&con->nref));
- if (!queue_delayed_work(con_wq, &con->work,
+ set_bit(QUEUED, &con->state);
+ if (test_bit(BUSY, &con->state) ||
+ !queue_delayed_work(con_wq, &con->work,
round_jiffies_relative(con->delay))) {
- dout(40, "ceph_queue_con_delayed %p - already queued\n", con);
+ dout(40, "ceph_queue_con_delayed %p - already BUSY or queued\n",
+ con);
put_connection(con);
}
}
{
struct ceph_connection *con = container_of(work, struct ceph_connection,
work.work);
-
+
+more:
+ if (test_and_set_bit(BUSY, &con->state) != 0) {
+ dout(10, "con_work %p BUSY already set\n", con);
+ goto out;
+ }
+ dout(10, "con_work %p start, clearing QUEUED\n", con);
+ clear_bit(QUEUED, &con->state);
+
if (test_bit(CLOSED, &con->state) ||
test_bit(STANDBY, &con->state)) {
dout(5, "con_work CLOSED|STANDBY\n");
ceph_fault(con);
done:
+ clear_bit(BUSY, &con->state);
+ if (test_bit(QUEUED, &con->state)) {
+ dout(10, "con_work %p QUEUED reset, looping\n", con);
+ goto more;
+ }
+ dout(10, "con_work %p done\n", con);
+
+out:
put_connection(con);
}