/* mds client */
case CEPH_MSG_MDS_MAP:
- ceph_mdsc_handle_map(&client->mdsc, msg);
+ //ceph_mdsc_handle_map(&client->mdsc, msg);
+ ceph_msg_put(msg);
break;
case CEPH_MSG_CLIENT_REPLY:
ceph_mdsc_handle_reply(&client->mdsc, msg);
struct msghdr msg = {.msg_flags = 0};
int rlen = 0; /* length read */
- dout(30, "ceph_tcp_recvmsg %p len %d\n", sock, (int)len);
+ //dout(30, "ceph_tcp_recvmsg %p len %d\n", sock, (int)len);
msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
/* receive one kvec for now... */
rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
- dout(30, "ceph_tcp_recvmsg %p len %d ret = %d\n", sock, (int)len, rlen);
+ //dout(30, "ceph_tcp_recvmsg %p len %d ret = %d\n", sock, (int)len, rlen);
return(rlen);
}
struct msghdr msg = {.msg_flags = 0};
int rlen = 0;
- dout(30, "ceph_tcp_sendmsg %p len %d\n", sock, (int)len);
+ //dout(30, "ceph_tcp_sendmsg %p len %d\n", sock, (int)len);
msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
- dout(30, "ceph_tcp_sendmsg %p len %d ret = %d\n", sock, (int)len, rlen);
+ //dout(30, "ceph_tcp_sendmsg %p len %d ret = %d\n", sock, (int)len, rlen);
return(rlen);
}
static void register_session(struct ceph_mds_client *mdsc, int mds)
{
+ struct ceph_mds_session *s;
+
/* register */
if (mds >= mdsc->max_sessions) {
/* realloc */
+ struct ceph_mds_session **sa;
+ sa = kzalloc(mds * sizeof(struct ceph_mds_session), GFP_KERNEL);
+ BUG_ON(sa == NULL); /* i am lazy */
+ if (mdsc->sessions) {
+ memcpy(sa, mdsc->sessions,
+ mdsc->max_sessions*sizeof(struct ceph_mds_session));
+ kfree(mdsc->sessions);
+ }
+ mdsc->sessions = sa;
}
- mdsc->sessions[mds] = kmalloc(sizeof(struct ceph_mds_session), GFP_KERNEL);
- mdsc->sessions[mds]->s_state = 0;
- mdsc->sessions[mds]->s_cap_seq = 0;
- init_completion(&mdsc->sessions[mds]->s_completion);
- atomic_set(&mdsc->sessions[mds]->s_ref, 1);
+ s = kmalloc(sizeof(struct ceph_mds_session), GFP_KERNEL);
+ s->s_state = 0;
+ s->s_cap_seq = 0;
+ init_completion(&s->s_completion);
+ atomic_set(&s->s_ref, 1);
+ mdsc->sessions[mds] = s;
}
static struct ceph_mds_session *get_session(struct ceph_mds_client *mdsc, int mds)
{
spin_lock_init(&mdsc->lock);
mdsc->client = client;
- mdsc->mdsmap = 0; /* none yet */
+ mdsc->mdsmap = 0; /* none yet */
mdsc->sessions = 0;
mdsc->max_sessions = 0;
mdsc->last_tid = 0;
/* do we need it? */
spin_lock(&mdsc->lock);
if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
- dout(2, "ceph_mdsc_handle_map epoch %llu < our %llu\n", epoch, mdsc->mdsmap->m_epoch);
+ dout(2, "ceph_mdsc_handle_map epoch %llu < our %llu\n",
+ epoch, mdsc->mdsmap->m_epoch);
spin_unlock(&mdsc->lock);
goto out;
}
oldmap = mdsc->mdsmap;
mdsc->mdsmap = newmap;
spin_unlock(&mdsc->lock);
- ceph_mdsmap_destroy(oldmap);
+ if (oldmap)
+ ceph_mdsmap_destroy(oldmap);
} else {
spin_unlock(&mdsc->lock);
dout(2, "ceph_mdsc_handle_map lost decode race?\n");
for (i=0; i<n; i++) {
if ((err = ceph_decode_32(p, end, &mds)) != 0)
goto bad;
+ if (mds >= m->m_max_mds)
+ goto bad;
if ((err = ceph_decode_32(p, end, &m->m_state[mds])) != 0)
goto bad;
}
for (i=0; i<n; i++) {
if ((err = ceph_decode_32(p, end, &mds)) != 0)
goto bad;
+ if (mds >= m->m_max_mds)
+ goto bad;
*p += sizeof(struct ceph_entity_name);
if ((err = ceph_decode_addr(p, end, &m->m_addr[mds])) != 0)
goto bad;
}
/* ok, we don't care about the rest. */
+ dout(30, "mdsmap_decode success epoch %llu\n", m->m_epoch);
return m;
bad:
dout(30, "write_partial_kvec %p left %d vec %d bytes\n", con,
con->out_kvec_left, con->out_kvec_bytes);
while (con->out_kvec_bytes > 0) {
- ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes);
+ ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
+ con->out_kvec_left, con->out_kvec_bytes);
if (ret <= 0) goto out;
con->out_kvec_bytes -= ret;
if (con->out_kvec_bytes == 0)
/* move to sending/sent list */
list_del(&m->list_head);
list_add_tail(&m->list_head, &con->out_sent);
- con->out_msg = m;
+ con->out_msg = m; /* FIXME: do we want to take a reference here? */
/* encode header */
ceph_encode_header(&con->out_hdr, &m->hdr);
*/
static int prepare_read_message(struct ceph_connection *con)
{
+ BUG_ON(con->in_msg != NULL);
con->in_tag = CEPH_MSGR_TAG_MSG;
con->in_base_pos = 0;
con->in_msg = kzalloc(sizeof(*con->in_msg), GFP_KERNEL);
m->front.iov_base = kmalloc(m->hdr.front_len, GFP_KERNEL);
if (m->front.iov_base == NULL)
return -ENOMEM;
+ dout(50, "front is %p\n", m->front.iov_base);
}
left = m->hdr.front_len - m->front.iov_len;
ret = ceph_tcp_recvmsg(con->sock, (char*)m->front.iov_base + m->front.iov_len, left);
static int read_accept_partial(struct ceph_connection *con)
{
int ret;
+ int to;
/* peer addr */
- while (con->in_base_pos < sizeof(con->peer_addr)) {
- int left = sizeof(con->peer_addr) - con->in_base_pos;
- ret = ceph_tcp_recvmsg(con->sock, (char*)&con->peer_addr + con->in_base_pos, left);
+ to = sizeof(con->peer_addr);
+ while (con->in_base_pos < to) {
+ int left = to - con->in_base_pos;
+ int have = con->in_base_pos;
+ ret = ceph_tcp_recvmsg(con->sock, (char*)&con->peer_addr + have, left);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
/* connect_seq */
- while (con->in_base_pos < sizeof(con->peer_addr) + sizeof(con->connect_seq)) {
- int off = con->in_base_pos - sizeof(con->peer_addr);
- int left = sizeof(con->peer_addr) + sizeof(con->connect_seq) - con->in_base_pos;
- ret = ceph_tcp_recvmsg(con->sock, (char*)&con->connect_seq + off, left);
+ to += sizeof(con->connect_seq);
+ while (con->in_base_pos < to) {
+ int left = to - con->in_base_pos;
+ int have = sizeof(con->peer_addr) - left;
+ ret = ceph_tcp_recvmsg(con->sock, (char*)&con->connect_seq + have, left);
if (ret <= 0) return ret;
con->in_base_pos += ret;
}
existing = get_connection(con->msgr, &con->peer_addr);
if (existing) {
spin_lock(&existing->lock);
+ /* replace existing connection? */
if ((test_bit(CONNECTING, &existing->state) &&
compare_addr(&con->msgr->inst.addr, &con->peer_addr)) ||
(test_bit(OPEN, &existing->state) &&
/* create listening socket */
ret = ceph_tcp_listen(msgr);
- if(ret < 0) {
+ if (ret < 0) {
kfree(msgr);
- return ERR_PTR(ret);
+ return ERR_PTR(ret);
}
if (myaddr)
msgr->inst.addr.ipaddr.sin_addr = myaddr->ipaddr.sin_addr;
m->front.iov_base = kmalloc(front_len, GFP_KERNEL);
if (m->front.iov_base == NULL)
goto out2;
+ dout(50, "ceph_msg_new front is %p\n", m->front.iov_base);
m->front.iov_len = front_len;
/* pages */
if (m->pages) {
for (i=0; i<m->nr_pages; i++)
if (m->pages[i])
- kfree(m->pages[i]);
+ __free_pages(m->pages[i], 0);
kfree(m->pages);
}
- if (m->front.iov_base)
+ if (m->front.iov_base) {
kfree(m->front.iov_base);
+ }
kfree(m);
}
}
goto bad;
if ((err = ceph_decode_64(p, end, &epoch)) < 0)
goto bad;
+ BUG_ON(epoch != map->epoch+1);
if ((err = ceph_decode_64(p, end, &mon_epoch)) < 0)
goto bad;
if ((err = ceph_decode_32(p, end, &ctime.tv_sec)) < 0)
newmap = osdmap_decode(p, min(*p+len, end));
return newmap; /* error or not */
}
- if (!map || epoch != map->epoch+1)
- return 0; /* old or new, or no existing; done */
/* new crush? */
if ((err = ceph_decode_32(p, end, &len)) < 0)
{
void *p, *end, *next;
__u32 nr_maps, maplen;
+ __u64 epoch;
struct ceph_osdmap *newmap = 0;
int err;
if ((err = ceph_decode_32(&p, end, &nr_maps)) < 0)
goto bad;
while (nr_maps--) {
+ if ((err = ceph_decode_64(&p, end, &epoch)) < 0)
+ goto bad;
if ((err = ceph_decode_32(&p, end, &maplen)) < 0)
goto bad;
next = p + maplen;
- newmap = apply_incremental(p, min(p+maplen,end), osdc->osdmap);
- if (IS_ERR(newmap)) {
- err = PTR_ERR(newmap);
- goto bad;
- }
- if (newmap != osdc->osdmap) {
- osdmap_destroy(osdc->osdmap);
- osdc->osdmap = newmap;
+ if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
+ newmap = apply_incremental(p, min(p+maplen,end), osdc->osdmap);
+ if (IS_ERR(newmap)) {
+ err = PTR_ERR(newmap);
+ goto bad;
+ }
+ if (newmap != osdc->osdmap) {
+ osdmap_destroy(osdc->osdmap);
+ osdc->osdmap = newmap;
+ }
+ dout(1, "applied incremental map %llu\n", epoch);
+ } else {
+ dout(1, "ignored incremental map %llu\n", epoch);
}
- dout(1, "got incremental map %llu\n", newmap->epoch);
p = next;
}
if (newmap)
if ((err = ceph_decode_32(&p, end, &nr_maps)) < 0)
goto bad;
while (nr_maps > 1) {
- dout(5, "ignoring non-latest full map\n");
+ dout(5, "skipping non-latest full map\n");
+ if ((err = ceph_decode_64(&p, end, &epoch)) < 0)
+ goto bad;
if ((err = ceph_decode_32(&p, end, &maplen)) < 0)
goto bad;
p += maplen;
}
if (nr_maps) {
- if ((err = ceph_decode_32(&p, end, &maplen)) < 0)
+ if ((err = ceph_decode_64(&p, end, &epoch)) < 0)
goto bad;
- newmap = osdmap_decode(&p, min(p+maplen,end));
- if (IS_ERR(newmap)) {
- err = PTR_ERR(newmap);
+ if ((err = ceph_decode_32(&p, end, &maplen)) < 0)
goto bad;
+ if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
+ dout(10, "full map %llu is older than our %llu\n",
+ epoch, osdc->osdmap->epoch);
+ } else {
+ newmap = osdmap_decode(&p, min(p+maplen,end));
+ if (IS_ERR(newmap)) {
+ err = PTR_ERR(newmap);
+ goto bad;
+ }
+ if (osdc->osdmap)
+ osdmap_destroy(osdc->osdmap);
+ osdc->osdmap = newmap;
+ dout(10, "took full map %llu\n", newmap->epoch);
}
- osdmap_destroy(osdc->osdmap);
- osdc->osdmap = newmap;
- dout(1, "got full map %llu\n", newmap->epoch);
}
out:
ret = set_anon_super(s, 0); /* what is the second arg for? */
if (ret != 0)
- goto out;
+ goto bail;
return ret;
-out:
+bail:
kfree(s->s_fs_info);
s->s_fs_info = 0;
return ret;