*/
static void reset_connection(struct ceph_connection *con)
{
+ derr(1, "%s%d %u.%u.%u.%u:%u connection reset\n",
+ ENTITY_NAME(con->peer_name),
+ IPQUADPORT(con->peer_addr.ipaddr));
+
/* reset connection, out_queue, msg_ and connect_seq */
/* discard existing out_queue and msg_seq */
while (!list_empty(&con->out_queue)) {
BUG_ON(!dup);
memcpy(dup->front.iov_base, old->front.iov_base,
le32_to_cpu(old->hdr.front_len));
- if (old->pages)
- derr(0, "WARNING: unsafely referenced old pages for %p\n",
- old);
+
+ /* revoke old message's pages */
+ mutex_lock(&old->page_mutex);
+ old->pages = 0;
+ mutex_unlock(&old->page_mutex);
+
ceph_msg_put(old);
return dup;
}
req = kmalloc(sizeof(*req) + nr_pages*sizeof(void *), GFP_NOFS);
if (req == NULL)
return ERR_PTR(-ENOMEM);
+ req->r_aborted = 0;
req->r_request = msg;
req->r_nr_pages = nr_pages;
atomic_set(&req->r_ref, 1);
int osd;
int ret = 0;
-more:
spin_lock(&osdc->request_lock);
+more:
got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
next_tid, 1);
if (got == 0)
dout(20, "kicking tid %llu osd%d\n", req->r_tid, osd);
get_request(req);
spin_unlock(&osdc->request_lock);
- req->r_request = ceph_msg_maybe_dup(req->r_request);
- req->r_flags |= CEPH_OSD_OP_RETRY;
- send_request(osdc, req, osd);
+ req->r_request = ceph_msg_maybe_dup(req->r_request);
+ if (req->r_aborted) {
+ req->r_flags |= CEPH_OSD_OP_RETRY;
+ send_request(osdc, req, osd);
+ }
put_request(req);
goto more;
}
unregister_request(osdc, req);
if (rc < 0) {
- struct ceph_msg *msg = req->r_request;
+ struct ceph_msg *msg;
dout(0, "tid %llu err %d, revoking %p pages\n", req->r_tid,
- rc, msg);
+ rc, req->r_request);
+ /*
+ * mark req aborted _before_ revoking pages, so that
+ * if a racing kick_request _does_ dup the page vec
+ * pointer, it will definitely then see the aborted
+ * flag and not send the request.
+ */
+ req->r_aborted = 1;
+ msg = req->r_request;
mutex_lock(&msg->page_mutex);
msg->pages = 0;
mutex_unlock(&msg->page_mutex);