}
virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
- uint64_t off, uint64_t len, const SnapContext& snapc,
- const bufferlist &bl, utime_t mtime, uint64_t trunc_size,
- __u32 trunc_seq, Context *oncommit) {
+ uint64_t off, uint64_t len, const SnapContext& snapc,
+ const bufferlist &bl, utime_t mtime,
+ uint64_t trunc_size, __u32 trunc_seq,
+ ceph_tid_t journal_tid, Context *oncommit) {
return m_objecter->write_trunc(oid, oloc, off, len, snapc, bl, mtime, 0,
trunc_size, trunc_seq, NULL,
new C_OnFinisher(new C_Lock(m_lock, oncommit),
int fadvise_flags) {
snap_lock.get_read();
ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl,
- utime_t(), fadvise_flags);
+ utime_t(),
+ fadvise_flags,
+ 0);
snap_lock.put_read();
ObjectExtent extent(o, 0, off, len, 0);
extent.oloc.pool = data_ctx.get_id();
const SnapContext& snapc,
const bufferlist &bl, utime_t mtime,
uint64_t trunc_size, __u32 trunc_seq,
- Context *oncommit)
+ ceph_tid_t journal_tid, Context *oncommit)
{
assert(m_ictx->owner_lock.is_locked());
uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
return ++m_tid;
}
+
+ void LibrbdWriteback::overwrite_extent(const object_t& oid, uint64_t off,
+ uint64_t len, ceph_tid_t journal_tid) {  // placeholder: only validates journal_tid for now
+ assert(journal_tid != 0);  // a zero journal tid is rejected here
+
+ // TODO: inform the journal that we no longer expect to receive writebacks
+ // for the specified extent (oid, off and len are unused until then)
+ }
+
void LibrbdWriteback::get_client_lock() {
m_ictx->owner_lock.get_read();
}
// Note that oloc, trunc_size, and trunc_seq are ignored
virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
- uint64_t off, uint64_t len, const SnapContext& snapc,
- const bufferlist &bl, utime_t mtime, uint64_t trunc_size,
- __u32 trunc_seq, Context *oncommit);
+ uint64_t off, uint64_t len,
+ const SnapContext& snapc, const bufferlist &bl,
+ utime_t mtime, uint64_t trunc_size,
+ __u32 trunc_seq, ceph_tid_t journal_tid,
+ Context *oncommit);
+
+ virtual void overwrite_extent(const object_t& oid, uint64_t off,
+ uint64_t len, ceph_tid_t journal_tid);
virtual void get_client_lock();
virtual void put_client_lock();
right->last_read_tid = left->last_read_tid;
right->set_state(left->get_state());
right->snapc = left->snapc;
+ right->set_journal_tid(left->journal_tid);
loff_t newleftlen = off - left->start();
right->set_start(off);
assert(oc->lock.is_locked());
assert(left->end() == right->start());
assert(left->get_state() == right->get_state());
+ assert(left->can_merge_journal(right));
ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
+ if (left->get_journal_tid() == 0) {
+ left->set_journal_tid(right->get_journal_tid());
+ }
+ right->set_journal_tid(0);
+
oc->bh_remove(this, right);
oc->bh_stat_sub(left);
left->set_length(left->length() + right->length());
// data
left->bl.claim_append(right->bl);
-
- // version
+
+ // version
// note: this is sorta busted, but should only be used for dirty buffers
left->last_write_tid = MAX( left->last_write_tid, right->last_write_tid );
left->last_write = MAX( left->last_write, right->last_write );
if (p != data.begin()) {
--p;
if (p->second->end() == bh->start() &&
- p->second->get_state() == bh->get_state()) {
+ p->second->get_state() == bh->get_state() &&
+ p->second->can_merge_journal(bh)) {
merge_left(p->second, bh);
bh = p->second;
} else {
++p;
if (p != data.end() &&
p->second->start() == bh->end() &&
- p->second->get_state() == bh->get_state())
+ p->second->get_state() == bh->get_state() &&
+ p->second->can_merge_journal(bh))
merge_left(bh, p->second);
}
oc->bh_add(this, final);
ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
} else {
+ replace_journal_tid(final, wr->journal_tid);
oc->bh_stat_sub(final);
final->set_length(final->length() + max);
oc->bh_stat_add(final);
cur += max;
continue;
}
-
+
ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
//oc->verify_stats();
if (p->first <= cur) {
BufferHead *bh = p->second;
ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
-
+
if (p->first < cur) {
assert(final == 0);
if (cur + max >= bh->end()) {
oc->mark_dirty(final);
--p; // move iterator back to final
assert(p->second == final);
+ replace_journal_tid(bh, 0);
merge_left(final, bh);
} else {
final = bh;
}
}
-
+
// keep going.
loff_t lenfromcur = final->end() - cur;
cur += lenfromcur;
left -= lenfromcur;
++p;
- continue;
+ continue;
} else {
// gap!
loff_t next = p->first;
loff_t glen = MIN(next - cur, max);
ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
if (final) {
+ replace_journal_tid(final, wr->journal_tid);
oc->bh_stat_sub(final);
final->set_length(final->length() + glen);
oc->bh_stat_add(final);
final->set_length( glen );
oc->bh_add(this, final);
}
-
+
cur += glen;
left -= glen;
continue; // more?
}
}
}
-
- // set versoin
+
+ // set version
assert(final);
+ replace_journal_tid(final, wr->journal_tid);
ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
return final;
}
+void ObjectCacher::Object::replace_journal_tid(BufferHead *bh, ceph_tid_t tid) {  // re-tag bh with journal tid `tid`
+ ceph_tid_t bh_tid = bh->get_journal_tid();  // tid currently recorded on the buffer
+ // a non-zero replacement tid must never be smaller than the tid it replaces
+ assert(tid == 0 || bh_tid <= tid);
+ if (bh_tid != 0 && bh_tid != tid) {
+ // old non-zero tid is being dropped: inform journal that it should not expect a writeback from this extent
+ oc->writeback_handler.overwrite_extent(get_oid(), bh->start(), bh->length(),
+ bh_tid);
+ }
+ bh->set_journal_tid(tid);  // record the new tid (possibly 0) on the buffer
+}
+
void ObjectCacher::Object::truncate(loff_t s)
{
assert(oc->lock.is_locked());
// remove bh entirely
assert(bh->start() >= s);
assert(bh->waitfor_read.empty());
+ replace_journal_tid(bh, 0);
oc->bh_remove(this, bh);
delete bh;
}
++p;
ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
assert(bh->waitfor_read.empty());
+ replace_journal_tid(bh, 0);
oc->bh_remove(this, bh);
delete bh;
}
bh->ob->get_soid(), bh->start(), bh->length());
// go
ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(), bh->ob->get_oloc(),
- bh->start(), bh->length(),
- bh->snapc, bh->bl, bh->last_write,
- bh->ob->truncate_size, bh->ob->truncate_seq,
- oncommit);
+ bh->start(), bh->length(),
+ bh->snapc, bh->bl, bh->last_write,
+ bh->ob->truncate_size,
+ bh->ob->truncate_seq,
+ bh->journal_tid, oncommit);
ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
// set bh last_write_tid
if (r >= 0) {
// ok! mark bh clean and error-free
mark_clean(bh);
+ bh->set_journal_tid(0);
if (bh->get_nocache())
bh_lru_rest.lru_bottouch(bh);
hit.push_back(bh);
void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
{
assert(lock.is_locked());
+ assert(bh->get_journal_tid() == 0);
ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
ob->remove_bh(bh);
if (bh->is_dirty()) {
bufferlist bl;
utime_t mtime;
int fadvise_flags;
- OSDWrite(const SnapContext& sc, const bufferlist& b, utime_t mt, int f)
- : snapc(sc), bl(b), mtime(mt), fadvise_flags(f) {}
+ ceph_tid_t journal_tid;
+ OSDWrite(const SnapContext& sc, const bufferlist& b, utime_t mt, int f,
+ ceph_tid_t _journal_tid)
+ : snapc(sc), bl(b), mtime(mt), fadvise_flags(f),
+ journal_tid(_journal_tid) {}  // each argument is copied into the matching member; no validation
};
OSDWrite *prepare_write(const SnapContext& sc, const bufferlist &b,
- utime_t mt, int f) {
- return new OSDWrite(sc, b, mt, f);
+ utime_t mt, int f, ceph_tid_t journal_tid) {
+ return new OSDWrite(sc, b, mt, f, journal_tid);
}
ceph_tid_t last_read_tid; // tid of last read op (if any)
utime_t last_write;
SnapContext snapc;
+ ceph_tid_t journal_tid;
int error; // holds return value for failed reads
map< loff_t, list<Context*> > waitfor_read;
ob(o),
last_write_tid(0),
last_read_tid(0),
+ journal_tid(0),
error(0) {
ex.start = ex.length = 0;
}
state = s;
}
int get_state() const { return state; }
-
+
+ inline ceph_tid_t get_journal_tid() const {  // journal tid currently recorded on this buffer
+ return journal_tid;
+ }
+ inline void set_journal_tid(ceph_tid_t _journal_tid) {
+ // plain store: no validation and no journal notification happens here
+ journal_tid = _journal_tid;
+ }
+
bool is_missing() { return state == STATE_MISSING; }
bool is_dirty() { return state == STATE_DIRTY; }
bool is_clean() { return state == STATE_CLEAN; }
bool get_nocache() {
return nocache;
}
+
+ inline bool can_merge_journal(BufferHead *bh) const {  // true if either buffer has journal tid 0 or both carry the same tid
+ return (get_journal_tid() == 0 || bh->get_journal_tid() == 0 ||
+ get_journal_tid() == bh->get_journal_tid());
+ }
};
// ******* Object *********
map<loff_t, BufferHead*>& rx,
map<loff_t, BufferHead*>& errors);
BufferHead *map_write(OSDWrite *wr);
-
+
+ void replace_journal_tid(BufferHead *bh, ceph_tid_t tid);
void truncate(loff_t s);
void discard(loff_t off, loff_t len);
int file_write(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc,
loff_t offset, uint64_t len,
bufferlist& bl, utime_t mtime, int flags) {
- OSDWrite *wr = prepare_write(snapc, bl, mtime, flags);
+ OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0);
Striper::file_to_extents(cct, oset->ino, layout, offset, len, oset->truncate_size, wr->extents);
return writex(wr, oset, NULL);
}
<< " " << bh.ob
<< " (" << bh.bl.length() << ")"
<< " v " << bh.last_write_tid;
+ if (bh.get_journal_tid() != 0) {
+ out << " j " << bh.get_journal_tid();
+ }
if (bh.is_tx()) out << " tx";
if (bh.is_rx()) out << " rx";
if (bh.is_dirty()) out << " dirty";
uint64_t off, uint64_t len, const SnapContext& snapc,
const bufferlist &bl, utime_t mtime,
uint64_t trunc_size, __u32 trunc_seq,
- Context *oncommit) = 0;
+ ceph_tid_t journal_tid, Context *oncommit) = 0;
+
+ virtual void overwrite_extent(const object_t& oid, uint64_t off, uint64_t len,
+ ceph_tid_t journal_tid) {}  // no-op by default; all four arguments are ignored
virtual void get_client_lock() {}
virtual void put_client_lock() {}
const SnapContext& snapc,
const bufferlist &bl, utime_t mtime,
uint64_t trunc_size, __u32 trunc_seq,
- Context *oncommit)
+ ceph_tid_t journal_tid, Context *oncommit)
{
C_Delay *wrapper = new C_Delay(m_cct, oncommit, m_lock, off, NULL, m_delay_ns);
m_finisher->queue(wrapper, 0);
uint64_t off, uint64_t len,
const SnapContext& snapc, const bufferlist &bl,
utime_t mtime, uint64_t trunc_size,
- __u32 trunc_seq, Context *oncommit);
+ __u32 trunc_seq, ceph_tid_t journal_tid,
+ Context *oncommit);
virtual bool may_copy_on_write(const object_t&, uint64_t, uint64_t, snapid_t);
private:
else
assert(r == 0);
} else {
- ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl, utime_t(), 0);
+ ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl, utime_t(), 0,
+ random());
wr->extents.push_back(op->extent);
lock.Lock();
obc.writex(wr, &object_set, NULL);