}
}
- ls.splice(ls.end(), waitfor_read);
- waitfor_read.clear();
-
// apply to bh's!
loff_t opos = start;
while (true) {
ldout(cct, 20) << "finishing waiters " << ls << dendl;
finish_contexts(cct, ls, err);
+ retry_waiting_reads();
+
--reads_outstanding;
read_cond.Signal();
}
for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
bh_it != missing.end();
++bh_it) {
- loff_t clean = get_stat_clean() + get_stat_rx() +
- bh_it->second->length();
- if (get_stat_rx() > 0 && static_cast<uint64_t>(clean) > max_size) {
- // cache is full -- wait for rx's to complete
- ldout(cct, 10) << "readx missed, waiting on cache to free "
- << (clean - max_size) << " bytes" << dendl;
- if (success) {
- waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish));
- }
- bh_remove(o, bh_it->second);
- delete bh_it->second;
- } else {
- bh_read(bh_it->second);
- if (success && onfinish) {
- ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
- << " off " << bh_it->first << dendl;
+ uint64_t rx_bytes = static_cast<uint64_t>(
+ stat_rx + bh_it->second->length());
+ if (!waitfor_read.empty() || rx_bytes > max_size) {
+ // cache is full with concurrent reads -- wait for rx's to complete
+ // to constrain memory growth (especially during copy-ups)
+ if (success) {
+ ldout(cct, 10) << "readx missed, waiting on cache to complete "
+ << waitfor_read.size() << " blocked reads, "
+ << (MAX(rx_bytes, max_size) - max_size)
+ << " read bytes" << dendl;
+ waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish));
+ }
+
+ bh_remove(o, bh_it->second);
+ delete bh_it->second;
+ } else {
+ bh_read(bh_it->second);
+ if (success && onfinish) {
+ ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
+ << " off " << bh_it->first << dendl;
bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
- }
- bytes_not_in_cache += bh_it->second->length();
- }
+ }
+ }
+ bytes_not_in_cache += bh_it->second->length();
success = false;
}
return ret;
}
+void ObjectCacher::retry_waiting_reads()
+{
+ list<Context *> ls;
+ ls.swap(waitfor_read);
+
+ while (!ls.empty() && waitfor_read.empty()) {
+ Context *ctx = ls.front();
+ ls.pop_front();
+ ctx->complete(0);
+ }
+ waitfor_read.splice(waitfor_read.end(), ls);
+}
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
Context *onfreespace)