-- objecter retry
-- kclient retry
-
v0.5
- debug restart, cosd reformat, etc.
- finish btrfs ioctl interface
reset_connection(con);
prepare_write_connect_retry(con->msgr, con);
prepare_read_connect(con);
- con->msgr->peer_reset(con->msgr->parent, &con->peer_name);
+ con->msgr->peer_reset(con->msgr->parent, &con->peer_addr,
+ &con->peer_name);
break;
case CEPH_MSGR_TAG_RETRY_SESSION:
dout(10,
/* replace connection */
__replace_connection(msgr, existing, con);
con->msgr->peer_reset(con->msgr->parent,
+ &con->peer_addr,
&con->peer_name);
} else {
/* old attempt or peer didn't get the READY */
extern struct workqueue_struct *ceph_msgr_wq; /* receive work queue */
typedef void (*ceph_msgr_dispatch_t) (void *p, struct ceph_msg *m);
-typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_name *pn);
+/* notify owner that the connection to peer (addr, pn) was reset */
+typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_addr *addr,
+ struct ceph_entity_name *pn);
typedef int (*ceph_msgr_prepare_pages_t) (void *p, struct ceph_msg *m,
int want);
/*
 * caller should hold read sem
 */
-static int kick_requests(struct ceph_osd_client *osdc)
+/*
+ * @who: if non-NULL, also kick requests whose last osd address matches
+ * @who (used when a connection to that osd was reset); mapping-change
+ * kicks and the "need newer map" monitor request happen either way.
+ */
+static void kick_requests(struct ceph_osd_client *osdc,
+ struct ceph_entity_addr *who)
{
u64 next_tid = 0;
struct ceph_osd_request *req;
int got;
int osd;
- int ret = 0;
+ int needmap = 0;
more:
spin_lock(&osdc->request_lock);
if (osd < 0) {
dout(20, "tid %llu maps to no osd\n",
req->r_tid);
- ret++; /* request a newer map */
+ needmap++; /* request a newer map */
memset(&req->r_last_osd, 0, sizeof(req->r_last_osd));
} else if (!ceph_entity_addr_equal(&req->r_last_osd,
- &osdc->osdmap->osd_addr[osd])) {
+ &osdc->osdmap->osd_addr[osd]) ||
+ (who && ceph_entity_addr_equal(&req->r_last_osd, who))) {
dout(20, "kicking tid %llu osd%d\n", req->r_tid, osd);
get_request(req);
spin_unlock(&osdc->request_lock);
done:
spin_unlock(&osdc->request_lock);
- if (ret)
+ if (needmap) {
dout(10, "%d requests pending on down osds, need new map\n",
- ret);
- return ret;
+ needmap);
+ ceph_monc_request_osdmap(&osdc->client->monc,
+ osdc->osdmap->epoch);
+ }
}
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
done:
downgrade_write(&osdc->map_sem);
ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
- if (newmap && kick_requests(osdc))
- ceph_monc_request_osdmap(&osdc->client->monc,
- osdc->osdmap->epoch);
+ if (newmap)
+ /* NULL: no reset peer; kick only requests whose mapping changed.
+ * (plain 0 as a null pointer trips sparse) */
+ kick_requests(osdc, NULL);
up_read(&osdc->map_sem);
return;
}
+/*
+ * A connection to an osd was reset: under the map read sem, kick any
+ * in-flight requests whose last osd address matches @addr so they get
+ * resubmitted (see kick_requests).
+ */
+void ceph_osdc_handle_reset(struct ceph_osd_client *osdc,
+ struct ceph_entity_addr *addr)
+{
+ down_read(&osdc->map_sem);
+ kick_requests(osdc, addr);
+ up_read(&osdc->map_sem);
+}
+
/*
* find pages for message payload to be read into.
struct ceph_client *client);
extern void ceph_osdc_stop(struct ceph_osd_client *osdc);
+extern void ceph_osdc_handle_reset(struct ceph_osd_client *osdc,
+ struct ceph_entity_addr *addr);
+
extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
struct ceph_msg *msg);
extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
#include "mon_client.h"
void ceph_dispatch(void *p, struct ceph_msg *msg);
-void ceph_peer_reset(void *p, struct ceph_entity_name *peer_name);
+void ceph_peer_reset(void *p, struct ceph_entity_addr *peer_addr,
+ struct ceph_entity_name *peer_name);
/*
return "unknown";
}
-void ceph_peer_reset(void *p, struct ceph_entity_name *peer_name)
+/*
+ * A peer connection was reset: dispatch to the mds or osd client based
+ * on the peer's entity type; other peer types are ignored.
+ */
+void ceph_peer_reset(void *p, struct ceph_entity_addr *peer_addr,
+ struct ceph_entity_name *peer_name)
{
struct ceph_client *client = p;
- dout(30, "ceph_peer_reset peer_name = %s%d\n", ENTITY_NAME(*peer_name));
-
- /* we only care about mds disconnects */
- if (le32_to_cpu(peer_name->type) != CEPH_ENTITY_TYPE_MDS)
- return;
-
- ceph_mdsc_handle_reset(&client->mdsc, le32_to_cpu(peer_name->num));
+ dout(30, "ceph_peer_reset %s%d\n", ENTITY_NAME(*peer_name));
+ switch (le32_to_cpu(peer_name->type)) {
+ case CEPH_ENTITY_TYPE_MDS:
+ /* no 'return void-expr;' in a void function (C99 6.8.6.4) */
+ ceph_mdsc_handle_reset(&client->mdsc,
+ le32_to_cpu(peer_name->num));
+ break;
+ case CEPH_ENTITY_TYPE_OSD:
+ ceph_osdc_handle_reset(&client->osdc, peer_addr);
+ break;
+ }
}