From 46cfda7b8245d4f44d1194943cccb379840f3aa8 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 19 Sep 2011 18:23:10 -0700 Subject: [PATCH] osd: preserve ordering when throttling races with missing/degraded requeue When we delay an op because the op_queue is full, we can violate the op order: - op1 comes in, waits because object is missing - op2 comes in, throttles on op queue - op1 is requeued (no longer missing) - queue drains, op2 happens - op1 happens To avoid this, if we delay, requeue ourselves... after whatever else is on the queue. Fixes: #1490 Signed-off-by: Sage Weil --- src/osd/OSD.cc | 18 ++++++++++++++---- src/osd/OSD.h | 7 ++++++- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 51abe07924a8e..8a34acf28ec01 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -4982,7 +4982,8 @@ void OSD::handle_op(MOSDOp *op) (Session *)op->get_connection()->get_priv()); // ... - throttle_op_queue(); + if (throttle_op_queue(op)) + return; int r = init_op_flags(op); if (r) { @@ -5165,7 +5166,8 @@ void OSD::handle_sub_op(MOSDSubOp *op) return; } - throttle_op_queue(); + if (throttle_op_queue(op)) + return; PG *pg = _lookup_lock_pg(pgid); @@ -5305,14 +5307,22 @@ void OSD::dequeue_op(PG *pg) -void OSD::throttle_op_queue() +bool OSD::throttle_op_queue(Message *op) { // throttle? FIXME PROBABLY! while (pending_ops > g_conf->osd_max_opq) { - dout(10) << "enqueue_op waiting for pending_ops " << pending_ops + dout(10) << "throttle_op_queue waiting for pending_ops " << pending_ops << " to drop to " << g_conf->osd_max_opq << dendl; op_queue_cond.Wait(osd_lock); + + // while we waited, something may have been requeued (e.g., + // because it was on waiting_for_{missing,degraded} list). we + // need to queue ourselves too so we don't sneak in ahead of them + // and violate the ordering. + take_waiter(op); + return true; } + return false; } void OSD::wait_for_no_ops() diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 9f54484da21e2..7eb7afec59e3b 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -303,6 +303,11 @@ private: finished.splice(finished.end(), ls); finished_lock.Unlock(); } + void take_waiter(Message *o) { + finished_lock.Lock(); + finished.push_back(o); + finished_lock.Unlock(); + } void push_waiters(list& ls) { assert(osd_lock.is_locked()); // currently, at least. be careful if we change this (see #743) finished_lock.Lock(); @@ -351,7 +356,7 @@ private: Cond op_queue_cond; void wait_for_no_ops(); - void throttle_op_queue(); + bool throttle_op_queue(Message *op); void enqueue_op(PG *pg, Message *op); void dequeue_op(PG *pg); static void static_dequeueop(OSD *o, PG *pg) { -- 2.39.5