+- objecter retry
+- kclient retry
+
v0.5
- debug restart, cosd reformat, etc.
- finish btrfs ioctl interface
cout << "bound to " << rank.get_rank_addr() << ", mounting ceph" << std::endl;
rank.start();
- rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::retry_forever());
- rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
- rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::retry_forever());
- rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
+ rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
+ rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
// start client
Client *client = new Client(rank.register_entity(entity_name_t::CLIENT()), &monmap);
cout << "starting mds? at " << rank.get_rank_addr() << std::endl;
rank.start();
- rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
- rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::retry_forever());
- rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::retry_forever()); // mds does its own timeout/markdown
- rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
+ rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(1.0));
+ rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
+ rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
+ rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossless()); // mds does its own timeout/markdown
// start mds
Messenger *m = rank.register_entity(entity_name_t::MDS(whoami));
rank.start(); // may daemonize
- rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fail_after(g_conf.mon_lease_timeout * 2));
- rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::fast_fail());
- rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::fast_fail());
- rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::fast_fail());
- rank.set_policy(entity_name_t::TYPE_ADMIN, Rank::Policy::fast_fail());
+ rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(g_conf.mon_lease_timeout * 2));
+
+ rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_ADMIN, Rank::Policy::lossy_fast_fail());
mon->init();
rank.start();
- rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
- rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
+ rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
+
+ // make a _reasonable_ effort to send acks/replies to requests, but
+ // don't get carried away, as the sender may go away and we won't
+ // ever hear about it.
+ // FIXME: not until objecter/osd_client have a retry of some sort...
+ //rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fail_after(10.0));
+ //rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fail_after(10.0));
// start osd
Messenger *m = rank.register_entity(entity_name_t::OSD(whoami));
cout << "starting csyn at " << rank.get_rank_addr() << std::endl;
rank.start();
- rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::retry_forever());
- rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::fast_fail());
- rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::retry_forever());
- rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::retry_forever());
+ rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
+ rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
list<Client*> clients;
list<SyntheticClient*> synclients;
struct ceph_msg_connect {
__le32 global_seq;
__le32 connect_seq;
-};
+ __u8 flags;
+} __attribute__ ((packed));
+
+#define CEPH_MSG_CONNECT_LOSSYTX 1 /* msg i send may be safely dropped */
/*
#define CEPH_MSG_OSD_OPREPLY 43
+struct ceph_ping {
+ __le64 seq;
+ struct ceph_timespec stamp;
+};
+
+
/* for statfs_reply. units are KB, objects. */
struct ceph_statfs {
__le64 f_total;
dout(10, "fault %p state %lu to peer %u.%u.%u.%u:%u\n",
con, con->state, IPQUADPORT(con->peer_addr.ipaddr));
- if (test_bit(LOSSY, &con->state)) {
- dout(30, "fault on LOSSY channel\n");
+ if (test_bit(LOSSYTX, &con->state)) {
+ dout(30, "fault on LOSSYTX channel\n");
remove_connection(con->msgr, con);
return;
}
{
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.global_seq = cpu_to_le32(con->global_seq);
+ con->out_connect.flags = 0;
+ if (test_bit(LOSSYTX, &con->state))
+ con->out_connect.flags = CEPH_MSG_CONNECT_LOSSYTX;
con->out_kvec[0].iov_base = CEPH_BANNER;
con->out_kvec[0].iov_len = strlen(CEPH_BANNER);
set_bit(WRITE_PENDING, &con->state);
}
+static void prepare_write_accept_ready(struct ceph_connection *con)
+{
+ con->out_connect.flags = 0;
+ if (test_bit(LOSSYTX, &con->state))
+ con->out_connect.flags = CEPH_MSG_CONNECT_LOSSYTX;
+
+ con->out_kvec[0].iov_base = &tag_ready;
+ con->out_kvec[0].iov_len = 1;
+ con->out_kvec[1].iov_base = &con->out_connect.flags;
+ con->out_kvec[1].iov_len = 1;
+ con->out_kvec_left = 2;
+ con->out_kvec_bytes = 2;
+ con->out_kvec_cur = con->out_kvec;
+ con->out_more = 0;
+ set_bit(WRITE_PENDING, &con->state);
+}
+
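Taken together with the ceph_msg_connect change above, the post-banner handshake now carries the lossy-transmit hint in both directions: the connector sends it inside ceph_msg_connect.flags, and the acceptor appends one flags byte right after the READY tag, which the connector reads back into in_flags below. A minimal user-space sketch of that one-byte exchange, assuming plain blocking sockets and hypothetical helper names (not part of the patch):

// Sketch only -- tag value is a placeholder, not the real CEPH_MSGR_TAG_READY.
#include <cstdint>
#include <unistd.h>

static const uint8_t TAG_READY_SKETCH = 1;   // stand-in for CEPH_MSGR_TAG_READY
static const uint8_t LOSSYTX_FLAG     = 1;   // mirrors CEPH_MSG_CONNECT_LOSSYTX

// acceptor side: READY tag followed by our own transmit-lossiness flags
int accept_send_ready(int sd, bool lossy_tx)
{
  uint8_t out[2] = { TAG_READY_SKETCH, (uint8_t)(lossy_tx ? LOSSYTX_FLAG : 0) };
  return write(sd, out, sizeof(out)) == (ssize_t)sizeof(out) ? 0 : -1;
}

// connector side: after seeing TAG_READY, read one more byte and remember
// whether the peer reserves the right to drop messages it sends us
int connect_read_flags(int sd, bool *lossy_rx)
{
  uint8_t flags;
  if (read(sd, &flags, 1) != 1)
    return -1;
  *lossy_rx = (flags & LOSSYTX_FLAG) != 0;
  return 0;
}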
static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag,
u32 *pseq)
{
con->in_base_pos += ret;
}
+ if (con->in_tag == CEPH_MSGR_TAG_READY) {
+ to++;
+ if (con->in_base_pos < to) {
+ ret = ceph_tcp_recvmsg(con->sock,
+ (char *)&con->in_flags, 1);
+ if (ret <= 0)
+ goto out;
+ con->in_base_pos += ret;
+ }
+ }
+
if (con->in_tag == CEPH_MSGR_TAG_RETRY_SESSION) {
/* peer's connect_seq */
to += sizeof(con->in_connect.connect_seq);
case CEPH_MSGR_TAG_READY:
dout(10, "process_connect got READY, now open\n");
clear_bit(CONNECTING, &con->state);
+ con->lossy_rx = con->in_flags & CEPH_MSG_CONNECT_LOSSYTX;
		con->delay = 0;  /* reset backoff memory */
break;
default:
put_connection(old); /* dec reference count */
clear_bit(ACCEPTING, &new->state);
- prepare_write_accept_reply(new, &tag_ready);
+ prepare_write_accept_ready(new);
}
/*
if (verify_hello(con) < 0)
return -1;
+ /* note flags */
+ con->lossy_rx = con->in_flags & CEPH_MSG_CONNECT_LOSSYTX;
+
/* connect */
/* do we have an existing connection for this peer? */
if (radix_tree_preload(GFP_NOFS) < 0) {
prepare_write_accept_retry(con,
&tag_retry_global,
&con->out_connect.global_seq);
- } else if (test_bit(LOSSY, &existing->state)) {
- dout(20, "process_accept replacing existing LOSSY %p\n",
+ } else if (test_bit(LOSSYTX, &existing->state)) {
+ dout(20, "process_accept replacing existing LOSSYTX %p\n",
existing);
reset_connection(existing);
__replace_connection(msgr, existing, con);
__register_connection(msgr, con);
con->global_seq = peer_gseq;
con->connect_seq = peer_cseq + 1;
- prepare_write_accept_reply(con, &tag_ready);
+ prepare_write_accept_ready(con);
}
spin_unlock(&msgr->con_lock);
radix_tree_preload_end();
if (IS_ERR(newcon))
		return PTR_ERR(newcon);
+ newcon->out_connect.flags = 0;
+ if (!timeout)
+ newcon->out_connect.flags |= CEPH_MSG_CONNECT_LOSSYTX;
+
ret = radix_tree_preload(GFP_NOFS);
if (ret < 0) {
derr(10, "ENOMEM in ceph_msg_send\n");
}
if (!timeout) {
- dout(10, "ceph_msg_send setting LOSSY\n");
- set_bit(LOSSY, &con->state);
+ dout(10, "ceph_msg_send setting LOSSYTX\n");
+ set_bit(LOSSYTX, &con->state);
}
/* queue */
#define MAX_DELAY_INTERVAL (5 * 60 * HZ)
/* ceph_connection state bit flags */
-#define LOSSY 0 /* close channel on errors */
-#define CONNECTING 1
-#define ACCEPTING 2
-#define WRITE_PENDING 3 /* we have data to send */
-#define QUEUED 4 /* there is work to be done */
-#define BUSY 5 /* work is being done */
-#define BACKOFF 6 /* backing off; will retry */
-#define STANDBY 7 /* standby, when socket state close, no messages */
-#define WAIT 8 /* wait for peer to connect */
-#define CLOSED 9 /* we've closed the connection */
-#define SOCK_CLOSED 10 /* socket state changed to closed */
-#define REGISTERED 11
+#define LOSSYTX 0 /* close channel on errors */
+#define LOSSYRX         1  /* peer may drop messages it sends us */
+#define CONNECTING 2
+#define ACCEPTING 3
+#define WRITE_PENDING 4 /* we have data to send */
+#define QUEUED 5 /* there is work to be done */
+#define BUSY 6 /* work is being done */
+#define BACKOFF 7 /* backing off; will retry */
+#define STANDBY 8 /* standby, when socket state close, no messages */
+#define WAIT 9 /* wait for peer to connect */
+#define CLOSED 10 /* we've closed the connection */
+#define SOCK_CLOSED 11 /* socket state changed to closed */
+#define REGISTERED 12
struct ceph_connection {
struct ceph_entity_addr peer_addr; /* peer address */
struct ceph_entity_name peer_name; /* peer name */
__u32 connect_seq, global_seq;
- char in_banner[CEPH_BANNER_MAX_LEN];
- struct ceph_msg_connect out_connect, in_connect;
- struct ceph_entity_addr actual_peer_addr;
- __u32 out_seq; /* last message queued for send */
- __u32 in_seq, in_seq_acked; /* last message received, acked */
+ bool lossy_rx; /* true if sender is lossy */
/* out queue */
spinlock_t out_queue_lock; /* protects out_queue, out_sent, out_seq */
struct list_head out_queue;
struct list_head out_sent; /* sending/sent but unacked */
+ __u32 out_seq; /* last message queued for send */
+ __u32 in_seq, in_seq_acked; /* last message received, acked */
+
+ /* negotiation temps */
+ char in_banner[CEPH_BANNER_MAX_LEN];
+ struct ceph_msg_connect out_connect, in_connect;
+ struct ceph_entity_addr actual_peer_addr;
+
+ /* out */
+ struct ceph_msg *out_msg;
+ struct ceph_msg_pos out_msg_pos;
__le32 out32;
struct kvec out_kvec[6],
*out_kvec_cur;
int out_kvec_left; /* kvec's left */
int out_kvec_bytes; /* bytes left */
int out_more; /* there is more data after this kvec */
- struct ceph_msg *out_msg;
- struct ceph_msg_pos out_msg_pos;
/* partially read message contents */
char in_tag;
+ u8 in_flags;
int in_base_pos; /* for ack seq, or msg headers, or handshake */
__u32 in_partial_ack;
struct ceph_msg *in_msg;
pipe = 0;
} else {
dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl;
+
+ // if this pipe was created by an incoming connection, but we haven't received
+ // a message yet, then it won't have the policy set.
+ if (pipe->get_out_seq() == 0)
+ pipe->policy = policy_map[m->get_dest().type()];
+
pipe->_send(m);
pipe->lock.Unlock();
}
rank.lock.Lock();
+ // note peer's flags
+ lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSYTX;
+
// existing?
if (rank.rank_pipe.count(peer_addr)) {
existing = rank.rank_pipe[peer_addr];
continue;
}
- if (existing->policy.is_lossy()) {
+ if (existing->policy.lossy_tx) {
dout(-10) << "accept replacing existing (lossy) channel" << dendl;
existing->was_session_reset();
goto replace;
peer_global_seq = connect.global_seq;
dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
- // send READY
+ // send READY + flags
{
char tag = CEPH_MSGR_TAG_READY;
if (tcp_write(sd, &tag, 1) < 0)
goto fail;
+ __u8 flags = 0;
+ if (policy.lossy_tx)
+ flags |= CEPH_MSG_CONNECT_LOSSYTX;
+ if (tcp_write(sd, (const char *)&flags, 1) < 0)
+ goto fail;
}
if (state != STATE_CLOSED) {
goto stop_locked;
}
+ // read flags
+ __u8 flags;
+ if (tcp_read(newsd, (char *)&flags, 1) < 0) {
+      dout(2) << "connect read flags, " << strerror(errno) << dendl;
+ goto fail;
+ }
+ lossy_rx = flags & CEPH_MSG_CONNECT_LOSSYTX;
+
// hooray!
state = STATE_OPEN;
sd = newsd;
connect_seq = cseq+1;
first_fault = last_attempt = utime_t();
- dout(20) << "connect success " << connect_seq << dendl;
+ dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl;
if (!reader_running) {
dout(20) << "connect starting reader" << dendl;
sd = -1;
// lossy channel?
- if (policy.is_lossy()) {
+ if (policy.lossy_tx) {
dout(10) << "fault on lossy channel, failing" << dendl;
fail();
return;
}
if (q.empty()) {
- if (state == STATE_CLOSING || onconnect || policy.is_lossy()) {
+ if (state == STATE_CLOSING || onconnect) {
dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
state = STATE_CLOSED;
} else {
}
in_seq++;
- if (in_seq == 1)
+ if (in_seq == 1)
policy = rank.policy_map[m->get_source().type()]; /* apply policy */
- if (!policy.is_lossy() && in_seq != m->get_seq()) {
+ if (!lossy_rx && in_seq != m->get_seq()) {
dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
<< " for " << *m << " from " << m->get_source() << dendl;
derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
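Worth noting: the gate above now depends on lossy_rx (what the peer declared in its connect flags), not on our own transmit policy as the old policy.is_lossy() check did. In effect the receiver only enforces consecutive sequence numbers from peers that promised not to drop anything; a tiny sketch of that rule, helper name hypothetical:

// Sketch only: mirrors the !lossy_rx gate above.
#include <cstdint>

static bool seq_acceptable(bool lossy_rx, uint32_t expected, uint32_t got)
{
  // a lossless sender must deliver strictly consecutive seqs;
  // a LOSSYTX sender may legitimately have dropped some
  return lossy_rx || got == expected;
}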
class Rank {
public:
struct Policy {
- float retry_interval; // (initial). <0 => lossy channel, fail immediately.
- float fail_interval; // before we call ms_handle_failure <0 => retry forever.
+    bool lossy_tx;         // ok to drop messages we send (close channel on errors)
+ float retry_interval; // initial retry interval. 0 => fail immediately (lossy_tx=true)
+ float fail_interval; // before we call ms_handle_failure (lossy_tx=true)
bool drop_msg_callback;
bool fail_callback;
bool remote_reset_callback;
Policy() :
+ lossy_tx(false),
retry_interval(g_conf.ms_retry_interval),
fail_interval(g_conf.ms_fail_interval),
drop_msg_callback(true),
fail_callback(true),
remote_reset_callback(true) {}
- Policy(float r, float f, bool dmc, bool fc, bool rrc) :
+ Policy(bool tx, float r, float f, bool dmc, bool fc, bool rrc) :
+ lossy_tx(tx),
retry_interval(r), fail_interval(f),
drop_msg_callback(dmc),
fail_callback(fc),
remote_reset_callback(rrc) {}
- bool is_lossy() {
- return retry_interval < 0;
+ static Policy lossless() { return Policy(false,
+ g_conf.ms_retry_interval, 0,
+ true, true, true); }
+ static Policy lossy_fail_after(float f) {
+ return Policy(true,
+ MIN(g_conf.ms_retry_interval, f), f,
+ true, true, true);
}
+ static Policy lossy_fast_fail() { return Policy(true, -1, -1, true, true, true); }
+
+ /*
static Policy fast_fail() { return Policy(-1, -1, true, true, true); }
static Policy fail_after(float f) { return Policy(MIN(g_conf.ms_retry_interval, f), f, true, true, true); }
static Policy retry_forever() { return Policy(g_conf.ms_retry_interval, -1, false, true, true); }
+ */
};
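For orientation, a stand-alone sketch of what the three factories above actually fill in (values taken from the constructors shown; ms_retry_interval stands in for g_conf.ms_retry_interval, and 30s is just an example fail interval). The notable design change is that lossiness is now an explicit lossy_tx bool rather than being encoded as a negative retry_interval, as the old is_lossy() did.

// Sketch only -- a minimal mirror of the Policy fields, not the real class.
#include <algorithm>
#include <cstdio>

struct PolicySketch {
  bool  lossy_tx;        // ok to drop messages we send
  float retry_interval;  // reconnect backoff; <0 => fail immediately
  float fail_interval;   // how long before ms_handle_failure is reported
};

int main()
{
  const float ms_retry_interval = 5.0f;   // assumed default
  const float f = 30.0f;                  // e.g. g_conf.mon_lease_timeout * 2

  PolicySketch lossless   = { false, ms_retry_interval, 0.0f };
  PolicySketch fast_fail  = { true, -1.0f, -1.0f };
  PolicySketch fail_after = { true, std::min(ms_retry_interval, f), f };

  std::printf("lossless():           tx=%d retry=%g fail=%g\n",
              lossless.lossy_tx, lossless.retry_interval, lossless.fail_interval);
  std::printf("lossy_fast_fail():    tx=%d retry=%g fail=%g\n",
              fast_fail.lossy_tx, fast_fail.retry_interval, fast_fail.fail_interval);
  std::printf("lossy_fail_after(30): tx=%d retry=%g fail=%g\n",
              fail_after.lossy_tx, fail_after.retry_interval, fail_after.fail_interval);
  return 0;
}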
entity_addr_t peer_addr;
entity_name_t last_dest_name;
Policy policy;
+    bool lossy_rx;   // true if peer (sender) is lossy
Mutex lock;
int state;
entity_addr_t& get_peer_addr() { return peer_addr; }
+ __u32 get_out_seq() { return out_seq; }
+
void register_pipe();
void unregister_pipe();
void dirty_close();