// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include <sys/types.h>
+#include <sys/socket.h>
#include <unistd.h>
+
#include "include/Context.h"
#include "common/errno.h"
#include "AsyncMessenger.h"
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m)
: Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0),
- state(STATE_NONE), state_after_send(0), sd(-1), conn_id(m->dispatch_queue.get_id()),
- in_q(&(m->dispatch_queue)), lock("AsyncConnection::lock"), backoff(0),
+ state(STATE_NONE), state_after_send(0), sd(-1),
+ lock("AsyncConnection::lock"), backoff(0),
got_bad_auth(false), authorizer(NULL), state_offset(0), net(cct), center(&m->center) { }
AsyncConnection::~AsyncConnection()
<< policy.throttler_bytes->get_max() << dendl;
// FIXME: when to try it again?
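+      // the bytes reserved here are held for the lifetime of the message and
+      // are only returned via policy.throttler_bytes->put() after dispatch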
if (policy.throttler_bytes->get_or_fail(message_size))
- state = STATE_OPEN_MESSAGE_THROTTLE_DISPATCH;
+ state = STATE_OPEN_MESSAGE_READ_FRONT;
}
}
break;
}
- case STATE_OPEN_MESSAGE_THROTTLE_DISPATCH:
- {
- uint64_t message_size = current_header.front_len + current_header.middle_len + current_header.data_len;
- if (message_size) {
- // throttle total bytes waiting for dispatch. do this _after_ the
- // policy throttle, as this one does not deadlock (unless dispatch
- // blocks indefinitely, which it shouldn't). in contrast, the
- // policy throttle carries for the lifetime of the message.
- ldout(async_msgr->cct,10) << __func__ << " wants " << message_size << " from dispatch throttler "
- << async_msgr->dispatch_throttler.get_current() << "/"
- << async_msgr->dispatch_throttler.get_max() << dendl;
- if (async_msgr->dispatch_throttler.get_or_fail(message_size)) {
- state = STATE_OPEN_MESSAGE_READ_FRONT;
- throttle_stamp = ceph_clock_now(async_msgr->cct);
- }
- }
-
- break;
- }
case STATE_OPEN_MESSAGE_READ_FRONT:
{
// read front
//
ceph::shared_ptr<AuthSessionHandler> auth_handler = session_security;
- if (auth_handler == NULL) {
+ if (!auth_handler) {
ldout(async_msgr->cct, 10) << __func__ << " No session security set" << dendl;
} else {
if (auth_handler->check_message_signature(message)) {
ldout(async_msgr->cct,0) << __func__ << " got old message "
<< message->get_seq() << " <= " << in_seq << " " << message << " " << *message
<< ", discarding" << dendl;
- async_msgr->dispatch_throttle_release(message->get_dispatch_throttle_size());
message->put();
if (has_feature(CEPH_FEATURE_RECONNECT_SEQ) && async_msgr->cct->_conf->ms_die_on_old_message)
assert(0 == "old msgs despite reconnect_seq feature");
in_seq = message->get_seq();
ldout(async_msgr->cct, 10) << __func__ << " got message " << message->get_seq()
<< " " << message << " " << *message << dendl;
- in_q->fast_preprocess(message);
- if (in_q->can_fast_dispatch(message)) {
- in_q->fast_dispatch(message);
+ async_msgr->ms_fast_preprocess(message);
+ if (async_msgr->ms_can_fast_dispatch(message)) {
+ async_msgr->ms_fast_dispatch(message);
} else {
- in_q->enqueue(message, message->get_priority(), conn_id);
+ async_msgr->ms_deliver_dispatch(message);
}
state = STATE_OPEN;
<< policy.throttler_bytes->get_max() << dendl;
policy.throttler_bytes->put(message_size);
}
-
- if (state > STATE_OPEN_MESSAGE_THROTTLE_DISPATCH &&
- state <= STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH) {
- async_msgr->dispatch_throttle_release(message_size);
- }
}
fault();
state = STATE_FAULT;
}
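+  // connection notifications (connect/accept/reset/remote_reset) are now
+  // delivered synchronously through the ms_deliver_handle_* helpers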
get();
- async_msgr->dispatch_queue.queue_connect(this);
+ async_msgr->ms_deliver_handle_connect(this);
get();
async_msgr->ms_deliver_handle_fast_connect(this);
if (existing->policy.lossy) {
// disconnect from the Connection
existing->get();
- async_msgr->dispatch_queue.queue_reset(existing);
+ async_msgr->ms_deliver_handle_reset(existing);
} else {
// queue a reset on the new connection, which we're dumping for the old
get();
- async_msgr->dispatch_queue.queue_reset(this);
+ async_msgr->ms_deliver_handle_reset(this);
// reset the in_seq if this is a hard reset from peer,
// otherwise we respect our original connection's value
// notify
get();
- async_msgr->dispatch_queue.queue_accept(this);
+ async_msgr->ms_deliver_handle_accept(this);
get();
async_msgr->ms_deliver_handle_fast_accept(this);
if (policy.lossy && state != STATE_CONNECTING) {
ldout(async_msgr->cct, 10) << __func__ << " on lossy channel, failing" << dendl;
- in_q->discard_queue(conn_id);
_stop();
return ;
}
void AsyncConnection::was_session_reset()
{
ldout(async_msgr->cct,10) << __func__ << "was_session_reset" << dendl;
- in_q->discard_queue(conn_id);
discard_out_queue();
outcoming_bl.clear();
get();
- async_msgr->dispatch_queue.queue_remote_reset(this);
+ async_msgr->ms_deliver_handle_remote_reset(this);
if (randomize_out_seq()) {
lsubdout(async_msgr->cct,ms,15) << __func__ << " Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
{
ldout(async_msgr->cct, 10) << __func__ << dendl;
get();
- async_msgr->dispatch_queue.queue_reset(this);
+ async_msgr->ms_deliver_handle_reset(this);
shutdown_socket();
discard_out_queue();
outcoming_bl.clear();
STATE_OPEN_MESSAGE_HEADER,
STATE_OPEN_MESSAGE_THROTTLE_MESSAGE,
STATE_OPEN_MESSAGE_THROTTLE_BYTES,
- STATE_OPEN_MESSAGE_THROTTLE_DISPATCH,
STATE_OPEN_MESSAGE_READ_FRONT,
STATE_OPEN_MESSAGE_READ_MIDDLE,
STATE_OPEN_MESSAGE_READ_DATA_PREPARE,
int state_after_send;
int sd;
int port;
- uint64_t conn_id;
Messenger::Policy policy;
map<int, list<Message*> > out_q; // priority queue for outbound msgs
- DispatchQueue *in_q;
list<Message*> sent;
Mutex lock;
utime_t backoff; // backoff time
string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
processor(this, _nonce, &center),
- nonce(_nonce),
lock("AsyncMessenger::lock"),
- center(cct), did_bind(false),
+ nonce(_nonce), did_bind(false),
global_seq(0),
cluster_protocol(0),
local_connection(new AsyncConnection(cct, this)),
- dispatch_throttler(cct, string("msgr_dispatch_throttler-") + mname,
- cct->_conf->ms_dispatch_throttle_bytes),
- dispatch_queue(cct, this)
+ center(cct)
{
ceph_spin_init(&global_seq_lock);
_init_local_connection();
void AsyncMessenger::ready()
{
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
- dispatch_queue.start();
lock.Lock();
if (did_bind)
{
ldout(cct,10) << __func__ << "shutdown " << get_myaddr() << dendl;
mark_down_all();
- dispatch_queue.shutdown();
// break ref cycles on the loopback connection
local_connection->set_priv(NULL);
}
lock.Unlock();
- if(dispatch_queue.is_started()) {
- ldout(cct,10) << __func__ << ": waiting for dispatch queue" << dendl;
- dispatch_queue.wait();
- ldout(cct,10) << __func__ << ": dispatch queue is stopped" << dendl;
- }
-
// done! clean up.
if (did_bind) {
ldout(cct,20) << __func__ << ": stopping processor thread" << dendl;
if (my_inst.addr == dest_addr) {
// local
ldout(cct, 20) << __func__ << " " << *m << " local" << dendl;
- dispatch_queue.local_delivery(m, m->get_priority());
+ m->set_connection(local_connection.get());
+ m->set_recv_stamp(ceph_clock_now(cct));
+ ms_fast_preprocess(m);
+ // only messages that pass ms_can_fast_dispatch may be fast-dispatched;
+ // everything else must take the ordinary dispatch path
+ if (ms_can_fast_dispatch(m)) {
+ ms_fast_dispatch(m);
+ } else {
+ ms_deliver_dispatch(m);
+ }
+
return;
}
ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl;
p->mark_down();
p->get();
- dispatch_queue.queue_reset(p);
+ ms_deliver_handle_reset(p);
}
accepting_conns.clear();
conns.erase(it);
p->mark_down();
p->get();
- dispatch_queue.queue_reset(p);
+ ms_deliver_handle_reset(p);
}
lock.Unlock();
}
ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
_stop_conn(p);
p->get();
- dispatch_queue.queue_reset(p);
+ ms_deliver_handle_reset(p);
} else {
ldout(cct, 1) << __func__ << " " << addr << " -- pipe dne" << dendl;
}
_init_local_connection();
lock.Unlock();
}
-
-void AsyncMessenger::dispatch_throttle_release(uint64_t msize) {
- if (msize) {
- ldout(cct,10) << __func__ << " " << msize << " to dispatch throttler "
- << dispatch_throttler.get_current() << "/"
- << dispatch_throttler.get_max() << dendl;
- dispatch_throttler.put(msize);
- }
-}
void set_addr_unknowns(entity_addr_t& addr);
int get_dispatch_queue_len() {
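+    // dispatch is inline now, so there is never a queued backlog to report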
- return dispatch_queue.get_queue_len();
+ return 0;
}
double get_dispatch_queue_max_age(utime_t now) {
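+    // likewise, nothing waits in a queue, so the maximum age is always zero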
- return dispatch_queue.get_max_age(now);
+ return 0;
}
/** @} Accessors */
return p->second;
}
- void *_stop_conn(AsyncConnection *c) {
+ void _stop_conn(AsyncConnection *c) {
assert(lock.is_locked());
if (c) {
c->mark_down();
/// con used for sending messages to ourselves
ConnectionRef local_connection;
- /// Throttle preventing us from building up a big backlog waiting for dispatch
- Throttle dispatch_throttler;
-
EventCenter center;
- DispatchQueue dispatch_queue;
/**
* @defgroup AsyncMessenger internals
_init_local_connection();
}
- /**
- * Release memory accounting back to the dispatch throttler.
- *
- * @param msize The amount of memory to release.
- */
- void dispatch_throttle_release(uint64_t msize);
-
/**
* @} // AsyncMessenger Internals
*/
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "common/errno.h"
+#include "Event.h"
#ifdef HAVE_EPOLL
-#include "EventEpoll.h""
+#include "EventEpoll.h"
#else
#ifdef HAVE_KQUEUE
#include "EventKqueue.h"
driver = new KqueueDriver(cct);
#else
driver = new SelectDriver(cct);
+#endif
#endif
if (!driver) {
return 0;
}
-EventCenter::~EventCenter();
+EventCenter::~EventCenter()
{
if (driver)
delete driver;
int EventCenter::create_event(int fd, int mask, EventCallback *ctxt)
{
Mutex::Locker l(lock);
- if (events.size() > center->nevent) {
+ if (events.size() > nevent) {
lderr(cct) << __func__ << " event count is exceed." << dendl;
return -ERANGE;
}
- int r = driver->add_event(fd, mask);
+ EventCenter::Event *event = _get_event(fd);
+
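+ // give the driver the mask already registered for this fd so it can decide
+ // whether this is a fresh add or a modification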
+ int r = driver->add_event(fd, event ? event->mask: EVENT_NONE, mask);
if (r < 0)
return r;
- if (events.find(fd) == events.end()) {
+ if (!event) {
events[fd] = EventCenter::Event();
+ event = &events[fd];
}
- EventCenter::Event *event = &events[fd];
-
event->mask |= mask;
- if (mask & EVENT_READABLE)
+ if (mask & EVENT_READABLE) {
+ if (event->read_cb)
+ delete event->read_cb;
event->read_cb = ctxt;
- if (mask & EVENT_WRITABLE)
+ }
+ if (mask & EVENT_WRITABLE) {
+ if (event->write_cb)
+ delete event->write_cb;
event->write_cb = ctxt;
+ }
return 0;
}
-void delete_event(int fd, int mask)
+void EventCenter::delete_event(int fd, int mask)
{
Mutex::Locker l(lock);
- if (event->mask == EVENT_NONE)
- return;
- driver->del_event(fd, mask);
- struct event *event = &events[fd];
+ EventCenter::Event *event = _get_event(fd);
+ if (!event)
+ return ;
+ driver->del_event(fd, event->mask, mask);
- if (mask & EVENT_READABLE)
- delete event->read_cb;
- if (mask & EVENT_WRITABLE)
- delete event->write_cb;
+ // only tear down the callbacks that belong to the bits being removed, so a
+ // mask that stays registered keeps a valid callback pointer
+ if ((mask & EVENT_READABLE) && event->read_cb) {
+ delete event->read_cb;
+ event->read_cb = NULL;
+ }
+ if ((mask & EVENT_WRITABLE) && event->write_cb) {
+ delete event->write_cb;
+ event->write_cb = NULL;
+ }
event->mask = event->mask & (~mask);
- events.erase(fd);
+ // drop the map entry only once nothing is registered for this fd
+ if (event->mask == EVENT_NONE)
+ events.erase(fd);
}
-int process_events(int timeout_millionseconds)
+int EventCenter::process_events(int timeout_milliseconds)
{
struct timeval tv;
- int j, processed, numevents, mask, fd, rfired;
+ int j, processed, numevents;
- if (timeout_millionseconds > 0) {
- tv.tv_sec = timeout_millionseconds / 1000;
+ if (timeout_milliseconds > 0) {
+ tv.tv_sec = timeout_milliseconds / 1000;
vector<FiredEvent> fired_events;
numevents = driver->event_wait(fired_events, &tv);
for (j = 0; j < numevents; j++)
- event_wq.queue(fired_events[i]);
+ event_wq.queue(fired_events[j]);
return numevents;
}
#endif
// We use epoll, kqueue, evport, select in descending order by performance.
-#ifdef __linux__
+#if defined(__linux__)
#define HAVE_EPOLL 1
#endif
public:
virtual ~EventDriver() {} // we want a virtual destructor!!!
virtual int init(int nevent) = 0;
- virtual int add_event(int fd, int mask) = 0;
- virtual void delete_event(int fd, int del_mask) = 0;
- virtual int event_wait(FiredEvent &fired_events, struct timeval *tp) = 0;
+ virtual int add_event(int fd, int cur_mask, int mask) = 0;
+ virtual void del_event(int fd, int cur_mask, int del_mask) = 0;
+ virtual int event_wait(vector<FiredEvent> &fired_events, struct timeval *tp) = 0;
};
class EventCallback {
map<int, Event> events;
EventDriver *driver;
CephContext *cct;
- int nevent;
+ uint64_t nevent;
ThreadPool event_tp;
- Event *get_event(int fd) {
- Mutex::Locker l(lock);
+ Event *_get_event(int fd) {
map<int, Event>::iterator it = events.find(fd);
if (it != events.end()) {
return &it->second;
}
-
return NULL;
}
struct EventWQ : public ThreadPool::WorkQueueVal<FiredEvent> {
~EventCenter();
int init(int nevent);
int create_event(int fd, int mask, EventCallback *ctxt);
- int delete_event(int fd, int mask);
+ void delete_event(int fd, int mask);
int process_events(int timeout_milliseconds);
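+  // locking wrapper around _get_event(), which assumes lock is already held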
+ Event *get_event(int fd) {
+ Mutex::Locker l(lock);
+ return _get_event(fd);
+ }
};
#endif
+#include "common/errno.h"
#include "EventEpoll.h"
#define dout_subsys ceph_subsys_ms
int EpollDriver::init(int nevent)
{
- events = malloc(sizeof(struct epoll_event)*nevent);
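+  // malloc returns void*, which C++ will not implicitly convert to
+  // struct epoll_event*, so the result has to be cast explicitly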
+ events = (struct epoll_event*)malloc(sizeof(struct epoll_event)*nevent);
if (!events) {
lderr(cct) << __func__ << " unable to malloc memory: "
- << cpp_strerror(errno) << dendl;
- return -error;
+ << cpp_strerror(errno) << dendl;
+ return -errno;
}
memset(events, 0, sizeof(struct epoll_event)*nevent);
if (epfd == -1) {
lderr(cct) << __func__ << " unable to do epoll_create: "
<< cpp_strerror(errno) << dendl;
- return -error;
+ return -errno;
}
return 0;
}
-int EpollDriver::add_event(int fd, int mask)
+int EpollDriver::add_event(int fd, int cur_mask, int add_mask)
{
struct epoll_event ee;
/* If the fd was already monitored for some event, we need a MOD
map<int, int>::iterator it = fds.find(fd);
if (it == fds.end()) {
op = EPOLL_CTL_ADD;
- if (deleted_fds.length()) {
- pos = deleted.fds.front();
- deleted.fds.pop_front();
+ if (deleted_fds.size()) {
+ // record the recycled slot for this fd, mirroring the fresh-slot branch
+ fds[fd] = pos = deleted_fds.front();
+ deleted_fds.pop_front();
} else {
fds[fd] = pos = next_pos;
next_pos++;
}
} else {
- pos = it->second;
- op = events[pos].mask == EVENT_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
+ op = cur_mask == EVENT_NONE ? EPOLL_CTL_ADD: EPOLL_CTL_MOD;
}
ee.events = 0;
- mask |= events[pos].mask; /* Merge old events */
- if (mask & EVENT_READABLE)
+ add_mask |= cur_mask; /* Merge old events */
+ if (add_mask & EVENT_READABLE)
ee.events |= EPOLLIN;
- if (mask & EVENT_WRITABLE)
+ if (add_mask & EVENT_WRITABLE)
ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (epoll_ctl(epfd, op, fd, &ee) == -1) {
lderr(cct) << __func__ << " unable to add event: "
<< cpp_strerror(errno) << dendl;
- return -error;
+ return -errno;
}
return 0;
}
struct epoll_event ee;
map<int, int>::iterator it = fds.find(fd);
if (it == fds.end())
- return 0;
+ return ;
int mask = cur_mask & (~delmask);
}
}
-int EpollDriver::event_wait(FiredEvent &fired_events, struct timeval *tvp)
+int EpollDriver::event_wait(vector<FiredEvent> &fired_events, struct timeval *tvp)
{
int retval, numevents = 0;
}
int init(int nevent);
- int add_event(int fd, int mask);
+ int add_event(int fd, int cur_mask, int add_mask);
void del_event(int fd, int cur_mask, int del_mask);
- int event_wait(FiredEvent &fired_events, struct timeval *tp);
+ int event_wait(vector<FiredEvent> &fired_events, struct timeval *tp);
};
#endif
};
CephContext *cct;
- Messenger *msgr;
+ SimpleMessenger *msgr;
Mutex lock;
Cond cond;
void shutdown();
bool is_started() {return dispatch_thread.is_started();}
- DispatchQueue(CephContext *cct, Messenger *msgr)
+ DispatchQueue(CephContext *cct, SimpleMessenger *msgr)
: cct(cct), msgr(msgr),
lock("SimpleMessenger::DispatchQeueu::lock"),
mqueue(cct->_conf->ms_pq_max_tokens_per_priority,