From c4d21606351e72de4774643a5733d6471e6ee18e Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 13 Oct 2008 11:56:31 -0700 Subject: [PATCH] msgr: track lossy mode independently for self, peer The policy will be asymmetrical for the OSDs, so we need to track it independently. The various assertions aren't all worked out yet. Notably, connection races aren't quite right. But the basic bits are there. --- src/TODO | 3 +++ src/cfuse.cc | 7 +++-- src/cmds.cc | 8 +++--- src/cmon.cc | 11 ++++---- src/cosd.cc | 11 ++++++-- src/csyn.cc | 7 +++-- src/include/ceph_fs.h | 11 +++++++- src/kernel/messenger.c | 55 ++++++++++++++++++++++++++++++++------ src/kernel/messenger.h | 45 ++++++++++++++++++------------- src/msg/SimpleMessenger.cc | 37 ++++++++++++++++++++----- src/msg/SimpleMessenger.h | 25 +++++++++++++---- 11 files changed, 161 insertions(+), 59 deletions(-) diff --git a/src/TODO b/src/TODO index 99d53edd7bb26..4b08d832f9acb 100644 --- a/src/TODO +++ b/src/TODO @@ -1,3 +1,6 @@ +- objecter retry +- kclient retry + v0.5 - debug restart, cosd reformat, etc. - finish btrfs ioctl interface diff --git a/src/cfuse.cc b/src/cfuse.cc index e701feb96290e..d21fb1db5da6b 100644 --- a/src/cfuse.cc +++ b/src/cfuse.cc @@ -71,10 +71,9 @@ int main(int argc, const char **argv, const char *envp[]) { cout << "bound to " << rank.get_rank_addr() << ", mounting ceph" << std::endl; rank.start(); - rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::retry_forever()); - rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail()); - rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::retry_forever()); - rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever()); + rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless()); + rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless()); // start client Client *client = new Client(rank.register_entity(entity_name_t::CLIENT()), &monmap); diff --git a/src/cmds.cc b/src/cmds.cc index 676d282d9d1d6..e60daabf5848b 100644 --- a/src/cmds.cc +++ b/src/cmds.cc @@ -68,10 +68,10 @@ int main(int argc, const char **argv) cout << "starting mds? at " << rank.get_rank_addr() << std::endl; rank.start(); - rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail()); - rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::retry_forever()); - rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::retry_forever()); // mds does its own timeout/markdown - rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever()); + rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(1.0)); + rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless()); + rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless()); + rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossless()); // mds does its own timeout/markdown // start mds Messenger *m = rank.register_entity(entity_name_t::MDS(whoami)); diff --git a/src/cmon.cc b/src/cmon.cc index 0233f9559cc13..8afc77de04bff 100644 --- a/src/cmon.cc +++ b/src/cmon.cc @@ -109,11 +109,12 @@ int main(int argc, const char **argv) rank.start(); // may daemonize - rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fail_after(g_conf.mon_lease_timeout * 2)); - rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::fast_fail()); - rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::fast_fail()); - rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::fast_fail()); - rank.set_policy(entity_name_t::TYPE_ADMIN, Rank::Policy::fast_fail()); + rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(g_conf.mon_lease_timeout * 2)); + + rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_ADMIN, Rank::Policy::lossy_fast_fail()); mon->init(); diff --git a/src/cosd.cc b/src/cosd.cc index 3f22b45b1a5b0..1736dfc75bafa 100644 --- a/src/cosd.cc +++ b/src/cosd.cc @@ -119,8 +119,15 @@ int main(int argc, const char **argv) rank.start(); - rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail()); - rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever()); + rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless()); + + // make a _reasonable_ effort to send acks/replies to requests, but + // don't get carried away, as the sender may go away and we won't + // ever hear about it. + // FIXME: not until objecter/osd_client have a retry of some sort... + //rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fail_after(10.0)); + //rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fail_after(10.0)); // start osd Messenger *m = rank.register_entity(entity_name_t::OSD(whoami)); diff --git a/src/csyn.cc b/src/csyn.cc index 12cbbd7b4ace8..7d8ba9c360749 100644 --- a/src/csyn.cc +++ b/src/csyn.cc @@ -59,10 +59,9 @@ int main(int argc, const char **argv, char *envp[]) cout << "starting csyn at " << rank.get_rank_addr() << std::endl; rank.start(); - rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::retry_forever()); - rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail()); - rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::retry_forever()); - rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever()); + rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail()); + rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless()); + rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless()); list clients; list synclients; diff --git a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 442d25e1818ba..4e7db81773647 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -399,7 +399,10 @@ struct ceph_entity_inst { struct ceph_msg_connect { __le32 global_seq; __le32 connect_seq; -}; + __u8 flags; +} __attribute__ ((packed)); + +#define CEPH_MSG_CONNECT_LOSSYTX 1 /* msg i send may be safely dropped */ /* @@ -469,6 +472,12 @@ struct ceph_msg_footer { #define CEPH_MSG_OSD_OPREPLY 43 +struct ceph_ping { + __le64 seq; + struct ceph_timespec stamp; +}; + + /* for statfs_reply. units are KB, objects. */ struct ceph_statfs { __le64 f_total; diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 74dc92f161a3a..5021c26a60065 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -581,8 +581,8 @@ static void ceph_fault(struct ceph_connection *con) dout(10, "fault %p state %lu to peer %u.%u.%u.%u:%u\n", con, con->state, IPQUADPORT(con->peer_addr.ipaddr)); - if (test_bit(LOSSY, &con->state)) { - dout(30, "fault on LOSSY channel\n"); + if (test_bit(LOSSYTX, &con->state)) { + dout(30, "fault on LOSSYTX channel\n"); remove_connection(con->msgr, con); return; } @@ -860,6 +860,9 @@ static void prepare_write_connect(struct ceph_messenger *msgr, { con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); con->out_connect.global_seq = cpu_to_le32(con->global_seq); + con->out_connect.flags = 0; + if (test_bit(LOSSYTX, &con->state)) + con->out_connect.flags = CEPH_MSG_CONNECT_LOSSYTX; con->out_kvec[0].iov_base = CEPH_BANNER; con->out_kvec[0].iov_len = strlen(CEPH_BANNER); @@ -918,6 +921,23 @@ static void prepare_write_accept_reply(struct ceph_connection *con, char *ptag) set_bit(WRITE_PENDING, &con->state); } +static void prepare_write_accept_ready(struct ceph_connection *con) +{ + con->out_connect.flags = 0; + if (test_bit(LOSSYTX, &con->state)) + con->out_connect.flags = CEPH_MSG_CONNECT_LOSSYTX; + + con->out_kvec[0].iov_base = &tag_ready; + con->out_kvec[0].iov_len = 1; + con->out_kvec[1].iov_base = &con->out_connect.flags; + con->out_kvec[1].iov_len = 1; + con->out_kvec_left = 2; + con->out_kvec_bytes = 2; + con->out_kvec_cur = con->out_kvec; + con->out_more = 0; + set_bit(WRITE_PENDING, &con->state); +} + static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag, u32 *pseq) { @@ -1309,6 +1329,17 @@ static int read_connect_partial(struct ceph_connection *con) con->in_base_pos += ret; } + if (con->in_tag == CEPH_MSGR_TAG_READY) { + to++; + if (con->in_base_pos < to) { + ret = ceph_tcp_recvmsg(con->sock, + (char *)&con->in_flags, 1); + if (ret <= 0) + goto out; + con->in_base_pos += ret; + } + } + if (con->in_tag == CEPH_MSGR_TAG_RETRY_SESSION) { /* peer's connect_seq */ to += sizeof(con->in_connect.connect_seq); @@ -1437,6 +1468,7 @@ static int process_connect(struct ceph_connection *con) case CEPH_MSGR_TAG_READY: dout(10, "process_connect got READY, now open\n"); clear_bit(CONNECTING, &con->state); + con->lossy_rx = con->in_flags & CEPH_MSG_CONNECT_LOSSYTX; con->delay = 0; /* reset backoffmemory */ break; default: @@ -1523,7 +1555,7 @@ static void __replace_connection(struct ceph_messenger *msgr, put_connection(old); /* dec reference count */ clear_bit(ACCEPTING, &new->state); - prepare_write_accept_reply(new, &tag_ready); + prepare_write_accept_ready(new); } /* @@ -1539,6 +1571,9 @@ static int process_accept(struct ceph_connection *con) if (verify_hello(con) < 0) return -1; + /* note flags */ + con->lossy_rx = con->in_flags & CEPH_MSG_CONNECT_LOSSYTX; + /* connect */ /* do we have an existing connection for this peer? */ if (radix_tree_preload(GFP_NOFS) < 0) { @@ -1557,8 +1592,8 @@ static int process_accept(struct ceph_connection *con) prepare_write_accept_retry(con, &tag_retry_global, &con->out_connect.global_seq); - } else if (test_bit(LOSSY, &existing->state)) { - dout(20, "process_accept replacing existing LOSSY %p\n", + } else if (test_bit(LOSSYTX, &existing->state)) { + dout(20, "process_accept replacing existing LOSSYTX %p\n", existing); reset_connection(existing); __replace_connection(msgr, existing, con); @@ -1615,7 +1650,7 @@ static int process_accept(struct ceph_connection *con) __register_connection(msgr, con); con->global_seq = peer_gseq; con->connect_seq = peer_cseq + 1; - prepare_write_accept_reply(con, &tag_ready); + prepare_write_accept_ready(con); } spin_unlock(&msgr->con_lock); radix_tree_preload_end(); @@ -1992,6 +2027,10 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, if (IS_ERR(newcon)) return PTR_ERR(con); + newcon->out_connect.flags = 0; + if (!timeout) + newcon->out_connect.flags |= CEPH_MSG_CONNECT_LOSSYTX; + ret = radix_tree_preload(GFP_NOFS); if (ret < 0) { derr(10, "ENOMEM in ceph_msg_send\n"); @@ -2024,8 +2063,8 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, } if (!timeout) { - dout(10, "ceph_msg_send setting LOSSY\n"); - set_bit(LOSSY, &con->state); + dout(10, "ceph_msg_send setting LOSSYTX\n"); + set_bit(LOSSYTX, &con->state); } /* queue */ diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 316aff2887494..2b6586a1fac42 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -77,18 +77,19 @@ struct ceph_msg_pos { #define MAX_DELAY_INTERVAL (5 * 60 * HZ) /* ceph_connection state bit flags */ -#define LOSSY 0 /* close channel on errors */ -#define CONNECTING 1 -#define ACCEPTING 2 -#define WRITE_PENDING 3 /* we have data to send */ -#define QUEUED 4 /* there is work to be done */ -#define BUSY 5 /* work is being done */ -#define BACKOFF 6 /* backing off; will retry */ -#define STANDBY 7 /* standby, when socket state close, no messages */ -#define WAIT 8 /* wait for peer to connect */ -#define CLOSED 9 /* we've closed the connection */ -#define SOCK_CLOSED 10 /* socket state changed to closed */ -#define REGISTERED 11 +#define LOSSYTX 0 /* close channel on errors */ +#define LOSSYRX 1 /* close channel on errors */ +#define CONNECTING 2 +#define ACCEPTING 3 +#define WRITE_PENDING 4 /* we have data to send */ +#define QUEUED 5 /* there is work to be done */ +#define BUSY 6 /* work is being done */ +#define BACKOFF 7 /* backing off; will retry */ +#define STANDBY 8 /* standby, when socket state close, no messages */ +#define WAIT 9 /* wait for peer to connect */ +#define CLOSED 10 /* we've closed the connection */ +#define SOCK_CLOSED 11 /* socket state changed to closed */ +#define REGISTERED 12 struct ceph_connection { @@ -105,28 +106,34 @@ struct ceph_connection { struct ceph_entity_addr peer_addr; /* peer address */ struct ceph_entity_name peer_name; /* peer name */ __u32 connect_seq, global_seq; - char in_banner[CEPH_BANNER_MAX_LEN]; - struct ceph_msg_connect out_connect, in_connect; - struct ceph_entity_addr actual_peer_addr; - __u32 out_seq; /* last message queued for send */ - __u32 in_seq, in_seq_acked; /* last message received, acked */ + bool lossy_rx; /* true if sender is lossy */ /* out queue */ spinlock_t out_queue_lock; /* protects out_queue, out_sent, out_seq */ struct list_head out_queue; struct list_head out_sent; /* sending/sent but unacked */ + __u32 out_seq; /* last message queued for send */ + __u32 in_seq, in_seq_acked; /* last message received, acked */ + + /* negotiation temps */ + char in_banner[CEPH_BANNER_MAX_LEN]; + struct ceph_msg_connect out_connect, in_connect; + struct ceph_entity_addr actual_peer_addr; + + /* out */ + struct ceph_msg *out_msg; + struct ceph_msg_pos out_msg_pos; __le32 out32; struct kvec out_kvec[6], *out_kvec_cur; int out_kvec_left; /* kvec's left */ int out_kvec_bytes; /* bytes left */ int out_more; /* there is more data after this kvec */ - struct ceph_msg *out_msg; - struct ceph_msg_pos out_msg_pos; /* partially read message contents */ char in_tag; + u8 in_flags; int in_base_pos; /* for ack seq, or msg headers, or handshake */ __u32 in_partial_ack; struct ceph_msg *in_msg; diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 000301e65b82e..b244e3471d631 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -448,6 +448,12 @@ void Rank::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy) pipe = 0; } else { dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl; + + // if this pipe was created by an incoming connection, but we haven't received + // a message yet, then it won't have the policy set. + if (pipe->get_out_seq() == 0) + pipe->policy = policy_map[m->get_dest().type()]; + pipe->_send(m); pipe->lock.Unlock(); } @@ -839,6 +845,9 @@ int Rank::Pipe::accept() rank.lock.Lock(); + // note peer's flags + lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSYTX; + // existing? if (rank.rank_pipe.count(peer_addr)) { existing = rank.rank_pipe[peer_addr]; @@ -859,7 +868,7 @@ int Rank::Pipe::accept() continue; } - if (existing->policy.is_lossy()) { + if (existing->policy.lossy_tx) { dout(-10) << "accept replacing existing (lossy) channel" << dendl; existing->was_session_reset(); goto replace; @@ -978,11 +987,16 @@ int Rank::Pipe::accept() peer_global_seq = connect.global_seq; dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; - // send READY + // send READY + flags { char tag = CEPH_MSGR_TAG_READY; if (tcp_write(sd, &tag, 1) < 0) goto fail; + __u8 flags = 0; + if (policy.lossy_tx) + flags |= CEPH_MSG_CONNECT_LOSSYTX; + if (tcp_write(sd, (const char *)&flags, 1) < 0) + goto fail; } if (state != STATE_CLOSED) { @@ -1198,12 +1212,21 @@ int Rank::Pipe::connect() goto stop_locked; } + // read flags + __u8 flags; + if (tcp_read(newsd, (char *)&flags, 1) < 0) { + dout(2) << "connect read tag, seq, " << strerror(errno) << dendl; + goto fail; + } + lossy_rx = flags & CEPH_MSG_CONNECT_LOSSYTX; + + // hooray! state = STATE_OPEN; sd = newsd; connect_seq = cseq+1; first_fault = last_attempt = utime_t(); - dout(20) << "connect success " << connect_seq << dendl; + dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl; if (!reader_running) { dout(20) << "connect starting reader" << dendl; @@ -1267,14 +1290,14 @@ void Rank::Pipe::fault(bool onconnect) sd = -1; // lossy channel? - if (policy.is_lossy()) { + if (policy.lossy_tx) { dout(10) << "fault on lossy channel, failing" << dendl; fail(); return; } if (q.empty()) { - if (state == STATE_CLOSING || onconnect || policy.is_lossy()) { + if (state == STATE_CLOSING || onconnect) { dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl; state = STATE_CLOSED; } else { @@ -1478,10 +1501,10 @@ void Rank::Pipe::reader() } in_seq++; - if (in_seq == 1) + if (in_seq == 1) policy = rank.policy_map[m->get_source().type()]; /* apply policy */ - if (!policy.is_lossy() && in_seq != m->get_seq()) { + if (!lossy_rx && in_seq != m->get_seq()) { dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq << " for " << *m << " from " << m->get_source() << dendl; derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 8d23745177eac..bdc78e1d7bb71 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -39,30 +39,42 @@ using namespace __gnu_cxx; class Rank { public: struct Policy { - float retry_interval; // (initial). <0 => lossy channel, fail immediately. - float fail_interval; // before we call ms_handle_failure <0 => retry forever. + bool lossy_tx; // + float retry_interval; // initial retry interval. 0 => fail immediately (lossy_tx=true) + float fail_interval; // before we call ms_handle_failure (lossy_tx=true) bool drop_msg_callback; bool fail_callback; bool remote_reset_callback; Policy() : + lossy_tx(false), retry_interval(g_conf.ms_retry_interval), fail_interval(g_conf.ms_fail_interval), drop_msg_callback(true), fail_callback(true), remote_reset_callback(true) {} - Policy(float r, float f, bool dmc, bool fc, bool rrc) : + Policy(bool tx, float r, float f, bool dmc, bool fc, bool rrc) : + lossy_tx(tx), retry_interval(r), fail_interval(f), drop_msg_callback(dmc), fail_callback(fc), remote_reset_callback(rrc) {} - bool is_lossy() { - return retry_interval < 0; + static Policy lossless() { return Policy(false, + g_conf.ms_retry_interval, 0, + true, true, true); } + static Policy lossy_fail_after(float f) { + return Policy(true, + MIN(g_conf.ms_retry_interval, f), f, + true, true, true); } + static Policy lossy_fast_fail() { return Policy(true, -1, -1, true, true, true); } + + /* static Policy fast_fail() { return Policy(-1, -1, true, true, true); } static Policy fail_after(float f) { return Policy(MIN(g_conf.ms_retry_interval, f), f, true, true, true); } static Policy retry_forever() { return Policy(g_conf.ms_retry_interval, -1, false, true, true); } + */ }; @@ -107,6 +119,7 @@ private: entity_addr_t peer_addr; entity_name_t last_dest_name; Policy policy; + bool lossy_rx; Mutex lock; int state; @@ -186,6 +199,8 @@ private: entity_addr_t& get_peer_addr() { return peer_addr; } + __u32 get_out_seq() { return out_seq; } + void register_pipe(); void unregister_pipe(); void dirty_close(); -- 2.39.5