}
};
+// EventCallback that stops its AsyncConnection. Dispatched by
+// AsyncConnection::mark_down() via dispatch_event_external() so the
+// actual stop runs through the event center rather than synchronously
+// in the caller's thread (presumably to avoid lock-ordering issues —
+// see the "avoid lock dep" note on the constructor's dispatch).
+class C_handle_stop : public EventCallback {
+ AsyncConnectionRef conn;
+
+ public:
+ C_handle_stop(AsyncConnection *c): conn(c) {}
+ // id: event id supplied by the event framework; unused here.
+ void do_request(int id) {
+ conn->stop();
+ }
+};
+
+
static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
{
// create a buffer to read into that matches the data alignment
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c)
: Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0),
state(STATE_NONE), state_after_send(0), sd(-1),
- lock("AsyncConnection::lock"), open_write(false),
+ lock("AsyncConnection::lock"), open_write(false), keepalive(false),
got_bad_auth(false), authorizer(NULL),
state_buffer(4096), state_offset(0), net(cct), center(c)
{
state = STATE_CONNECTING;
  // reschedule the connection in order to avoid a lock dependency
- center->create_time_event(0, read_handler);
+ // may be called by an external thread (send_message)
+ center->dispatch_event_external(read_handler);
}
void AsyncConnection::accept(int incoming)
if ((state == STATE_STANDBY || state == STATE_CLOSED) && !policy.server) {
_connect();
} else if (sd > 0 && !open_write) {
- center->create_file_event(sd, EVENT_WRITABLE, write_handler);
- open_write = true;
+ center->dispatch_event_external(write_handler);
}
return 0;
}
connect_seq = 0;
}
+// Mark this connection down. Takes the connection lock, then queues a
+// C_handle_stop event instead of calling _stop() directly, so teardown
+// happens via the event center (safe for external callers).
+void AsyncConnection::mark_down()
+{
+ Mutex::Locker l(lock);
+ center->dispatch_event_external(EventCallbackRef(new C_handle_stop(this)));
+}
+
// Who calls "_stop()":
// 1. receive STATE_OPEN_TAG_CLOSE
// 2. fault when policy.lossy
}
}
+// Request a keepalive. Only sets the flag under the connection lock and
+// wakes the write path; the keepalive bytes themselves are emitted later
+// by the write handler when it observes `keepalive == true`.
+void AsyncConnection::send_keepalive()
+{
+ Mutex::Locker l(lock);
+ keepalive = true;
+ center->dispatch_event_external(write_handler);
+}
+
void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp)
{
assert(lock.is_locked());
bufferlist bl;
int r;
if (state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) {
+ if (keepalive) {
+ _send_keepalive_or_ack();
+ keepalive = false;
+ }
+
if (in_seq > in_seq_acked) {
ceph_le64 s;
s = in_seq;
}
return m;
}
-
public:
AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c);
~AsyncConnection();
void accept(int sd);
int send_message(Message *m);
- void send_keepalive() {
- Mutex::Locker l(lock);
- if (state == STATE_OPEN)
- _send_keepalive_or_ack();
- }
- void mark_down() {
- Mutex::Locker l(lock);
- _stop();
- }
+ void send_keepalive();
+ void mark_down();
// Mark this connection as lossy, under the connection lock. Per the
// note elsewhere in this change, a fault on a lossy connection leads
// to _stop() rather than reconnect.
void mark_disposable() {
Mutex::Locker l(lock);
policy.lossy = true;
}
- void handle_write();
- void process();
-
private:
enum {
STATE_NONE,
EventCallbackRef write_handler;
EventCallbackRef reset_handler;
EventCallbackRef remote_reset_handler;
+ bool keepalive;
  // This section contains temp variables used by state transitions
NetHandler net;
EventCenter *center;
ceph::shared_ptr<AuthSessionHandler> session_security;
+
+ public:
+ // Entry points invoked from EventCallback objects; public so the
+ // callbacks can reach them when they fire in the event thread.
+ void handle_write();
+ void process();
+ // Stop the connection under its lock. Invoked via C_handle_stop,
+ // which mark_down() dispatches through the event center.
+ void stop() {
+ Mutex::Locker l(lock);
+ _stop();
+ }
}; /* AsyncConnection */
typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
}
// Debug-log prefix printer for Processor log lines.
static ostream& _prefix(std::ostream *_dout, Processor *p) {
- return *_dout << "-- Processor";
+ return *_dout << " Processor -- ";
}
static ostream& _prefix(std::ostream *_dout, Worker *w) {
ldout(msgr->cct, 1) << __func__ << " start" << dendl;
// start thread
- create();
+ if (listen_sd > 0)
+ create();
return 0;
}
// create connection
Worker *w = workers[conn_id % workers.size()];
AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center);
+ conn->connect(addr, type);
w->center.dispatch_event_external(EventCallbackRef(new C_handle_connect(conn, addr, type)));
assert(!conns.count(addr));
conns[addr] = conn;