]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: track lossy mode independently for self, peer
authorSage Weil <sage@newdream.net>
Mon, 13 Oct 2008 18:56:31 +0000 (11:56 -0700)
committerSage Weil <sage@newdream.net>
Mon, 13 Oct 2008 19:02:27 +0000 (12:02 -0700)
The policy will be asymmetrical for the OSDs, so we need to
track it independently.

The various assertions aren't all worked out yet.  Notably,
connection races aren't quite right.  But the basic bits
are there.

src/TODO
src/cfuse.cc
src/cmds.cc
src/cmon.cc
src/cosd.cc
src/csyn.cc
src/include/ceph_fs.h
src/kernel/messenger.c
src/kernel/messenger.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 99d53edd7bb264c548810e0e05d2213dc99603d9..4b08d832f9acbb7069ba989a1dae44e15b160dee 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -1,3 +1,6 @@
+- objecter retry
+- kclient retry
+
 v0.5
 - debug restart, cosd reformat, etc.
 - finish btrfs ioctl interface
index e701feb96290ecc46398ebf2f23386933c5bd018..d21fb1db5da6bda8a7302225ee23bda49c108221 100644 (file)
@@ -71,10 +71,9 @@ int main(int argc, const char **argv, const char *envp[]) {
   cout << "bound to " << rank.get_rank_addr() << ", mounting ceph" << std::endl;
   rank.start();
 
-  rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::retry_forever());
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::retry_forever());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
+  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
 
   // start client
   Client *client = new Client(rank.register_entity(entity_name_t::CLIENT()), &monmap);
index 676d282d9d1d6d4a6bb12c0a745e6a0ce0811d07..e60daabf5848b9ac4d42988b7c3b361414148a04 100644 (file)
@@ -68,10 +68,10 @@ int main(int argc, const char **argv)
   cout << "starting mds? at " << rank.get_rank_addr() << std::endl;
   rank.start();
   
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::retry_forever());
-  rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::retry_forever());  // mds does its own timeout/markdown
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
+  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(1.0));
+  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossless());  // mds does its own timeout/markdown
 
   // start mds
   Messenger *m = rank.register_entity(entity_name_t::MDS(whoami));
index 0233f9559cc13721fc752cec63720e5681b1af47..8afc77de04bffb2e6085c482e64f3449579000d8 100644 (file)
@@ -109,11 +109,12 @@ int main(int argc, const char **argv)
 
   rank.start();  // may daemonize
 
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fail_after(g_conf.mon_lease_timeout * 2));
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::fast_fail());
-  rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::fast_fail());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::fast_fail());
-  rank.set_policy(entity_name_t::TYPE_ADMIN, Rank::Policy::fast_fail());
+  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(g_conf.mon_lease_timeout * 2));
+
+  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_ADMIN, Rank::Policy::lossy_fast_fail());
 
 
   mon->init();
index 3f22b45b1a5b0ad18c4fa648df184ec30ffa8a14..1736dfc75bafa238de9175e46b3deab89d334d41 100644 (file)
@@ -119,8 +119,15 @@ int main(int argc, const char **argv)
 
   rank.start();
 
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
+  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
+
+  // make a _reasonable_ effort to send acks/replies to requests, but
+  // don't get carried away, as the sender may go away and we won't
+  // ever hear about it.
+  // FIXME: not until objecter/osd_client have a retry of some sort...
+  //rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fail_after(10.0));
+  //rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fail_after(10.0));
 
   // start osd
   Messenger *m = rank.register_entity(entity_name_t::OSD(whoami));
index 12cbbd7b4ace82540f2ba9984fcef76a4432087e..7d8ba9c360749454b98c8d756be3002a30aa5f92 100644 (file)
@@ -59,10 +59,9 @@ int main(int argc, const char **argv, char *envp[])
   cout << "starting csyn at " << rank.get_rank_addr() << std::endl;
   rank.start();
 
-  rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::retry_forever());
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::retry_forever());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
+  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
 
   list<Client*> clients;
   list<SyntheticClient*> synclients;
index 442d25e1818bae1ebb0f41418cf162c3df61b308..4e7db817736470ba1ed9402276b11dd0718f4228 100644 (file)
@@ -399,7 +399,10 @@ struct ceph_entity_inst {
 struct ceph_msg_connect {
        __le32 global_seq;
        __le32 connect_seq;
-};
+       __u8  flags;
+} __attribute__ ((packed));
+
+#define CEPH_MSG_CONNECT_LOSSYTX  1  /* msg i send may be safely dropped */
 
 
 /*
@@ -469,6 +472,12 @@ struct ceph_msg_footer {
 #define CEPH_MSG_OSD_OPREPLY      43
 
 
+struct ceph_ping {
+       __le64 seq;
+       struct ceph_timespec stamp;
+};
+
+
 /* for statfs_reply.  units are KB, objects. */
 struct ceph_statfs {
        __le64 f_total;
index 74dc92f161a3a6d403d85fc68f8addddf8e3974b..5021c26a60065767b5b8855de4db059bdbd5a522 100644 (file)
@@ -581,8 +581,8 @@ static void ceph_fault(struct ceph_connection *con)
        dout(10, "fault %p state %lu to peer %u.%u.%u.%u:%u\n",
             con, con->state, IPQUADPORT(con->peer_addr.ipaddr));
 
-       if (test_bit(LOSSY, &con->state)) {
-               dout(30, "fault on LOSSY channel\n");
+       if (test_bit(LOSSYTX, &con->state)) {
+               dout(30, "fault on LOSSYTX channel\n");
                remove_connection(con->msgr, con);
                return;
        }
@@ -860,6 +860,9 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
 {
        con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
        con->out_connect.global_seq = cpu_to_le32(con->global_seq);
+       con->out_connect.flags = 0;
+       if (test_bit(LOSSYTX, &con->state))
+               con->out_connect.flags = CEPH_MSG_CONNECT_LOSSYTX;
 
        con->out_kvec[0].iov_base = CEPH_BANNER;
        con->out_kvec[0].iov_len = strlen(CEPH_BANNER);
@@ -918,6 +921,23 @@ static void prepare_write_accept_reply(struct ceph_connection *con, char *ptag)
        set_bit(WRITE_PENDING, &con->state);
 }
 
+static void prepare_write_accept_ready(struct ceph_connection *con)
+{
+       con->out_connect.flags = 0;
+       if (test_bit(LOSSYTX, &con->state))
+               con->out_connect.flags = CEPH_MSG_CONNECT_LOSSYTX;
+
+       con->out_kvec[0].iov_base = &tag_ready;
+       con->out_kvec[0].iov_len = 1;
+       con->out_kvec[1].iov_base = &con->out_connect.flags;
+       con->out_kvec[1].iov_len = 1;
+       con->out_kvec_left = 2;
+       con->out_kvec_bytes = 2;
+       con->out_kvec_cur = con->out_kvec;
+       con->out_more = 0;
+       set_bit(WRITE_PENDING, &con->state);
+}
+
 static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag,
                                       u32 *pseq)
 {
@@ -1309,6 +1329,17 @@ static int read_connect_partial(struct ceph_connection *con)
                con->in_base_pos += ret;
        }
 
+       if (con->in_tag == CEPH_MSGR_TAG_READY) {
+               to++;
+               if (con->in_base_pos < to) {
+                       ret = ceph_tcp_recvmsg(con->sock,
+                                              (char *)&con->in_flags, 1);
+                       if (ret <= 0)
+                               goto out;
+                       con->in_base_pos += ret;
+               }
+       }
+
        if (con->in_tag == CEPH_MSGR_TAG_RETRY_SESSION) {
                /* peer's connect_seq */
                to += sizeof(con->in_connect.connect_seq);
@@ -1437,6 +1468,7 @@ static int process_connect(struct ceph_connection *con)
        case CEPH_MSGR_TAG_READY:
                dout(10, "process_connect got READY, now open\n");
                clear_bit(CONNECTING, &con->state);
+               con->lossy_rx = con->in_flags & CEPH_MSG_CONNECT_LOSSYTX;
                con->delay = 0;  /* reset backoffmemory */
                break;
        default:
@@ -1523,7 +1555,7 @@ static void __replace_connection(struct ceph_messenger *msgr,
        put_connection(old); /* dec reference count */
 
        clear_bit(ACCEPTING, &new->state);
-       prepare_write_accept_reply(new, &tag_ready);
+       prepare_write_accept_ready(new);
 }
 
 /*
@@ -1539,6 +1571,9 @@ static int process_accept(struct ceph_connection *con)
        if (verify_hello(con) < 0)
                return -1;
 
+       /* note flags */
+       con->lossy_rx = con->in_flags & CEPH_MSG_CONNECT_LOSSYTX;
+
        /* connect */
        /* do we have an existing connection for this peer? */
        if (radix_tree_preload(GFP_NOFS) < 0) {
@@ -1557,8 +1592,8 @@ static int process_accept(struct ceph_connection *con)
                        prepare_write_accept_retry(con,
                                           &tag_retry_global,
                                           &con->out_connect.global_seq);
-               } else if (test_bit(LOSSY, &existing->state)) {
-                       dout(20, "process_accept replacing existing LOSSY %p\n",
+               } else if (test_bit(LOSSYTX, &existing->state)) {
+                       dout(20, "process_accept replacing existing LOSSYTX %p\n",
                             existing);
                        reset_connection(existing);
                        __replace_connection(msgr, existing, con);
@@ -1615,7 +1650,7 @@ static int process_accept(struct ceph_connection *con)
                __register_connection(msgr, con);
                con->global_seq = peer_gseq;
                con->connect_seq = peer_cseq + 1;
-               prepare_write_accept_reply(con, &tag_ready);
+               prepare_write_accept_ready(con);
        }
        spin_unlock(&msgr->con_lock);
        radix_tree_preload_end();
@@ -1992,6 +2027,10 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
                if (IS_ERR(newcon))
                        return PTR_ERR(con);
 
+               newcon->out_connect.flags = 0;
+               if (!timeout)
+                       newcon->out_connect.flags |= CEPH_MSG_CONNECT_LOSSYTX;
+
                ret = radix_tree_preload(GFP_NOFS);
                if (ret < 0) {
                        derr(10, "ENOMEM in ceph_msg_send\n");
@@ -2024,8 +2063,8 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
        }
 
        if (!timeout) {
-               dout(10, "ceph_msg_send setting LOSSY\n");
-               set_bit(LOSSY, &con->state);
+               dout(10, "ceph_msg_send setting LOSSYTX\n");
+               set_bit(LOSSYTX, &con->state);
        }
 
        /* queue */
index 316aff2887494f88840092387012674781175f0f..2b6586a1fac42a269e98514dacf24d4303a119dc 100644 (file)
@@ -77,18 +77,19 @@ struct ceph_msg_pos {
 #define MAX_DELAY_INTERVAL     (5 * 60 * HZ)
 
 /* ceph_connection state bit flags */
-#define LOSSY           0 /* close channel on errors */
-#define CONNECTING     1
-#define ACCEPTING      2
-#define WRITE_PENDING  3  /* we have data to send */
-#define QUEUED          4  /* there is work to be done */
-#define BUSY            5  /* work is being done */
-#define BACKOFF         6  /* backing off; will retry */
-#define STANDBY                7  /* standby, when socket state close, no messages */
-#define WAIT           8  /* wait for peer to connect */
-#define CLOSED         9  /* we've closed the connection */
-#define SOCK_CLOSED    10 /* socket state changed to closed */
-#define REGISTERED      11
+#define LOSSYTX         0 /* close channel on errors */
+#define LOSSYRX         1 /* close channel on errors */
+#define CONNECTING     2
+#define ACCEPTING      3
+#define WRITE_PENDING  4  /* we have data to send */
+#define QUEUED          5  /* there is work to be done */
+#define BUSY            6  /* work is being done */
+#define BACKOFF         7  /* backing off; will retry */
+#define STANDBY                8  /* standby, when socket state close, no messages */
+#define WAIT           9  /* wait for peer to connect */
+#define CLOSED         10  /* we've closed the connection */
+#define SOCK_CLOSED    11 /* socket state changed to closed */
+#define REGISTERED      12
 
 
 struct ceph_connection {
@@ -105,28 +106,34 @@ struct ceph_connection {
        struct ceph_entity_addr peer_addr; /* peer address */
        struct ceph_entity_name peer_name; /* peer name */
        __u32 connect_seq, global_seq;
-       char in_banner[CEPH_BANNER_MAX_LEN];
-       struct ceph_msg_connect out_connect, in_connect;
-       struct ceph_entity_addr actual_peer_addr;
-       __u32 out_seq;               /* last message queued for send */
-       __u32 in_seq, in_seq_acked;  /* last message received, acked */
+       bool lossy_rx;                     /* true if sender is lossy */
 
        /* out queue */
        spinlock_t out_queue_lock;   /* protects out_queue, out_sent, out_seq */
        struct list_head out_queue;
        struct list_head out_sent;   /* sending/sent but unacked */
 
+       __u32 out_seq;               /* last message queued for send */
+       __u32 in_seq, in_seq_acked;  /* last message received, acked */
+
+       /* negotiation temps */
+       char in_banner[CEPH_BANNER_MAX_LEN];
+       struct ceph_msg_connect out_connect, in_connect;
+       struct ceph_entity_addr actual_peer_addr;
+
+       /* out */
+       struct ceph_msg *out_msg;
+       struct ceph_msg_pos out_msg_pos;
        __le32 out32;
        struct kvec out_kvec[6],
                *out_kvec_cur;
        int out_kvec_left;   /* kvec's left */
        int out_kvec_bytes;  /* bytes left */
        int out_more;        /* there is more data after this kvec */
-       struct ceph_msg *out_msg;
-       struct ceph_msg_pos out_msg_pos;
 
        /* partially read message contents */
        char in_tag;
+       u8 in_flags;
        int in_base_pos;   /* for ack seq, or msg headers, or handshake */
        __u32 in_partial_ack;
        struct ceph_msg *in_msg;
index 000301e65b82e9e4d04f46e8aa6c9bb4c5539cdd..b244e3471d6313e002f7cb12552e27dc1541ddd6 100644 (file)
@@ -448,6 +448,12 @@ void Rank::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy)
          pipe = 0;
        } else {
          dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl;
+
+         // if this pipe was created by an incoming connection, but we haven't received
+         // a message yet, then it won't have the policy set.
+         if (pipe->get_out_seq() == 0)
+           pipe->policy = policy_map[m->get_dest().type()];
+
          pipe->_send(m);
          pipe->lock.Unlock();
        }
@@ -839,6 +845,9 @@ int Rank::Pipe::accept()
     
     rank.lock.Lock();
 
+    // note peer's flags
+    lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSYTX;
+
     // existing?
     if (rank.rank_pipe.count(peer_addr)) {
       existing = rank.rank_pipe[peer_addr];
@@ -859,7 +868,7 @@ int Rank::Pipe::accept()
        continue;
       }
       
-      if (existing->policy.is_lossy()) {
+      if (existing->policy.lossy_tx) {
        dout(-10) << "accept replacing existing (lossy) channel" << dendl;
        existing->was_session_reset();
        goto replace;
@@ -978,11 +987,16 @@ int Rank::Pipe::accept()
   peer_global_seq = connect.global_seq;
   dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
 
-  // send READY
+  // send READY + flags
   { 
     char tag = CEPH_MSGR_TAG_READY;
     if (tcp_write(sd, &tag, 1) < 0) 
       goto fail;
+    __u8 flags = 0;
+    if (policy.lossy_tx)
+      flags |= CEPH_MSG_CONNECT_LOSSYTX;
+    if (tcp_write(sd, (const char *)&flags, 1) < 0) 
+      goto fail;
   }
 
   if (state != STATE_CLOSED) {
@@ -1198,12 +1212,21 @@ int Rank::Pipe::connect()
        goto stop_locked;
       }
 
+      // read flags
+      __u8 flags;
+      if (tcp_read(newsd, (char *)&flags, 1) < 0) {
+       dout(2) << "connect read tag, seq, " << strerror(errno) << dendl;
+       goto fail;
+      }
+      lossy_rx = flags & CEPH_MSG_CONNECT_LOSSYTX;
+
+
       // hooray!
       state = STATE_OPEN;
       sd = newsd;
       connect_seq = cseq+1;
       first_fault = last_attempt = utime_t();
-      dout(20) << "connect success " << connect_seq << dendl;
+      dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl;
 
       if (!reader_running) {
        dout(20) << "connect starting reader" << dendl;
@@ -1267,14 +1290,14 @@ void Rank::Pipe::fault(bool onconnect)
   sd = -1;
 
   // lossy channel?
-  if (policy.is_lossy()) {
+  if (policy.lossy_tx) {
     dout(10) << "fault on lossy channel, failing" << dendl;
     fail();
     return;
   }
 
   if (q.empty()) {
-    if (state == STATE_CLOSING || onconnect || policy.is_lossy()) {
+    if (state == STATE_CLOSING || onconnect) {
       dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
       state = STATE_CLOSED;
     } else {
@@ -1478,10 +1501,10 @@ void Rank::Pipe::reader()
       }
       in_seq++;
 
-      if (in_seq == 1) 
+      if (in_seq == 1)
        policy = rank.policy_map[m->get_source().type()];  /* apply policy */
 
-      if (!policy.is_lossy() && in_seq != m->get_seq()) {
+      if (!lossy_rx && in_seq != m->get_seq()) {
        dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
                << " for " << *m << " from " << m->get_source() << dendl;
        derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
index 8d23745177eac5213c10d696b500c5d03ffd1933..bdc78e1d7bb713dda7ac9b7d5e10971babec69ca 100644 (file)
@@ -39,30 +39,42 @@ using namespace __gnu_cxx;
 class Rank {
 public:
   struct Policy {
-    float retry_interval;               // (initial).  <0 => lossy channel, fail immediately.
-    float fail_interval;                // before we call ms_handle_failure  <0 => retry forever.
+    bool lossy_tx;                // 
+    float retry_interval;         // initial retry interval.  0 => fail immediately (lossy_tx=true)
+    float fail_interval;          // before we call ms_handle_failure (lossy_tx=true)
     bool drop_msg_callback;
     bool fail_callback;
     bool remote_reset_callback;
     Policy() : 
+      lossy_tx(false),
       retry_interval(g_conf.ms_retry_interval),
       fail_interval(g_conf.ms_fail_interval),
       drop_msg_callback(true),
       fail_callback(true),
       remote_reset_callback(true) {}
 
-    Policy(float r, float f, bool dmc, bool fc, bool rrc) :
+    Policy(bool tx, float r, float f, bool dmc, bool fc, bool rrc) :
+      lossy_tx(tx),
       retry_interval(r), fail_interval(f),
       drop_msg_callback(dmc),
       fail_callback(fc),
       remote_reset_callback(rrc) {}
 
-    bool is_lossy() {
-      return retry_interval < 0;
+    static Policy lossless() { return Policy(false,
+                                            g_conf.ms_retry_interval, 0,
+                                            true, true, true); }
+    static Policy lossy_fail_after(float f) {
+      return Policy(true, 
+                   MIN(g_conf.ms_retry_interval, f), f,
+                   true, true, true);
     }
+    static Policy lossy_fast_fail() { return Policy(true, -1, -1, true, true, true); }
+
+    /*
     static Policy fast_fail() { return Policy(-1, -1, true, true, true); }
     static Policy fail_after(float f) { return Policy(MIN(g_conf.ms_retry_interval, f), f, true, true, true); }
     static Policy retry_forever() { return Policy(g_conf.ms_retry_interval, -1, false, true, true); }
+    */
   };
 
 
@@ -107,6 +119,7 @@ private:
     entity_addr_t peer_addr;
     entity_name_t last_dest_name;
     Policy policy;
+    bool lossy_rx;
     
     Mutex lock;
     int state;
@@ -186,6 +199,8 @@ private:
 
     entity_addr_t& get_peer_addr() { return peer_addr; }
 
+    __u32 get_out_seq() { return out_seq; }
+
     void register_pipe();
     void unregister_pipe();
     void dirty_close();