<< ").";
}
+class C_time_wakeup : public EventCallback {
+ AsyncConnectionRef conn;
+
+ public:
+ C_time_wakeup(AsyncConnectionRef c): conn(c) {}
+ void do_request(int fd_or_id) {
+ conn->wakeup_from(fd_or_id);
+ }
+};
+
class C_handle_read : public EventCallback {
AsyncConnectionRef conn;
public:
C_handle_read(AsyncConnectionRef c): conn(c) {}
- void do_request(int fd) {
+ void do_request(int fd_or_id) {
conn->process();
}
};
assert(!outcoming_bl.length());
connect_seq++;
state = STATE_CONNECTING;
- center->create_time_event(0, read_handler);
+ center->dispatch_event_external(read_handler);
return 0;
}
// if send_message always successfully send, it may have no
// opportunity to send seq ack. 10 is a experience value.
if (in_seq > in_seq_acked + 10) {
- center->create_time_event(2, write_handler);
+ center->dispatch_event_external(write_handler);
}
state = STATE_OPEN;
async_msgr->ms_fast_dispatch(message);
lock.Lock();
} else {
- center->create_time_event(1, EventCallbackRef(new C_handle_dispatch(async_msgr, message)));
+ center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(async_msgr, message)));
}
break;
// message may in queue between last _try_send and connection ready
// write event may already notify and we need to force scheduler again
if (is_queued())
- center->create_time_event(1, write_handler);
+ center->dispatch_event_external(write_handler);
break;
}
}
// woke up again;
- center->create_time_event(backoff, read_handler);
+ register_time_events.insert(center->create_time_event(
+ backoff, EventCallbackRef(new C_time_wakeup(this))));
}
void AsyncConnection::was_session_reset()
if (sd > 0)
::close(sd);
sd = -1;
+ for (set<uint64_t>::iterator it = register_time_events.begin();
+ it != register_time_events.end(); ++it)
+ center->delete_time_event(*it);
async_msgr->unregister_conn(this);
// Here we need to dispatch "signal" event, because we want to ensure signal
// it after all events called by this "_stop" has be done.
center->dispatch_event_external(signal_handler);
- put();
}
int AsyncConnection::_send(Message *m)
fault();
}
+void AsyncConnection::wakeup_from(uint64_t id)
+{
+ lock.Lock();
+ register_time_events.erase(id);
+ lock.Unlock();
+ process();
+}
+
void AsyncConnection::local_deliver()
{
ldout(async_msgr->cct, 10) << __func__ << dendl;
AsyncConnectionRef p = *q;
ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl;
p->mark_down();
- p->get();
ms_deliver_handle_reset(p.get());
}
accepting_conns.clear();
ldout(cct, 5) << __func__ << " " << it->first << " " << p << dendl;
conns.erase(it);
p->mark_down();
- p->get();
ms_deliver_handle_reset(p.get());
}
lock.Unlock();
if (p) {
ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
p->mark_down();
- p->get();
ms_deliver_handle_reset(p.get());
} else {
ldout(cct, 1) << __func__ << " " << addr << " -- connection dne" << dendl;
return id;
}
+// TODO: Ineffective implementation now!
+void EventCenter::delete_time_event(uint64_t id)
+{
+ ldout(cct, 10) << __func__ << " id=" << id << dendl;
+ if (id >= time_event_next_id)
+ return ;
+
+
+ for (map<utime_t, list<TimeEvent> >::iterator it = time_events.begin();
+ it != time_events.end(); ++it) {
+ for (list<TimeEvent>::iterator j = it->second.begin();
+ j != it->second.end(); ++j) {
+ if (j->id == id) {
+ it->second.erase(j);
+ if (it->second.empty())
+ time_events.erase(it);
+ return ;
+ }
+ }
+ }
+}
+
void EventCenter::wakeup()
{
ldout(cct, 1) << __func__ << dendl;