flush_set_callback_t flush_callback,
void *flush_callback_arg,
uint64_t max_bytes, uint64_t max_objects,
- uint64_t max_dirty, uint64_t target_dirty, double max_dirty_age)
+ uint64_t max_dirty, uint64_t target_dirty,
+ double max_dirty_age, bool block_writes_upfront)
: perfcounter(NULL),
cct(cct_), writeback_handler(wb), name(name), lock(l),
max_dirty(max_dirty), target_dirty(target_dirty),
max_size(max_bytes), max_objects(max_objects),
+ block_writes_upfront(block_writes_upfront),
flush_set_callback(flush_callback), flush_set_callback_arg(flush_callback_arg),
- flusher_stop(false), flusher_thread(this),
+ flusher_stop(false), flusher_thread(this), finisher(cct),
stat_clean(0), stat_zero(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0),
stat_error(0), stat_dirty_waiting(0)
{
this->max_dirty_age.set_from_double(max_dirty_age);
perf_start();
+ finisher.start();
}
ObjectCacher::~ObjectCacher()
{
+ finisher.stop();
perf_stop();
// we should be empty.
for (vector<hash_map<sobject_t, Object *> >::iterator i = objects.begin();
}
-int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock)
+int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
+ Context *onfreespace)
{
assert(lock.is_locked());
utime_t now = ceph_clock_now(cct);
}
}
- int r = _wait_for_write(wr, bytes_written, oset, wait_on_lock);
-
+ int r = _wait_for_write(wr, bytes_written, oset, wait_on_lock, onfreespace);
delete wr;
//verify_stats();
trim();
return r;
}
-
-// blocking wait for write.
-int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock)
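+// runs in the finisher thread: retake the cacher lock, apply the dirty/tx
+// throttle, then complete the deferred onfreespace context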
+void ObjectCacher::C_WaitForWrite::finish(int r)
+{
+ Mutex::Locker l(m_oc->lock);
+ m_oc->maybe_wait_for_writeback(m_len);
+ m_onfinish->complete(r);
+}
+
+void ObjectCacher::maybe_wait_for_writeback(uint64_t len)
{
assert(lock.is_locked());
- int blocked = 0;
utime_t start = ceph_clock_now(cct);
+ int blocked = 0;
+ // wait for writeback?
+ // - wait for dirty and tx bytes (relative to the max_dirty threshold)
+ // - do not wait for bytes other waiters are waiting on. this means that
+ // threads do not wait for each other. this effectively allows the cache
+ // size to balloon proportional to the data that is in flight.
+ while (get_stat_dirty() + get_stat_tx() >= max_dirty + get_stat_dirty_waiting()) {
+ ldout(cct, 10) << __func__ << " waiting for dirty|tx "
+ << (get_stat_dirty() + get_stat_tx()) << " >= max "
+ << max_dirty << " + dirty_waiting "
+ << get_stat_dirty_waiting() << dendl;
+ flusher_cond.Signal();
+ stat_dirty_waiting += len;
+ stat_cond.Wait(lock);
+ stat_dirty_waiting -= len;
+ ++blocked;
+ ldout(cct, 10) << __func__ << " woke up" << dendl;
+ }
+ if (blocked && perfcounter) {
+ perfcounter->inc(l_objectcacher_write_ops_blocked);
+ perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
+    utime_t blocked_time = ceph_clock_now(cct) - start;
+    perfcounter->tinc(l_objectcacher_write_time_blocked, blocked_time);
+ }
+}
+
+// wait for write space; blocks the caller up front, or defers the wait to
+// the finisher when block_writes_upfront is false.
+int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset,
+                                  Mutex& lock, Context *onfreespace)
+{
+ assert(lock.is_locked());
int ret = 0;
if (max_dirty > 0) {
- // wait for writeback?
- // - wait for dirty and tx bytes (relative to the max_dirty threshold)
- // - do not wait for bytes other waiters are waiting on. this means that
- // threads do not wait for each other. this effectively allows the cache size
- // to balloon proportional to the data that is in flight.
- while (get_stat_dirty() + get_stat_tx() >= max_dirty + get_stat_dirty_waiting()) {
- ldout(cct, 10) << "wait_for_write waiting on " << len << ", dirty|tx "
- << (get_stat_dirty() + get_stat_tx())
- << " >= max " << max_dirty << " + dirty_waiting " << get_stat_dirty_waiting()
- << dendl;
- flusher_cond.Signal();
- stat_dirty_waiting += len;
- stat_cond.Wait(lock);
- stat_dirty_waiting -= len;
- blocked++;
- ldout(cct, 10) << "wait_for_write woke up" << dendl;
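+    // either throttle the caller right here, or hand the wait off to the
+    // finisher thread so writex() can return without blocking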
+ if (block_writes_upfront) {
+ maybe_wait_for_writeback(len);
+ if (onfreespace)
+ onfreespace->complete(0);
+ } else {
+ assert(onfreespace);
+ finisher.queue(new C_WaitForWrite(this, len, onfreespace));
}
} else {
// write-thru! flush what we just wrote.
Cond cond;
-    bool done;
+    bool done = false;
- C_Cond *fin = new C_Cond(&cond, &done, &ret);
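+    // without up-front blocking, hand onfreespace directly to the flush so
+    // the caller is notified once the write-thru completes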
+ Context *fin = block_writes_upfront ?
+ new C_Cond(&cond, &done, &ret) : onfreespace;
+ assert(fin);
bool flushed = flush_set(oset, wr->extents, fin);
assert(!flushed); // we just dirtied it, and didn't drop our lock!
ldout(cct, 10) << "wait_for_write waiting on write-thru of " << len << " bytes" << dendl;
- while (!done)
- cond.Wait(lock);
- ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
+ if (block_writes_upfront) {
+ while (!done)
+ cond.Wait(lock);
+ ldout(cct, 10) << "wait_for_write woke up, ret " << ret << dendl;
+ if (onfreespace)
+ onfreespace->complete(ret);
+ }
}
// start writeback anyway?
<< target_dirty << ", nudging flusher" << dendl;
flusher_cond.Signal();
}
- if (blocked && perfcounter) {
- perfcounter->inc(l_objectcacher_write_ops_blocked);
- perfcounter->inc(l_objectcacher_write_bytes_blocked, len);
- utime_t blocked = ceph_clock_now(cct) - start;
- perfcounter->tinc(l_objectcacher_write_time_blocked, blocked);
- }
return ret;
}
#include "include/xlist.h"
#include "common/Cond.h"
+#include "common/Finisher.h"
#include "common/Thread.h"
#include "Objecter.h"
int64_t max_dirty, target_dirty, max_size, max_objects;
utime_t max_dirty_age;
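+  // if true, writex() blocks its caller until there is room under max_dirty;
+  // if false, the wait is deferred and onfreespace completes asynchronously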
+ bool block_writes_upfront;
flush_set_callback_t flush_set_callback;
void *flush_set_callback_arg;
return 0;
}
} flusher_thread;
-
+
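+  // runs deferred onfreespace completions (see C_WaitForWrite)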
+ Finisher finisher;
// objects
Object *get_object_maybe(sobject_t oid, object_locator_t &l) {
}
};
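+  // finisher callback: waits for writeback space on behalf of a deferred
+  // write, then completes the caller's onfreespace context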
+ class C_WaitForWrite : public Context {
+ public:
+ C_WaitForWrite(ObjectCacher *oc, uint64_t len, Context *onfinish) :
+ m_oc(oc), m_len(len), m_onfinish(onfinish) {}
+ void finish(int r);
+ private:
+ ObjectCacher *m_oc;
+ uint64_t m_len;
+ Context *m_onfinish;
+ };
+
void perf_start();
void perf_stop();
flush_set_callback_t flush_callback,
void *flush_callback_arg,
uint64_t max_bytes, uint64_t max_objects,
- uint64_t max_dirty, uint64_t target_dirty, double max_age);
+ uint64_t max_dirty, uint64_t target_dirty, double max_age,
+ bool block_writes_upfront);
~ObjectCacher();
void start() {
* the return value is total bytes read
*/
int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish);
- int writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock);
+ int writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
+ Context *onfreespace);
bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents, snapid_t snapid);
private:
// write blocking
- int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock);
-
+ int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock,
+ Context *onfreespace);
+ void maybe_wait_for_writeback(uint64_t len);
+
public:
bool set_is_cached(ObjectSet *oset);
bool set_is_dirty_or_committing(ObjectSet *oset);
Mutex& wait_on_lock) {
OSDWrite *wr = prepare_write(snapc, bl, mtime, flags);
Striper::file_to_extents(cct, oset->ino, layout, offset, len, wr->extents);
- return writex(wr, oset, wait_on_lock);
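+    // no onfreespace context is passed, so this path relies on
+    // block_writes_upfront being true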
+ return writex(wr, oset, wait_on_lock, NULL);
}
bool file_flush(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc,