priority,
m.reads,
OpRequestRef(),
- false);
+ false, true);
}
void ECBackend::continue_recovery_op(
from,
i->second));
dout(0) << __func__ << " shard=" << from << " error=" << i->second << dendl;
- if (!rop.do_redundant_reads && rop.complete[i->first].r == 0)
- rop.complete[i->first].r = i->second;
}
map<pg_shard_t, set<ceph_tid_t> >::iterator siter =
assert(rop.in_progress.count(from));
rop.in_progress.erase(from);
- bool is_complete = rop.in_progress.empty();
- if (rop.do_redundant_reads) {
+ unsigned is_complete = 0;
+ // For redundant reads, check for completion as each shard comes in;
+ // for a non-recovery read, check once all the shard reads have returned.
+ if (rop.do_redundant_reads || (!rop.for_recovery && rop.in_progress.empty())) {
for (map<hobject_t, read_result_t>::const_iterator iter =
rop.complete.begin();
iter != rop.complete.end();
set<int> want_to_read, dummy_minimum;
get_want_to_read_shards(&want_to_read);
int err;
+ // XXX: Could just do if (have.size() < ec_impl->get_data_chunk_count())
if ((err = ec_impl->minimum_to_decode(want_to_read, have, &dummy_minimum)) < 0) {
dout(0) << __func__ << " minimum_to_decode failed" << dendl;
- if (is_complete) {
+ if (rop.in_progress.empty()) {
+ // If we don't have enough copies and we haven't sent reads to all shards
+ // yet, we can send the rest of the reads, if any.
+ if (!rop.do_redundant_reads) {
+ int r = objects_remaining_read_async(iter->first, rop);
+ if (r == 0) {
+ // We added reads to in_progress, so don't increment is_complete
+ continue;
+ }
+ // Couldn't read any additional shards, so handle as completed with errors
+ }
if (rop.complete[iter->first].errors.empty()) {
dout(0) << __func__ << " simply not enough copies err=" << err << dendl;
} else {
+ // Grab one of the shard errors rather than the decode error
+ err = rop.complete[iter->first].errors.begin()->second;
dout(0) << __func__ << ": Use one of the shard errors err=" << err << dendl;
}
rop.complete[iter->first].r = err;
+ ++is_complete;
}
- break;
} else {
-dout(0) << __func__ << " Enough copies have come in ignore errors" << dendl;
- is_complete = true;
+ dout(0) << __func__ << " Enough copies for " << iter->first << " (ignore errors)" << dendl;
+ ++is_complete;
rop.complete[iter->first].errors.clear();
assert(rop.complete[iter->first].r == 0);
}
}
}
- if (is_complete) {
+ if (rop.in_progress.empty() || is_complete == rop.complete.size()) {
dout(0) << __func__ << " Complete: " << rop << dendl;
complete_read_op(rop, m);
} else {
return 0;
}
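+
+// Collect, in *to_read, the acting shards that hold hoid but are not in
+// avail, so a read that could not be decoded can be retried against them.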
+int ECBackend::get_remaining_shards(
+ const hobject_t &hoid,
+ const set<int> &avail,
+ set<pg_shard_t> *to_read)
+{
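+ // "need" collects the shard ids that have a copy of hoid in the acting
+ // set; "shards" maps each of those ids back to its pg_shard_t.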
+ set<int> need;
+ map<shard_id_t, pg_shard_t> shards;
+
+ for (set<pg_shard_t>::const_iterator i =
+ get_parent()->get_acting_shards().begin();
+ i != get_parent()->get_acting_shards().end();
+ ++i) {
+ dout(10) << __func__ << ": checking acting " << *i << dendl;
+ const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
+ if (!missing.is_missing(hoid)) {
+ assert(!need.count(i->shard));
+ need.insert(i->shard);
+ assert(!shards.count(i->shard));
+ shards.insert(make_pair(i->shard, *i));
+ }
+ }
+
+ if (!to_read)
+ return 0;
+
+ for (set<int>::iterator i = need.begin();
+ i != need.end();
+ ++i) {
+ assert(shards.count(shard_id_t(*i)));
+ if (avail.find(*i) == avail.end())
+ to_read->insert(shards[shard_id_t(*i)]);
+ }
+ return 0;
+}
+
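+// With do_redundant_reads, completion is checked as each shard reply comes
+// in; with for_recovery, the minimum_to_decode completion check is skipped
+// and the op completes only once every in-flight shard read has returned.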
void ECBackend::start_read_op(
int priority,
map<hobject_t, read_request_t, hobject_t::BitwiseComparator> &to_read,
OpRequestRef _op,
- bool do_redundant_reads)
+ bool do_redundant_reads,
+ bool for_recovery)
{
ceph_tid_t tid = get_parent()->get_tid();
assert(!tid_to_read_map.count(tid));
op.to_read.swap(to_read);
op.op = _op;
op.do_redundant_reads = do_redundant_reads;
+ op.for_recovery = for_recovery;
dout(10) << __func__ << ": starting " << op << dendl;
map<pg_shard_t, ECSubRead> messages;
dout(10) << __func__ << ": started " << op << dendl;
}
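+
+// Issue reads for an existing ReadOp to additional shards, reusing the op's
+// tid and priority so the replies are matched to the same in-flight op.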
+void ECBackend::start_remaining_read_op(
+ ReadOp &op,
+ map<hobject_t, read_request_t, hobject_t::BitwiseComparator> &to_read)
+{
+ int priority = op.priority;
+ ceph_tid_t tid = op.tid;
+ op.to_read.swap(to_read);
+
+ dout(10) << __func__ << ": starting additional " << op << dendl;
+
+ map<pg_shard_t, ECSubRead> messages;
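+ // Build one ECSubRead per shard and record which shards serve which objects.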
+ for (map<hobject_t, read_request_t, hobject_t::BitwiseComparator>::iterator i = op.to_read.begin();
+ i != op.to_read.end();
+ ++i) {
+ bool need_attrs = i->second.want_attrs;
+ for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
+ j != i->second.need.end();
+ ++j) {
+ if (need_attrs) {
+ messages[*j].attrs_to_read.insert(i->first);
+ need_attrs = false;
+ }
+ op.obj_to_source[i->first].insert(*j);
+ op.source_to_obj[*j].insert(i->first);
+ }
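+ // Convert each logical extent to a per-chunk offset/length and queue it
+ // on every shard being read.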
+ for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::const_iterator j =
+ i->second.to_read.begin();
+ j != i->second.to_read.end();
+ ++j) {
+ pair<uint64_t, uint64_t> chunk_off_len =
+ sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
+ for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
+ k != i->second.need.end();
+ ++k) {
+ messages[*k].to_read[i->first].push_back(boost::make_tuple(chunk_off_len.first,
+ chunk_off_len.second,
+ j->get<2>()));
+ }
+ assert(!need_attrs);
+ }
+ }
+
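+ // Send one MOSDECSubOpRead per additional shard and track it in in_progress.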
+ for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
+ i != messages.end();
+ ++i) {
+ op.in_progress.insert(i->first);
+ shard_to_read_map[i->first].insert(op.tid);
+ i->second.tid = tid;
+ MOSDECSubOpRead *msg = new MOSDECSubOpRead;
+ msg->set_priority(priority);
+ msg->pgid = spg_t(
+ get_parent()->whoami_spg_t().pgid,
+ i->first.shard);
+ msg->map_epoch = get_parent()->get_epoch();
+ msg->op = i->second;
+ msg->op.from = get_parent()->whoami_shard();
+ msg->op.tid = tid;
+ get_parent()->send_message_osd_cluster(
+ i->first.osd,
+ msg,
+ get_parent()->get_epoch());
+ }
+ dout(10) << __func__ << ": started additional " << op << dendl;
+}
+
ECUtil::HashInfoRef ECBackend::get_hash_info(
const hobject_t &hoid, bool checks)
{
cct->_conf->osd_client_op_priority,
for_read_op,
OpRequestRef(),
- fast_read);
+ fast_read, false);
return;
}
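+
+// Called when minimum_to_decode failed for hoid: find the shards not yet
+// read and re-issue the read to them.  Returns -EIO if no shards remain.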
+int ECBackend::objects_remaining_read_async(
+ const hobject_t &hoid,
+ ReadOp &rop)
+{
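+ // Shards this op has already sent reads to for hoid (success or error).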
+ set<int> already_read;
+ set<pg_shard_t> ots = rop.obj_to_source[hoid];
+ for (set<pg_shard_t>::iterator i = ots.begin(); i != ots.end(); ++i)
+ already_read.insert(i->shard);
+ dout(10) << __func__ << " have/error shards=" << already_read << dendl;
+ set<pg_shard_t> shards;
+ int r = get_remaining_shards(hoid, already_read, &shards);
+ if (r)
+ return r;
+ if (shards.empty())
+ return -EIO;
+
+ dout(10) << __func__ << " Read remaining shards " << shards << dendl;
+
+ list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets = rop.to_read.find(hoid)->second.to_read;
+ GenContext<pair<RecoveryMessages *, read_result_t& > &> *c = rop.to_read.find(hoid)->second.cb;
+
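+ // Re-request the same extents, with the same completion callback, from
+ // only the remaining shards.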
+ map<hobject_t, read_request_t, hobject_t::BitwiseComparator> for_read_op;
+ for_read_op.insert(
+ make_pair(
+ hoid,
+ read_request_t(
+ hoid,
+ offsets,
+ shards,
+ false,
+ c)));
+
+ start_remaining_read_op(rop, for_read_op);
+ return 0;
+}
+
int ECBackend::objects_get_attrs(
const hobject_t &hoid,
map<string, bufferlist> *out)