From: David Zafman <dzafman@redhat.com>
Date: Tue, 11 Aug 2015 19:51:39 +0000 (-0700)
Subject: osd: Send reads to other shards if erasure coded chunk reads fail
X-Git-Tag: v9.1.0~244^2~3
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c09c1192dbe20e190d91efd3f58dd3951b856987;p=ceph.git

osd: Send reads to other shards if erasure coded chunk reads fail

Handle errors in a common way whether or not redundant reads are in use

Signed-off-by: David Zafman <dzafman@redhat.com>
---

diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc
index 15c7666d9742..47594cb93288 100644
--- a/src/osd/ECBackend.cc
+++ b/src/osd/ECBackend.cc
@@ -472,7 +472,7 @@ void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
     priority,
     m.reads,
     OpRequestRef(),
-    false);
+    false, true);
 }
 
 void ECBackend::continue_recovery_op(
@@ -1044,8 +1044,6 @@ dout(0) << __func__ << " to_read skipping" << dendl;
 	from,
 	i->second));
 dout(0) << __func__ << " shard=" << from << " error=" << i->second << dendl;
-    if (!rop.do_redundant_reads && rop.complete[i->first].r == 0)
-      rop.complete[i->first].r = i->second;
   }
 
   map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
@@ -1056,8 +1054,10 @@ dout(0) << __func__ << " shard=" << from << " error=" << i->second << dendl;
   assert(rop.in_progress.count(from));
   rop.in_progress.erase(from);
 
-  bool is_complete = rop.in_progress.empty();
-  if (rop.do_redundant_reads) {
+  unsigned is_complete = 0;
+  // For redundant reads, check for completion as each shard comes in;
+  // for a non-recovery read, check for completion once all shards have been read.
+  if (rop.do_redundant_reads || (!rop.for_recovery && rop.in_progress.empty())) {
     for (map<hobject_t, read_result_t>::const_iterator iter =
            rop.complete.begin();
          iter != rop.complete.end();
@@ -1073,9 +1073,20 @@ dout(0) << __func__ << " have shard=" << j->first.shard << dendl;
       set<int> want_to_read, dummy_minimum;
       get_want_to_read_shards(&want_to_read);
       int err;
+      // XXX: Could just do if (have.size() < ec_impl->get_data_chunk_count())
       if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
 dout(0) << __func__ << " minimum_to_decode failed" << dendl;
-	if (is_complete) {
+	if (rop.in_progress.empty()) {
+	  // If we don't have enough copies and we haven't sent reads for all shards,
+	  // we can send the rest of the reads, if any.
+	  if (!rop.do_redundant_reads) {
+	    int r = objects_remaining_read_async(iter->first, rop);
+	    if (r == 0) {
+	      // We re-issued reads (added to in_progress), so don't increment is_complete
+	      continue;
+	    }
+	    // Couldn't read any additional shards, so handle as completed with errors
+	  }
 	  if (rop.complete[iter->first].errors.empty()) {
 dout(0) << __func__ << " simply not enough copies err=" << err << dendl;
 	  } else {
@@ -1084,17 +1095,17 @@ dout(0) << __func__ << " simply not enough copies err=" << err << dendl;
 dout(0) << __func__ << ": Use one of the shard errors err=" << err << dendl;
 	  }
 	  rop.complete[iter->first].r = err;
+	  ++is_complete;
 	}
-	break;
       } else {
-dout(0) << __func__ << " Enough copies have come in ignore errors" << dendl;
-	is_complete = true;
+dout(0) << __func__ << " Enough copies for " << iter->first << " (ignore errors)" << dendl;
+	++is_complete;
 	rop.complete[iter->first].errors.clear();
 	assert(rop.complete[iter->first].r == 0);
       }
     }
   }
-  if (is_complete) {
+  if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
 dout(0) << __func__ << " Complete: " << rop << dendl;
     complete_read_op(rop, m);
   } else {
@@ -1470,11 +1481,50 @@ int ECBackend::get_min_avail_to_read_shards(
   return 0;
 }
 
+int ECBackend::get_remaining_shards(
+  const hobject_t &hoid,
+  const set<int> &avail,
+  set<pg_shard_t> *to_read)
+{
+  map<hobject_t, set<pg_shard_t> >::const_iterator miter =
+    get_parent()->get_missing_loc_shards().find(hoid);
+
+  set<int> need;
+  map<shard_id_t, pg_shard_t> shards;
+
+  for (set<pg_shard_t>::const_iterator i =
+         get_parent()->get_acting_shards().begin();
+       i != get_parent()->get_acting_shards().end();
+       ++i) {
+    dout(10) << __func__ << ": checking acting " << *i << dendl;
+    const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
+    if (!missing.is_missing(hoid)) {
+      assert(!need.count(i->shard));
+      need.insert(i->shard);
+      assert(!shards.count(i->shard));
+      shards.insert(make_pair(i->shard, *i));
+    }
+  }
+
+  if (!to_read)
+    return 0;
+
+  for (set<int>::iterator i = need.begin();
+       i != need.end();
+       ++i) {
+    assert(shards.count(shard_id_t(*i)));
+    if (avail.find(*i) == avail.end())
+      to_read->insert(shards[shard_id_t(*i)]);
+  }
+  return 0;
+}
+
 void ECBackend::start_read_op(
   int priority,
   map<hobject_t, read_request_t> &to_read,
   OpRequestRef _op,
-  bool do_redundant_reads)
+  bool do_redundant_reads,
+  bool for_recovery)
 {
   ceph_tid_t tid = get_parent()->get_tid();
   assert(!tid_to_read_map.count(tid));
@@ -1484,6 +1534,7 @@ void ECBackend::start_read_op(
   op.to_read.swap(to_read);
   op.op = _op;
   op.do_redundant_reads = do_redundant_reads;
+  op.for_recovery = for_recovery;
 
   dout(10) << __func__ << ": starting " << op << dendl;
   map<pg_shard_t, ECSubRead> messages;
@@ -1549,6 +1600,71 @@ void ECBackend::start_read_op(
   dout(10) << __func__ << ": started " << op << dendl;
 }
 
+void ECBackend::start_remaining_read_op(
+  ReadOp &op,
+  map<hobject_t, read_request_t> &to_read)
+{
+  int priority = op.priority;
+  ceph_tid_t tid = op.tid;
+  op.to_read.swap(to_read);
+
+  dout(10) << __func__ << ": starting additional " << op << dendl;
+
+  map<pg_shard_t, ECSubRead> messages;
+  for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
+       i != op.to_read.end();
+       ++i) {
+    bool need_attrs = i->second.want_attrs;
+    for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
+         j != i->second.need.end();
+         ++j) {
+      if (need_attrs) {
+        messages[*j].attrs_to_read.insert(i->first);
+        need_attrs = false;
+      }
+      op.obj_to_source[i->first].insert(*j);
+      op.source_to_obj[*j].insert(i->first);
+    }
+    for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
+           i->second.to_read.begin();
+         j != i->second.to_read.end();
+         ++j) {
+      pair<uint64_t, uint64_t> chunk_off_len =
+        sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
+      for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
+           k != i->second.need.end();
+           ++k) {
+        messages[*k].to_read[i->first].push_back(boost::make_tuple(chunk_off_len.first,
+                                                                   chunk_off_len.second,
+                                                                   j->get<2>()));
+      }
+      assert(!need_attrs);
+    }
+  }
+
+  for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
+       i != messages.end();
+       ++i) {
+    op.in_progress.insert(i->first);
+    shard_to_read_map[i->first].insert(op.tid);
+    i->second.tid = tid;
+    MOSDECSubOpRead *msg = new MOSDECSubOpRead;
+    msg->set_priority(priority);
+    msg->pgid = spg_t(
+      get_parent()->whoami_spg_t().pgid,
+      i->first.shard);
+    msg->map_epoch = get_parent()->get_epoch();
+    msg->op = i->second;
+    msg->op.from = get_parent()->whoami_shard();
+    msg->op.tid = tid;
+    get_parent()->send_message_osd_cluster(
+      i->first.osd,
+      msg,
+      get_parent()->get_epoch());
+  }
+  dout(10) << __func__ << ": started additional " << op << dendl;
+}
+
 ECUtil::HashInfoRef ECBackend::get_hash_info(
   const hobject_t &hoid, bool checks)
 {
@@ -1820,11 +1936,47 @@ void ECBackend::objects_read_async(
     cct->_conf->osd_client_op_priority,
     for_read_op,
     OpRequestRef(),
-    fast_read);
+    fast_read, false);
   return;
 }
 
+int ECBackend::objects_remaining_read_async(
+  const hobject_t &hoid,
+  ReadOp &rop)
+{
+  set<int> already_read;
+  set<pg_shard_t> ots = rop.obj_to_source[hoid];
+  for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
+    already_read.insert(i->shard);
+  dout(10) << __func__ << " have/error shards=" << already_read << dendl;
+  set<pg_shard_t> shards;
+  int r = get_remaining_shards(hoid, already_read, &shards);
+  if (r)
+    return r;
+  if (shards.empty())
+    return -EIO;
+
+  dout(10) << __func__ << " Read remaining shards " << shards << dendl;
+
+  list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
+    rop.to_read.find(hoid)->second.to_read;
+  GenContext<pair<RecoveryMessages *, read_result_t> &> *c =
+    rop.to_read.find(hoid)->second.cb;
+
+  map<hobject_t, read_request_t> for_read_op;
+  for_read_op.insert(
+    make_pair(
+      hoid,
+      read_request_t(
+        hoid,
+        offsets,
+        shards,
+        false,
+        c)));
+
+  start_remaining_read_op(rop, for_read_op);
+  return 0;
+}
+
 int ECBackend::objects_get_attrs(
   const hobject_t &hoid,
   map<string, bufferlist> *out)
diff --git a/src/osd/ECBackend.h b/src/osd/ECBackend.h
index f416e30f3f8d..a039b70c8a8d 100644
--- a/src/osd/ECBackend.h
+++ b/src/osd/ECBackend.h
@@ -297,6 +297,9 @@ public:
     // this is useful to tradeoff some resources (redundant ops) for
     // low latency read, especially on relatively idle cluster
     bool do_redundant_reads;
+    // True if reading for recovery, which may read only a subset
+    // of the available shards.
+    bool for_recovery;
 
     map<hobject_t, read_request_t> to_read;
     map<hobject_t, read_result_t> complete;
@@ -320,7 +323,13 @@ public:
     int priority,
     map<hobject_t, read_request_t> &to_read,
     OpRequestRef op,
-    bool do_redundant_reads);
+    bool do_redundant_reads, bool for_recovery);
+
+  void start_remaining_read_op(ReadOp &rop,
+                               map<hobject_t, read_request_t> &to_read);
+  int objects_remaining_read_async(
+    const hobject_t &hoid,
+    ReadOp &rop);
 
   /**
@@ -470,6 +479,11 @@ public:
     set<pg_shard_t> *to_read  ///< [out] shards to read
     );                        ///< @return error code, 0 on success
 
+  int get_remaining_shards(
+    const hobject_t &hoid,
+    const set<int> &avail,
+    set<pg_shard_t> *to_read);
+
   int objects_get_attrs(
     const hobject_t &hoid,
     map<string, bufferlist> *out);
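
Reviewer note: the heart of this change is the new completion logic in
handle_sub_read_reply(). The following standalone sketch is not Ceph code;
ObjectReadState, Action, and the can-decode comparison against k are invented
stand-ins for the ReadOp bookkeeping and ec_impl->minimum_to_decode(). It
shows the decision the patched code makes each time a shard reply or error
arrives for one object:

// Standalone sketch of the per-reply completion logic introduced above.
// Assumes: k = number of data chunks needed to decode; each object tracks
// the shards that returned data, the shards still in flight, and the
// shards that were never asked.
#include <cstddef>
#include <iostream>
#include <set>

struct ObjectReadState {
  std::set<int> have;          // shards that returned data successfully
  std::set<int> in_progress;   // shards we are still waiting on
  std::set<int> unread;        // shards we have not asked yet
};

enum class Action { Wait, Complete, RetryRemaining, Fail };

// Decide what to do for one object when a shard reply (or error) arrives.
// check_now mirrors "do_redundant_reads || (!for_recovery && in_progress.empty())".
Action on_shard_event(const ObjectReadState &st, std::size_t k, bool check_now)
{
  if (!check_now)
    return Action::Wait;
  if (st.have.size() >= k)            // enough chunks to decode: ignore errors
    return Action::Complete;
  if (!st.in_progress.empty())        // more replies may still arrive
    return Action::Wait;
  if (!st.unread.empty())             // re-issue reads to the untouched shards
    return Action::RetryRemaining;
  return Action::Fail;                // all shards tried, still short of k
}

int main()
{
  // k=2 EC read: shard 0 returned data, shard 1 errored, shard 2 never read.
  ObjectReadState st{{0}, {}, {2}};
  Action a = on_shard_event(st, 2, /*check_now=*/true);
  std::cout << (a == Action::RetryRemaining) << "\n"; // prints 1: retry shard 2
  return 0;
}

With do_redundant_reads the check runs on every reply; otherwise a
non-recovery read only checks once in_progress drains, which is why
is_complete became a counter compared against rop.complete.size().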
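Reviewer note: get_remaining_shards() is essentially a set difference: acting
shards that are not missing the object, minus the shards already read. A
minimal sketch of that selection follows; plain ints stand in for
pg_shard_t/shard_id_t, and the is_missing predicate stands in for
get_parent()->get_shard_missing():

// Sketch of the shard selection done by get_remaining_shards(): from the
// acting set, keep shards that still hold the object, then drop the ones
// already attempted.
#include <functional>
#include <iostream>
#include <set>

std::set<int> remaining_shards(
    const std::set<int> &acting,                 // acting shards for the PG
    const std::function<bool(int)> &is_missing,  // shard lacks this object?
    const std::set<int> &already_read)           // shards already attempted
{
  std::set<int> out;
  for (int s : acting) {
    if (is_missing(s))
      continue;                 // shard has no usable copy of the chunk
    if (already_read.count(s))
      continue;                 // this shard already returned data or an error
    out.insert(s);
  }
  return out;
}

int main()
{
  std::set<int> acting = {0, 1, 2, 3};
  auto missing = [](int s) { return s == 3; };  // shard 3 is missing the object
  std::set<int> tried = {0, 1};                 // first read hit shards 0 and 1
  for (int s : remaining_shards(acting, missing, tried))
    std::cout << s << "\n";                     // prints 2
  return 0;
}

objects_remaining_read_async() feeds exactly this result into
start_remaining_read_op(), reusing the original offsets and callback, so the
retried reads complete through the same path as the first attempt.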