get_cap_ref(in, CEPH_CAP_FILE_BUFFER);
- // wait? (this may block!)
- objectcacher->wait_for_write(size, client_lock);
-
// async, caching, non-blocking.
objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
offset, size, bl, ceph_clock_now(cct), 0);
+ // wait? (this may block!)
+ objectcacher->wait_for_write(size, client_lock);
+
put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
} else {
// simple, non-atomic sync write
cct(cct_), writeback_handler(wb), name(name), lock(l),
flush_set_callback(flush_callback), flush_set_callback_arg(flush_callback_arg),
flusher_stop(false), flusher_thread(this),
- stat_waiter(0),
- stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0)
+ stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0), stat_dirty_waiting(0)
{
perf_start();
}
utime_t start = ceph_clock_now(cct);
// wait for writeback?
- while (get_stat_dirty() + get_stat_tx() >= conf->client_oc_max_dirty) {
+ // - wait for dirty and tx bytes (relative to the max_dirty threshold)
+ // - do not wait for bytes other waiters are waiting on. this means that
+ // threads do not wait for each other. this effectively allows the cache size
+ // to balloon proportional to the data that is in flight.
+ while (get_stat_dirty() + get_stat_tx() >= conf->client_oc_max_dirty + get_stat_dirty_waiting()) {
ldout(cct, 10) << "wait_for_write waiting on " << len << ", dirty|tx "
- << (get_stat_dirty() + get_stat_tx())
- << " >= " << conf->client_oc_max_dirty
- << dendl;
+ << (get_stat_dirty() + get_stat_tx())
+ << " >= max " << conf->client_oc_max_dirty << " + dirty_waiting " << get_stat_dirty_waiting()
+ << dendl;
flusher_cond.Signal();
- stat_waiter++;
+ stat_dirty_waiting += len;
stat_cond.Wait(lock);
- stat_waiter--;
+ stat_dirty_waiting -= len;
blocked++;
ldout(cct, 10) << "wait_for_write woke up" << dendl;
}
// start writeback anyway?
if (get_stat_dirty() > conf->client_oc_target_dirty) {
ldout(cct, 10) << "wait_for_write " << get_stat_dirty() << " > target "
- << conf->client_oc_target_dirty << ", nudging flusher" << dendl;
+ << conf->client_oc_target_dirty << ", nudging flusher" << dendl;
flusher_cond.Signal();
}
if (blocked && perfcounter) {
<< conf->client_oc_target_dirty << " target, "
<< conf->client_oc_max_dirty << " max)"
<< dendl;
- if (get_stat_dirty() > conf->client_oc_target_dirty) {
+ if (get_stat_dirty() + get_stat_dirty_waiting() > conf->client_oc_target_dirty) {
// flush some dirty pages
ldout(cct, 10) << "flusher "
- << get_stat_dirty() << " dirty > target "
- << conf->client_oc_target_dirty
- << ", flushing some dirty bhs" << dendl;
- flush(get_stat_dirty() - conf->client_oc_target_dirty);
+ << get_stat_dirty() << " dirty + " << get_stat_dirty_waiting()
+ << " dirty_waiting > target "
+ << conf->client_oc_target_dirty
+ << ", flushing some dirty bhs" << dendl;
+ flush(get_stat_dirty() + get_stat_dirty_waiting() - conf->client_oc_target_dirty);
}
else {
// check tail of lru for old dirty items
stat_rx += bh->length();
break;
}
- if (stat_waiter) stat_cond.Signal();
+ if (get_stat_dirty_waiting() > 0)
+ stat_cond.Signal();
}
void ObjectCacher::bh_stat_sub(BufferHead *bh)