From c3a68d7424c0296c5899d47ef15402d135c92922 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 8 Oct 2008 10:49:12 -0700 Subject: [PATCH] msgr: include priority in msg header, make dispatch a priority queue Generalizes previous hack that put messages from the monitor at the front of the dispatch queue. Monitor now just sets a (non-default) default_send_priority of CEPH_MSG_PRIO_HIGH. That value is used only if the message priority isn't set explicitly by set_priority() before being queued for send. --- src/TODO | 12 +++------ src/cmon.cc | 1 + src/include/ceph_fs.h | 19 ++++++++++---- src/msg/Message.h | 4 +++ src/msg/Messenger.h | 6 ++++- src/msg/SimpleMessenger.cc | 52 ++++++++++++++++++-------------------- src/msg/SimpleMessenger.h | 41 +++++++++++++++++------------- 7 files changed, 76 insertions(+), 59 deletions(-) diff --git a/src/TODO b/src/TODO index cf8ffda6e33a7..86749574e6ce2 100644 --- a/src/TODO +++ b/src/TODO @@ -8,6 +8,10 @@ v0.5 big items - ENOSPC + - space reservation in ObjectStore, redeemed by Transactions? + - reserved as PG goes active; reservation canceled when pg goes inactive + - something similar during recovery + - ? - repair - enforceable quotas? - mds security enforcement @@ -67,14 +71,6 @@ kclient items to review - cap changes are serialized by i_lock, but (thorough) cache invalidation may block.. -kclient items to review -- fill_trace locking -- async trunc -- async writeback -- cache invalidation race, locking problems - - cap changes are serialized by i_lock, but (thorough) cache invalidation may block.. - - vfs issues - real_lookup() race: 1- hash lookup find no dentry diff --git a/src/cmon.cc b/src/cmon.cc index 23ae5a98ed163..0233f9559cc13 100644 --- a/src/cmon.cc +++ b/src/cmon.cc @@ -104,6 +104,7 @@ int main(int argc, const char **argv) // start monitor Messenger *m = rank.register_entity(entity_name_t::MON(whoami)); + m->set_default_send_priority(CEPH_MSG_PRIO_HIGH); Monitor *mon = new Monitor(whoami, &store, m, &monmap); rank.start(); // may daemonize diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 03b4c591e57c6..442d25e1818ba 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -28,13 +28,16 @@ /* - * protocol versions. increment each time one of these changes. + * tcp connection banner. include a protocol version. and adjust + * whenever the wire protocol changes. try to keep this string the + * same length. */ -#define CEPH_BANNER "ceph\n2\n" /* second line is a protocol version. - adjust whenever the wire protocol - changes. */ +#define CEPH_BANNER "ceph 003\n" #define CEPH_BANNER_MAX_LEN 30 +/* + * subprotocol versions. + */ #define CEPH_OSD_PROTOCOL 1 #define CEPH_MDS_PROTOCOL 1 #define CEPH_MON_PROTOCOL 1 @@ -404,7 +407,8 @@ struct ceph_msg_connect { */ struct ceph_msg_header { __le64 seq; /* message seq# for this session */ - __le32 type; /* message type */ + __le16 type; /* message type */ + __le16 priority; /* priority. higher value == higher priority */ __le32 front_len; __le32 data_off; /* sender: include full offset; receiver: mask against ~PAGE_MASK */ __le32 data_len; /* bytes of data payload */ @@ -413,6 +417,11 @@ struct ceph_msg_header { __le32 crc; /* this goes at the end! */ } __attribute__ ((packed)); +#define CEPH_MSG_PRIO_LOW 100 +#define CEPH_MSG_PRIO_DEFAULT 200 +#define CEPH_MSG_PRIO_HIGH 300 +#define CEPH_MSG_PRIO_HIGHEST 400 + struct ceph_msg_footer { __le32 flags; __le32 front_crc; diff --git a/src/msg/Message.h b/src/msg/Message.h index 4717c62165d7f..8fba5c1a7d4b5 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -129,6 +129,7 @@ public: Message() { }; Message(int t) { header.type = t; + header.priority = 0; // undef header.data_off = 0; } virtual ~Message() { } @@ -170,6 +171,9 @@ public: unsigned get_seq() { return header.seq; } void set_seq(unsigned s) { header.seq = s; } + unsigned get_priority() { return header.priority; } + void set_priority(__s16 p) { header.priority = p; } + // source/dest entity_inst_t get_dest_inst() { return entity_inst_t(header.dst); } entity_name_t get_dest() { return entity_name_t(header.dst.name); } diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index e508fcb39913d..e5505fc024aba 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -37,9 +37,10 @@ class Messenger { protected: entity_inst_t _myinst; + int default_send_priority; public: - Messenger(entity_name_t w) : dispatcher(0) { + Messenger(entity_name_t w) : dispatcher(0), default_send_priority(CEPH_MSG_PRIO_DEFAULT) { _myinst.name = w; } virtual ~Messenger() { } @@ -52,6 +53,9 @@ protected: void _set_myname(entity_name_t m) { _myinst.name = m; } void _set_myaddr(entity_addr_t a) { _myinst.addr = a; } virtual void reset_myname(entity_name_t m) = 0; + + void set_default_send_priority(int p) { default_send_priority = p; } + int get_default_send_priority() { return default_send_priority; } // hrmpf. virtual int get_dispatch_queue_len() { return 0; }; diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 57a56d372edda..000301e65b82e 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -539,22 +539,16 @@ void Rank::EntityMessenger::dispatch_entry() { lock.Lock(); while (!stop) { - if (!dispatch_queue.empty() || !prio_dispatch_queue.empty()) { + if (!dispatch_queue.empty()) { list ls; - if (!prio_dispatch_queue.empty()) { - ls.swap(prio_dispatch_queue); - pqlen = 0; - } else { - if (0) { - ls.swap(dispatch_queue); - qlen = 0; - } else { - // limit how much low-prio stuff we grab, to avoid starving high-prio messages! - ls.push_back(dispatch_queue.front()); - dispatch_queue.pop_front(); - qlen--; - } - } + + // take highest priority message off the queue + map >::reverse_iterator p = dispatch_queue.rbegin(); + ls.push_back(p->second.front()); + p->second.pop_front(); + if (p->second.empty()) + dispatch_queue.erase(p->first); + qlen--; lock.Unlock(); { @@ -666,6 +660,7 @@ int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest) m->set_orig_source_inst(_myinst); m->set_dest_inst(dest); m->calc_data_crc(); + if (!m->get_priority()) m->set_priority(get_default_send_priority()); dout(1) << m->get_source() << " --> " << dest.name << " " << dest.addr @@ -686,6 +681,7 @@ int Rank::EntityMessenger::forward_message(Message *m, entity_inst_t dest) m->set_source_inst(_myinst); m->set_dest_inst(dest); m->calc_data_crc(); + if (!m->get_priority()) m->set_priority(get_default_send_priority()); dout(1) << m->get_source() << " **> " << dest.name << " " << dest.addr @@ -709,6 +705,7 @@ int Rank::EntityMessenger::lazy_send_message(Message *m, entity_inst_t dest) m->set_orig_source_inst(_myinst); m->set_dest_inst(dest); m->calc_data_crc(); + if (!m->get_priority()) m->set_priority(get_default_send_priority()); dout(1) << "lazy " << m->get_source() << " --> " << dest.name << " " << dest.addr @@ -963,9 +960,12 @@ int Rank::Pipe::accept() out_seq = existing->out_seq; if (!existing->sent.empty()) { out_seq = existing->sent.front()->get_seq()-1; - q.splice(q.begin(), existing->sent); + q[CEPH_MSG_PRIO_HIGHEST].splice(q[CEPH_MSG_PRIO_HIGHEST].begin(), existing->sent); } - q.splice(q.end(), existing->q); + for (map >::iterator p = existing->q.begin(); + p != existing->q.end(); + p++) + q[p->first].splice(q[p->first].end(), p->second); existing->lock.Unlock(); @@ -1335,11 +1335,7 @@ void Rank::Pipe::was_session_reset() if (rank.local[i] && rank.local[i]->get_dispatcher()) rank.local[i]->queue_remote_reset(peer_addr, last_dest_name); - // renumber outgoing seqs out_seq = 0; - for (list::iterator p = q.begin(); p != q.end(); p++) - (*p)->set_seq(++out_seq); - in_seq = 0; connect_seq = 0; } @@ -1347,10 +1343,11 @@ void Rank::Pipe::was_session_reset() void Rank::Pipe::report_failures() { // report failures - q.splice(q.begin(), sent); - while (!q.empty()) { - Message *m = q.front(); - q.pop_front(); + q[CEPH_MSG_PRIO_HIGHEST].splice(q[CEPH_MSG_PRIO_HIGHEST].begin(), sent); + while (1) { + Message *m = _get_next_outgoing(); + if (!m) + break; if (policy.drop_msg_callback) { unsigned srcrank = m->get_source_inst().addr.erank; @@ -1636,9 +1633,8 @@ void Rank::Pipe::writer() } // grab outgoing message - if (!q.empty()) { - Message *m = q.front(); - q.pop_front(); + Message *m = _get_next_outgoing(); + if (m) { m->set_seq(++out_seq); lock.Unlock(); diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 44c3863d7b4a5..8d23745177eac 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -119,7 +119,7 @@ private: bool reader_running; bool writer_running; - list q; + map > q; // priority queue list sent; Cond cond; @@ -201,10 +201,23 @@ private: lock.Unlock(); } void _send(Message *m) { - q.push_back(m); + q[m->get_priority()].push_back(m); last_dest_name = m->get_dest(); cond.Signal(); } + Message *_get_next_outgoing() { + Message *m = 0; + while (!m && !q.empty()) { + map >::reverse_iterator p = q.rbegin(); + if (!p->second.empty()) { + m = p->second.front(); + p->second.pop_front(); + } + if (p->second.empty()) + q.erase(p->first); + } + return m; + } void force_close() { if (sd >= 0) ::close(sd); @@ -216,10 +229,9 @@ private: class EntityMessenger : public Messenger { Mutex lock; Cond cond; - list dispatch_queue; - list prio_dispatch_queue; + map > dispatch_queue; bool stop; - int qlen, pqlen; + int qlen; int my_rank; public: bool need_addr; @@ -244,13 +256,8 @@ private: m->set_recv_stamp(g_clock.now()); lock.Lock(); - if (m->get_source().is_mon()) { - prio_dispatch_queue.push_back(m); - pqlen++; - } else { - qlen++; - dispatch_queue.push_back(m); - } + qlen++; + dispatch_queue[m->get_priority()].push_back(m); cond.Signal(); lock.Unlock(); } @@ -263,21 +270,21 @@ private: void queue_remote_reset(entity_addr_t a, entity_name_t n) { lock.Lock(); remote_reset_q.push_back(pair(a,n)); - dispatch_queue.push_back((Message*)BAD_REMOTE_RESET); + dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_REMOTE_RESET); cond.Signal(); lock.Unlock(); } void queue_reset(entity_addr_t a, entity_name_t n) { lock.Lock(); reset_q.push_back(pair(a,n)); - dispatch_queue.push_back((Message*)BAD_RESET); + dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_RESET); cond.Signal(); lock.Unlock(); } void queue_failure(Message *m, entity_inst_t i) { lock.Lock(); failed_q.push_back(pair(m,i)); - dispatch_queue.push_back((Message*)BAD_FAILED); + dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_FAILED); cond.Signal(); lock.Unlock(); } @@ -286,7 +293,7 @@ private: EntityMessenger(entity_name_t name, int r) : Messenger(name), stop(false), - qlen(0), pqlen(0), + qlen(0), my_rank(r), need_addr(false), dispatch_thread(this) { } @@ -303,7 +310,7 @@ private: dispatch_thread.join(); } - int get_dispatch_queue_len() { return qlen + pqlen; } + int get_dispatch_queue_len() { return qlen; } void reset_myname(entity_name_t m); -- 2.39.5