{
perf_start();
finisher.start();
+ scattered_write = writeback_handler.can_scattered_write();
}
ObjectCacher::~ObjectCacher()
read_cond.Signal();
}
+/**
+ * Write out bh together with the eligible buffer heads around it.
+ *
+ * Starting at bh's position in dirty_or_tx_bh, walk forward and then
+ * backward over the entries that belong to the same object, collecting
+ * every buffer head that is dirty and whose last_write is not newer than
+ * cutoff.  The collected heads are handed to bh_write_scattered() as a
+ * single batch, in set order.
+ *
+ * @param bh         seed buffer head; must be present in dirty_or_tx_bh
+ * @param cutoff     only heads with last_write <= cutoff are collected
+ * @param max_amount optional in/out byte budget; reduced by the bytes
+ *                   collected (may be NULL)
+ * @param max_count  optional in/out buffer-head budget; reduced by the
+ *                   number of heads collected (may be NULL)
+ *
+ * Each limit is tested only after a head has been added, so a batch may
+ * overshoot the budget and leave the corresponding counter negative.
+ */
+void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
+					int64_t *max_amount, int *max_count)
+{
+  list<BufferHead*> blist;
+
+  int count = 0;
+  int64_t total_len = 0;
+  set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
+  assert(it != dirty_or_tx_bh.end());
+
+  // Forward pass: the seed itself and everything after it in the same object.
+  for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
+       p != dirty_or_tx_bh.end();
+       ++p) {
+    BufferHead *obh = *p;
+    if (obh->ob != bh->ob)
+      break;
+    // Use <= rather than <: flush(loff_t) only stops on last_write > cutoff,
+    // so a seed with last_write == cutoff can reach us.  With a strict
+    // comparison that seed would be skipped, the batch could end up empty
+    // and the caller's budget would never shrink.
+    if (obh->is_dirty() && obh->last_write <= cutoff) {
+      blist.push_back(obh);
+      ++count;
+      total_len += obh->length();
+      if ((max_count && count > *max_count) ||
+	  (max_amount && total_len > *max_amount))
+	break;
+    }
+  }
+
+  // Backward pass: everything before the seed in the same object.
+  // push_front keeps blist in the same order as dirty_or_tx_bh.
+  while (it != dirty_or_tx_bh.begin()) {
+    --it;
+    BufferHead *obh = *it;
+    if (obh->ob != bh->ob)
+      break;
+    if (obh->is_dirty() && obh->last_write <= cutoff) {
+      blist.push_front(obh);
+      ++count;
+      total_len += obh->length();
+      if ((max_count && count > *max_count) ||
+	  (max_amount && total_len > *max_amount))
+	break;
+    }
+  }
+
+  // Charge what was collected against the caller's budgets.
+  if (max_count)
+    *max_count -= count;
+  if (max_amount)
+    *max_amount -= total_len;
+
+  bh_write_scattered(blist);
+}
+
+/**
+ * Issue one write that covers several buffer heads of a single object.
+ *
+ * Builds the (offset, length) range list and the (offset, data) io vector
+ * from blist, submits them through writeback_handler.write() with a
+ * C_WriteCommit completion, then records the returned tid on the object
+ * and on every buffer head and marks each head tx.
+ *
+ * The write carries the snap context with the highest seq and the newest
+ * last_write found among the buffer heads.
+ *
+ * @param blist  non-empty list of buffer heads, all belonging to the same
+ *               object, each with bl.length() == length()
+ *
+ * Must be called with the cache lock held.
+ */
+void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
+{
+  assert(lock.is_locked());
+  // front() on an empty list is undefined behaviour; fail loudly instead.
+  assert(!blist.empty());
+
+  Object *ob = blist.front()->ob;
+  // Take a reference on the object for the in-flight write
+  // (presumably dropped by the ob->put() in bh_write_commit -- confirm).
+  ob->get();
+
+  ceph::real_time last_write;
+  SnapContext snapc;
+  vector<pair<loff_t, uint64_t> > ranges;
+  vector<pair<uint64_t, bufferlist> > io_vec;
+
+  ranges.reserve(blist.size());
+  io_vec.reserve(blist.size());
+
+  uint64_t total_len = 0;
+  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
+    BufferHead *bh = *p;
+    ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
+    assert(bh->ob == ob);
+    assert(bh->bl.length() == bh->length());
+    ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
+
+    int n = io_vec.size();
+    io_vec.resize(n + 1);
+    io_vec[n].first = bh->start();
+    io_vec[n].second = bh->bl;
+
+    total_len += bh->length();
+    if (bh->snapc.seq > snapc.seq)
+      snapc = bh->snapc;
+    // Track the newest modification time across the batch.  This used to
+    // assign bh->last_write to itself, leaving the local untouched.
+    if (bh->last_write > last_write)
+      last_write = bh->last_write;
+  }
+
+  // The C_WriteCommit constructor swaps ranges into itself, so ranges is
+  // left holding the callback's initial (empty) vector after this line.
+  C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
+
+  ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
+					   io_vec, snapc, last_write,
+					   ob->truncate_size, ob->truncate_seq,
+					   oncommit);
+  oncommit->tid = tid;
+  ob->last_write_tid = tid;
+  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
+    BufferHead *bh = *p;
+    bh->last_write_tid = tid;
+    mark_tx(bh);
+  }
+
+  if (perfcounter)
+    perfcounter->inc(l_objectcacher_data_flushed, total_len);
+}
void ObjectCacher::bh_write(BufferHead *bh)
{
mark_tx(bh);
}
-void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
- uint64_t length, ceph_tid_t tid, int r)
+void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
+ vector<pair<loff_t, uint64_t> >& ranges,
+ ceph_tid_t tid, int r)
{
assert(lock.is_locked());
ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
- << " " << start << "~" << length << " returned " << r
- << dendl;
+ << " ranges " << ranges << " returned " << r << dendl;
if (objects[poolid].count(oid) == 0) {
ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
- } else {
- Object *ob = objects[poolid][oid];
- int was_dirty_or_tx = ob->oset->dirty_or_tx;
+ return;
+ }
+
+ Object *ob = objects[poolid][oid];
+ int was_dirty_or_tx = ob->oset->dirty_or_tx;
+ for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
+ p != ranges.end();
+ ++p) {
+ loff_t start = p->first;
+ uint64_t length = p->second;
if (!ob->exists) {
ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
ob->exists = true;
assert(*bh);
ob->try_merge_bh(*bh);
}
+ }
- // update last_commit.
- assert(ob->last_commit_tid < tid);
- ob->last_commit_tid = tid;
-
- // waiters?
- list<Context*> ls;
- if (ob->waitfor_commit.count(tid)) {
- ls.splice(ls.begin(), ob->waitfor_commit[tid]);
- ob->waitfor_commit.erase(tid);
- }
+ // update last_commit.
+ assert(ob->last_commit_tid < tid);
+ ob->last_commit_tid = tid;
- // is the entire object set now clean and fully committed?
- ObjectSet *oset = ob->oset;
- ob->put();
+ // waiters?
+ list<Context*> ls;
+ if (ob->waitfor_commit.count(tid)) {
+ ls.splice(ls.begin(), ob->waitfor_commit[tid]);
+ ob->waitfor_commit.erase(tid);
+ }
- if (flush_set_callback &&
- was_dirty_or_tx > 0 &&
- oset->dirty_or_tx == 0) {
- // nothing dirty/tx
- flush_set_callback(flush_set_callback_arg, oset);
- }
+ // is the entire object set now clean and fully committed?
+ ObjectSet *oset = ob->oset;
+ ob->put();
- if (!ls.empty())
- finish_contexts(cct, ls, r);
+ if (flush_set_callback &&
+ was_dirty_or_tx > 0 &&
+ oset->dirty_or_tx == 0) { // nothing dirty/tx
+ flush_set_callback(flush_set_callback_arg, oset);
}
+
+ if (!ls.empty())
+ finish_contexts(cct, ls, r);
}
void ObjectCacher::flush(loff_t amount)
* to the other LRU, so that we can call
* lru_dirty.lru_get_next_expire() again.
*/
- loff_t did = 0;
- while (amount == 0 || did < amount) {
+ int64_t left = amount;
+ while (amount == 0 || left > 0) {
BufferHead *bh = static_cast<BufferHead*>(
bh_lru_dirty.lru_get_next_expire());
if (!bh) break;
if (bh->last_write > cutoff) break;
- did += bh->length();
- bh_write(bh);
- }
+ if (scattered_write) {
+ bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
+ } else {
+ left -= bh->length();
+ bh_write(bh);
+ }
+ }
}
ex_it->length, soid.snap)) {
ldout(cct, 20) << "readx may copy on write" << dendl;
bool wait = false;
+ list<BufferHead*> blist;
for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
bh_it != o->data.end();
++bh_it) {
if (bh->is_dirty() || bh->is_tx()) {
ldout(cct, 10) << "readx flushing " << *bh << dendl;
wait = true;
- if (bh->is_dirty())
- bh_write(bh);
+ if (bh->is_dirty()) {
+ if (scattered_write)
+ blist.push_back(bh);
+ else
+ bh_write(bh);
+ }
}
}
+ if (scattered_write && !blist.empty())
+ bh_write_scattered(blist);
if (wait) {
ldout(cct, 10) << "readx waiting on tid " << o->last_write_tid
<< " on " << *o << dendl;
while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
lru_get_next_expire())) != 0 &&
bh->last_write < cutoff &&
- --max > 0) {
+ max > 0) {
ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
- bh_write(bh);
+ if (scattered_write) {
+ bh_write_adjacencies(bh, cutoff, NULL, &max);
+ } else {
+ bh_write(bh);
+ --max;
+ }
}
if (!max) {
// back off the lock to avoid starving other threads
bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length)
{
assert(lock.is_locked());
+ list<BufferHead*> blist;
bool clean = true;
ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
for (map<loff_t,BufferHead*>::iterator p = ob->data_lower_bound(offset);
if (!bh->is_dirty()) {
continue;
}
- bh_write(bh);
+
+ if (scattered_write)
+ blist.push_back(bh);
+ else
+ bh_write(bh);
clean = false;
}
+ if (scattered_write && !blist.empty())
+ bh_write_scattered(blist);
+
return clean;
}
C_GatherBuilder gather(cct);
set<Object*> waitfor_commit;
+ list<BufferHead*> blist;
+ Object *last_ob = NULL;
set<BufferHead*, BufferHead::ptr_lt>::iterator it, p, q;
// Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
if (bh->ob->oset != oset)
break;
waitfor_commit.insert(bh->ob);
- if (bh->is_dirty())
- bh_write(bh);
+ if (bh->is_dirty()) {
+ if (scattered_write) {
+ if (last_ob != bh->ob) {
+ if (!blist.empty()) {
+ bh_write_scattered(blist);
+ blist.clear();
+ }
+ last_ob = bh->ob;
+ }
+ blist.push_back(bh);
+ } else {
+ bh_write(bh);
+ }
+ }
}
if (backwards) {
if (bh->ob->oset != oset)
break;
waitfor_commit.insert(bh->ob);
- if (bh->is_dirty())
- bh_write(bh);
+ if (bh->is_dirty()) {
+ if (scattered_write) {
+ if (last_ob != bh->ob) {
+ if (!blist.empty()) {
+ bh_write_scattered(blist);
+ blist.clear();
+ }
+ last_ob = bh->ob;
+ }
+ blist.push_front(bh);
+ } else {
+ bh_write(bh);
+ }
+ }
if (!backwards)
break;
}
}
+ if (scattered_write && !blist.empty())
+ bh_write_scattered(blist);
+
for (set<Object*>::iterator i = waitfor_commit.begin();
i != waitfor_commit.end(); ++i) {
Object *ob = *i;
C_GatherBuilder gather(cct);
set<Object*> waitfor_commit;
+ list<BufferHead*> blist;
+ Object *last_ob = NULL;
set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
next = it = dirty_or_tx_bh.begin();
while (it != dirty_or_tx_bh.end()) {
BufferHead *bh = *it;
waitfor_commit.insert(bh->ob);
- if (bh->is_dirty())
- bh_write(bh);
+ if (bh->is_dirty()) {
+ if (scattered_write) {
+ if (last_ob != bh->ob) {
+ if (!blist.empty()) {
+ bh_write_scattered(blist);
+ blist.clear();
+ }
+ last_ob = bh->ob;
+ }
+ blist.push_back(bh);
+ } else {
+ bh_write(bh);
+ }
+ }
it = next;
}
+ if (scattered_write && !blist.empty())
+ bh_write_scattered(blist);
+
for (set<Object*>::iterator i = waitfor_commit.begin();
i != waitfor_commit.end();
++i) {
// ObjectCacher fields
private:
WritebackHandler& writeback_handler;
+ bool scattered_write;
string name;
Mutex& lock;
// io
void bh_read(BufferHead *bh, int op_flags);
void bh_write(BufferHead *bh);
+ void bh_write_scattered(list<BufferHead*>& blist);
+ void bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
+ int64_t *amount, int *max_count);
void trim();
void flush(loff_t amount=0);
loff_t offset, uint64_t length,
bufferlist &bl, int r,
bool trust_enoent);
- void bh_write_commit(int64_t poolid, sobject_t oid, loff_t offset,
- uint64_t length, ceph_tid_t t, int r);
+ void bh_write_commit(int64_t poolid, sobject_t oid,
+ vector<pair<loff_t, uint64_t> >& ranges,
+ ceph_tid_t t, int r);
+
class C_ReadFinish : public Context {
ObjectCacher *oc;
ObjectCacher *oc;
int64_t poolid;
sobject_t oid;
- loff_t start;
- uint64_t length;
+ vector<pair<loff_t, uint64_t> > ranges;
public:
ceph_tid_t tid;
C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
uint64_t l) :
- oc(c), poolid(_poolid), oid(o), start(s), length(l), tid(0) {}
+ oc(c), poolid(_poolid), oid(o), tid(0) {
+ ranges.push_back(make_pair(s, l));
+ }
+ C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
+ vector<pair<loff_t, uint64_t> >& _ranges) :
+ oc(c), poolid(_poolid), oid(o), tid(0) {
+ ranges.swap(_ranges);
+ }
void finish(int r) {
- oc->bh_write_commit(poolid, oid, start, length, tid, r);
+ oc->bh_write_commit(poolid, oid, ranges, tid, r);
}
- };
+ };
class C_WaitForWrite : public Context {
public: