if (m_image_ctx.non_blocking_aio || writes_blocked()) {
queue(req);
} else {
- c->start_op();
process_io(req, false);
finish_in_flight_io();
}
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
queue(req);
} else {
- c->start_op();
process_io(req, false);
finish_in_flight_io();
}
if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
queue(req);
} else {
- c->start_op();
process_io(req, false);
finish_in_flight_io();
}
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
queue(req);
} else {
- c->start_op();
process_io(req, false);
finish_in_flight_io();
}
if (m_image_ctx.non_blocking_aio || writes_blocked()) {
queue(req);
} else {
- c->start_op();
process_io(req, false);
finish_in_flight_io();
}
if (!m_blocked_ios.empty()) {
auto it = m_blocked_ios.begin();
while (it != m_blocked_ios.end()) {
- auto next_blocked_object_ios_it = it;
- ++next_blocked_object_ios_it;
auto blocked_io = *it;
- if (block_overlapping_io(&m_in_flight_extents, offset, length)) {
+ const auto& extents = blocked_io->get_image_extents();
+ uint64_t off = extents.size() ? extents.front().first : 0;
+ uint64_t len = extents.size() ? extents.front().second : 0;
+
+ if (block_overlapping_io(&m_in_flight_extents, off, len)) {
break;
}
- ldout(cct, 20) << "unblocking off: " << offset << ", "
- << "len: " << length << dendl;
+ ldout(cct, 20) << "unblocking off: " << off << ", "
+ << "len: " << len << dendl;
AioCompletion *aio_comp = blocked_io->get_aio_completion();
m_blocked_ios.erase(it);
}
template <typename I>
-void ImageRequestWQ<I>::unblock_flushes(uint64_t tid) {
+void ImageRequestWQ<I>::unblock_flushes() {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
std::unique_lock locker{m_lock};
return nullptr;
}
- if (!lock_required && !refresh_required) {
+ if (!lock_required && !refresh_required && !peek_item->blocked) {
// completed ops will requeue the IO -- don't count it as in-progress
m_in_flight_writes++;
}
return nullptr;
}
- item->start_op();
return item;
}
const auto& extents = req->get_image_extents();
bool write_op = req->is_write_op();
uint64_t tid = req->get_tid();
- uint64_t offset = 0;
- uint64_t length = 0;
+ uint64_t offset = extents.size() ? extents.front().first : 0;
+ uint64_t length = extents.size() ? extents.front().second : 0;
- if (write_op) {
+ if (write_op && !req->blocked) {
std::lock_guard locker{m_lock};
- offset = extents.size() ? extents.front().first : 0;
- length = extents.size() ? extents.front().second : 0;
bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length);
if (blocked) {
ldout(cct, 20) << "blocking overlapping IO: " << "ictx="
<< &m_image_ctx << ", "
<< "off=" << offset << ", len=" << length << dendl;
+ req->blocked = true;
m_blocked_ios.push_back(req);
- --m_in_flight_ios;
return;
}
}
+ req->start_op();
req->send();
if (write_op) {
finish_in_flight_write();
}
unblock_overlapping_io(offset, length, tid);
- unblock_flushes(tid);
+ unblock_flushes();
}
delete req;
}
if(!m_in_flight_extents.empty()) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << "erasing in flight extents with tid:"
- << tid << dendl;
+ << tid << ", offset: " << offset << dendl;
ImageExtentIntervals extents;
extents.insert(offset, length);
ImageExtentIntervals intersect;
struct ImageDispatchSpec<librbd::MockTestImageCtx> {
static ImageDispatchSpec* s_instance;
AioCompletion *aio_comp = nullptr;
+ bool blocked = false;
static ImageDispatchSpec* create_write_request(
librbd::MockTestImageCtx &image_ctx, AioCompletion *aio_comp,
expect_is_refresh_request(mock_image_ctx, false);
expect_is_write_op(mock_queued_image_request, true);
expect_dequeue(mock_image_request_wq, &mock_queued_image_request);
- expect_start_op(mock_queued_image_request);
ASSERT_TRUE(mock_image_request_wq.invoke_dequeue() == &mock_queued_image_request);
}