publish_stats_to_osd();
assert(missing_loc.needs_recovery(hoid));
missing_loc.add_location(hoid, pg_whoami);
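+ // the object just became readable here; release any backoffs so
+ // blocked clients resend their ops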
+ release_backoffs(hoid);
if (!is_unreadable_object(hoid)) {
auto unreadable_object_entry = waiting_for_unreadable_object.find(hoid);
if (unreadable_object_entry != waiting_for_unreadable_object.end()) {
recovering.erase(i);
finish_recovery_op(soid);
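+ // recovery of this object is complete; lift its backoff too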
+ release_backoffs(soid);
auto degraded_object_entry = waiting_for_degraded_object.find(soid);
if (degraded_object_entry != waiting_for_degraded_object.end()) {
dout(20) << " kicking degraded waiters on " << soid << dendl;
const hobject_t& soid, OpRequestRef op)
{
assert(is_unreadable_object(soid));
-
maybe_kick_recovery(soid);
waiting_for_unreadable_object[soid].push_back(op);
op->mark_delayed("waiting for missing object");
switch (op->get_req()->get_type()) {
case CEPH_MSG_OSD_OP:
+ case CEPH_MSG_OSD_BACKOFF:
if (!is_active()) {
dout(20) << " peered, not active, waiting for active on " << op << dendl;
waiting_for_active.push_back(op);
op->mark_delayed("waiting for active");
return;
}
- // verify client features
- if ((pool.info.has_tiers() || pool.info.is_tier()) &&
- !op->has_feature(CEPH_FEATURE_OSD_CACHEPOOL)) {
- osd->reply_op_error(op, -EOPNOTSUPP);
- return;
+ switch (op->get_req()->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ // verify client features
+ if ((pool.info.has_tiers() || pool.info.is_tier()) &&
+ !op->has_feature(CEPH_FEATURE_OSD_CACHEPOOL)) {
+ osd->reply_op_error(op, -EOPNOTSUPP);
+ return;
+ }
+ do_op(op);
+ break;
+ case CEPH_MSG_OSD_BACKOFF:
+ // object-level backoff acks handled in osdop context
+ handle_backoff(op);
+ break;
}
- do_op(op); // do it now
break;
case MSG_OSD_SUBOP:
dout(20) << __func__ << ": op " << *m << dendl;
+ hobject_t head(m->get_oid(), m->get_object_locator().key,
+ CEPH_NOSNAP, m->get_pg().ps(),
+ info.pgid.pool(), m->get_object_locator().nspace);
+
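+ // clients that speak RADOS_BACKOFF can be told to back off and resend
+ // later, instead of having their ops queued in OSD memory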
+ bool can_backoff =
+ m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF);
+ SessionRef session;
+ if (can_backoff) {
+ session = static_cast<Session*>(m->get_connection()->get_priv());
+ if (!session.get()) {
+ dout(10) << __func__ << " no session" << dendl;
+ return;
+ }
+ session->put(); // get_priv() takes a ref, and so does the intrusive_ptr
+
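+ // if we already sent this session a backoff for this object, the
+ // client should not be sending ops for it at all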
+ Backoff *b = session->have_backoff(head);
+ if (b) {
+ dout(10) << __func__ << " have backoff " << *b << " " << *m << dendl;
+ assert(!b->is_acked() || !g_conf->osd_debug_crash_on_ignored_backoff);
+ return;
+ }
+ }
+
if (m->has_flag(CEPH_OSD_FLAG_PARALLELEXEC)) {
// not implemented.
dout(20) << __func__ << ": PARALLELEXEC not implemented " << *m << dendl;
return;
}
- hobject_t head(m->get_oid(), m->get_object_locator().key,
- CEPH_NOSNAP, m->get_pg().ps(),
- info.pgid.pool(), m->get_object_locator().nspace);
-
// object name too long?
if (m->get_oid().name.size() > cct->_conf->osd_max_object_name_len) {
dout(4) << "do_op name is longer than "
<< " flags " << ceph_osd_flag_string(m->get_flags())
<< dendl;
- if (write_ordered &&
- scrubber.write_blocked_by_scrub(head, get_sort_bitwise())) {
- dout(20) << __func__ << ": waiting for scrub" << dendl;
- waiting_for_scrub.push_back(op);
- op->mark_delayed("waiting for scrub");
- return;
- }
-
// missing object?
if (is_unreadable_object(head)) {
- wait_for_unreadable_object(head, op);
+ if (can_backoff &&
+ (g_conf->osd_recovery_aggressive_backoff ||
+ missing_loc.is_unfound(head))) {
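+ // back the client off instead of queueing: the wait may be long
+ // (the object is unfound, or aggressive backoff is configured)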
+ add_backoff(session, head, head);
+ maybe_kick_recovery(head);
+ } else {
+ wait_for_unreadable_object(head, op);
+ }
return;
}
// degraded object?
if (write_ordered && is_degraded_or_backfilling_object(head)) {
- wait_for_degraded_object(head, op);
+ if (can_backoff && g_conf->osd_recovery_aggressive_backoff) {
+ add_backoff(session, head, head);
+ } else {
+ wait_for_degraded_object(head, op);
+ }
+ return;
+ }
+
+ if (write_ordered &&
+ scrubber.write_blocked_by_scrub(head, get_sort_bitwise())) {
+ dout(20) << __func__ << ": waiting for scrub" << dendl;
+ waiting_for_scrub.push_back(op);
+ op->mark_delayed("waiting for scrub");
return;
}
[=]() {
requeue_ops(waiting_for_all_missing);
waiting_for_all_missing.clear();
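+ // unblock backed-off clients before requeueing so they resend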
+ for (auto& p : waiting_for_unreadable_object) {
+ release_backoffs(p.first);
+ }
requeue_object_waiters(waiting_for_unreadable_object);
queue_recovery();
clear_scrub_reserved();
- // requeues waiting_for_scrub
- scrub_clear_state();
-
cancel_copy_ops(is_primary());
cancel_flush_ops(is_primary());
cancel_proxy_ops(is_primary());
// requeue object waiters
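+ // the interval changed, so release all object backoffs; clients will
+ // resend and be redirected by the new map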
+ for (auto& p : waiting_for_unreadable_object) {
+ release_backoffs(p.first);
+ }
if (is_primary()) {
requeue_object_waiters(waiting_for_unreadable_object);
} else {
waiting_for_unreadable_object.clear();
}
for (map<hobject_t,list<OpRequestRef>, hobject_t::BitwiseComparator>::iterator p = waiting_for_degraded_object.begin();
p != waiting_for_degraded_object.end();
waiting_for_degraded_object.erase(p++)) {
+ release_backoffs(p->first);
if (is_primary())
requeue_ops(p->second);
else
p->second.clear();
finish_degraded_object(p->first);
}
+
+ // requeues waiting_for_scrub
+ scrub_clear_state();
+
for (map<hobject_t,list<OpRequestRef>, hobject_t::BitwiseComparator>::iterator p = waiting_for_blocked_object.begin();
p != waiting_for_blocked_object.end();
waiting_for_blocked_object.erase(p++)) {
if (is_primary())
requeue_ops(p->second);
else
p->second.clear();
}
objects_blocked_on_cache_full.clear();
-
for (list<pair<OpRequestRef, OpContext*> >::iterator i =
in_progress_async_reads.begin();
i != in_progress_async_reads.end();
in_progress_async_reads.erase(i++)) {
close_op_ctx(i->second);
if (is_primary())
requeue_op(i->first);
}
recovering.erase(soid);
finish_recovery_op(soid);
+ release_backoffs(soid);
if (waiting_for_degraded_object.count(soid)) {
dout(20) << " kicking degraded waiters on " << soid << dendl;
requeue_ops(waiting_for_degraded_object[soid]);