lock.Lock();
ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP, 2, 0);
__u64 size;
- client->objecter->stat(oid, &size, layout, 0, new C_SafeCond(&lock, &cond, &ack));
+ client->objecter->stat(oid, layout, &size, 0, new C_SafeCond(&lock, &cond, &ack));
while (!ack) cond.Wait(lock);
lock.Unlock();
}
lock.Lock();
ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP, 2, 0);
bufferlist bl;
- client->objecter->read(oid, off, len, layout, &bl, 0, new C_SafeCond(&lock, &cond, &ack));
+ client->objecter->read(oid, layout, off, len, &bl, 0, new C_SafeCond(&lock, &cond, &ack));
while (!ack) cond.Wait(lock);
lock.Unlock();
}
bufferlist bl;
bl.push_back(bp);
SnapContext snapc;
- client->objecter->write(oid, off, len, layout, snapc, bl, 0,
+ client->objecter->write(oid, layout, off, len, snapc, bl, 0,
new C_SafeCond(&lock, &cond, &ack),
safeg->new_sub());
while (!ack) cond.Wait(lock);
lock.Lock();
ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP, 2, 0);
SnapContext snapc;
- client->objecter->zero(oid, off, len, layout, snapc, 0,
+ client->objecter->zero(oid, layout, off, len, snapc, 0,
new C_SafeCond(&lock, &cond, &ack),
safeg->new_sub());
while (!ack) cond.Wait(lock);
starts.push_back(g_clock.now());
client->client_lock.Lock();
- client->objecter->write(oid, 0, osize, layout, snapc, bl, 0,
+ client->objecter->write(oid, layout, 0, osize, snapc, bl, 0,
new C_Ref(lock, cond, &unack),
new C_Ref(lock, cond, &unsafe));
client->client_lock.Unlock();
utime_t start = g_clock.now();
if (write) {
dout(10) << "write to " << oid << dendl;
- client->objecter->write(oid, 0, osize, layout, snapc, bl, 0,
+ client->objecter->write(oid, layout, 0, osize, snapc, bl, 0,
new C_Ref(lock, cond, &unack),
new C_Ref(lock, cond, &unsafe));
} else {
dout(10) << "read from " << oid << dendl;
bufferlist inbl;
- client->objecter->read(oid, 0, osize, layout, &inbl, 0,
+ client->objecter->read(oid, layout, 0, osize, &inbl, 0,
new C_Ref(lock, cond, &unack));
}
client->client_lock.Unlock();
// start by reading the first hunk of it
C_Dir_Fetch *fin = new C_Dir_Fetch(this);
cache->mds->objecter->read( get_ondisk_object(),
- 0, 0, // whole object
cache->mds->objecter->osdmap->file_to_object_layout( get_ondisk_object(),
g_default_mds_dir_layout ),
+ 0, 0, // whole object
&fin->bl, 0,
fin );
}
C_MT_Load *c = new C_MT_Load(this, onfinish);
object_t oid(ino, 0);
mds->objecter->read(oid,
- 0, 0, // whole object
mds->objecter->osdmap->file_to_object_layout(oid,
g_default_mds_dir_layout),
+ 0, 0, // whole object
&c->bl, 0, c);
}
C_SM_Load *c = new C_SM_Load(this);
object_t oid(inode.ino, 0);
mds->objecter->read(oid,
- 0, 0, // whole object
mds->objecter->osdmap->file_to_object_layout(oid,
g_default_mds_dir_layout),
+ 0, 0, // whole object
&c->bl, 0,
c);
p++) {
dout(10) << "_probe probing " << p->oid << dendl;
C_Probe *c = new C_Probe(this, probe, p->oid);
- probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, probe->flags, c);
+ probe->ops[p->oid] = objecter->stat(p->oid, p->layout, &c->size, probe->flags, c);
}
}
vector<ObjectExtent> extents;
file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents);
if (extents.size() == 1) {
- objecter->zero(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout,
+ objecter->zero(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length,
snapc, flags, onack, oncommit);
} else {
C_Gather *gack = 0, *gcom = 0;
if (oncommit)
gcom = new C_Gather(oncommit);
for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
- objecter->zero(p->oid, p->offset, p->length, p->layout,
+ objecter->zero(p->oid, p->layout, p->offset, p->length,
snapc, flags,
gack ? gack->new_sub():0,
gcom ? gcom->new_sub():0);
C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_oid(), bh->start(), bh->length());
// go
- objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(),
+ objecter->read(bh->ob->get_oid(), bh->ob->get_layout(),
+ bh->start(), bh->length(),
&onfinish->bl, 0,
onfinish);
}
C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->get_oid(), bh->start(), bh->length());
// go
- tid_t tid = objecter->write(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(),
+ tid_t tid = objecter->write(bh->ob->get_oid(), bh->ob->get_layout(),
+ bh->start(), bh->length(),
bh->snapc, bh->bl, 0,
onack, oncommit);
Cond cond;
bool done = false;
//objecter->readx(rd, new C_SafeCond(&lock, &cond, &done));
- objecter->read(rd->extents[0].oid, rd->extents[0].offset, rd->extents[0].length,
- rd->extents[0].layout, rd->bl, 0,
+ objecter->read(rd->extents[0].oid, rd->extents[0].layout,
+ rd->extents[0].offset, rd->extents[0].length,
+ rd->bl, 0,
new C_SafeCond(&lock, &cond, &done));
// block
commit->tid =
ack->tid =
- o->last_write_tid = objecter->lock(CEPH_OSD_OP_RDLOCK, o->get_oid(), 0, o->get_layout(), ack, commit);
+ o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), CEPH_OSD_OP_RDLOCK, 0, ack, commit);
}
// stake our claim.
commit->tid =
ack->tid =
- o->last_write_tid = objecter->lock(op, o->get_oid(), 0, o->get_layout(), ack, commit);
+ o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), op, 0, ack, commit);
}
// stake our claim.
C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0);
commit->tid =
lockack->tid =
- o->last_write_tid = objecter->lock(CEPH_OSD_OP_RDUNLOCK, o->get_oid(), 0, o->get_layout(), lockack, commit);
+ o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), CEPH_OSD_OP_RDUNLOCK, 0, lockack, commit);
}
void ObjectCacher::wrunlock(Object *o)
C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0);
commit->tid =
lockack->tid =
- o->last_write_tid = objecter->lock(op, o->get_oid(), 0, o->get_layout(), lockack, commit);
+ o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), op, 0, lockack, commit);
}
tid_t tid = *p;
if (op_modify.count(tid)) {
- OSDModify *wr = op_modify[tid];
+ ModifyOp *wr = op_modify[tid];
op_modify.erase(tid);
if (wr->onack)
num_uncommitted--;
// WRITE
- if (wr->waitfor_ack.count(tid)) {
+ if (wr->onack) {
dout(3) << "kick_requests missing ack, resub write " << tid << dendl;
- modifyx_submit(wr, wr->waitfor_ack[tid], tid);
+ modify_submit(wr);
} else {
- assert(wr->waitfor_commit.count(tid));
-
- if (wr->tid_version.count(tid)) {
- if ((wr->op == CEPH_OSD_OP_WRITE || wr->op == CEPH_OSD_OP_WRITEFULL) &&
- !g_conf.objecter_buffer_uncommitted) {
- dout(0) << "kick_requests missing commit, cannot replay: objecter_buffer_uncommitted == FALSE" << dendl;
- assert(0); // crap. fixme.
- } else {
- dout(3) << "kick_requests missing commit, replay write " << tid
- << " v " << wr->tid_version[tid] << dendl;
- }
- } else {
- dout(3) << "kick_requests missing commit, resub write " << tid << dendl;
- }
- modifyx_submit(wr, wr->waitfor_commit[tid], tid);
+ assert(wr->oncommit);
+ dout(3) << "kick_requests missing commit, resub write " << tid << dendl;
+ modify_submit(wr);
}
}
else if (op_read.count(tid)) {
// READ
- OSDRead *rd = op_read[tid];
+ ReadOp *rd = op_read[tid];
op_read.erase(tid);
dout(3) << "kick_requests resub read " << tid << dendl;
// resubmit
- readx_submit(rd, rd->ops[tid], true);
- rd->ops.erase(tid);
+ read_submit(rd);
}
-
- else if (op_stat.count(tid)) {
- OSDStat *st = op_stat[tid];
- op_stat.erase(tid);
-
- dout(3) << "kick_requests resub stat " << tid << dendl;
-
- // resubmit
- stat_submit(st);
- }
-
else
assert(0);
}
void Objecter::handle_osd_op_reply(MOSDOpReply *m)
{
- assert(m->ops.size() >= 1);
- int op = m->ops[0].op;
-
- // read or modify?
- switch (op) {
- case CEPH_OSD_OP_READ:
- handle_osd_read_reply(m);
- break;
-
- case CEPH_OSD_OP_STAT:
- handle_osd_stat_reply(m);
- break;
-
- default:
- assert(m->is_modify());
+ if (m->is_modify())
handle_osd_modify_reply(m);
- break;
- }
-}
-
-
-
-// stat -----------------------------------
-
-tid_t Objecter::stat(object_t oid, __u64 *size, ceph_object_layout ol, int flags, Context *onfinish)
-{
- OSDStat *st = prepare_stat(size, flags);
- st->extents.push_back(ObjectExtent(oid, 0, 0));
- st->extents.front().layout = ol;
- st->onfinish = onfinish;
-
- return stat_submit(st);
-}
-
-tid_t Objecter::stat_submit(OSDStat *st)
-{
- // find OSD
- ObjectExtent &ex = st->extents.front();
- PG &pg = get_pg( pg_t(ex.layout.ol_pgid) );
-
- // pick tid
- last_tid++;
- assert(client_inc >= 0);
-
- // add to gather set
- st->tid = last_tid;
- op_stat[last_tid] = st;
-
- pg.active_tids.insert(last_tid);
-
- // send?
-
- dout(10) << "stat_submit " << st << " tid " << last_tid
- << " oid " << ex.oid
- << " " << ex.layout
- << " osd" << pg.acker()
- << dendl;
-
- if (pg.acker() >= 0) {
- int flags = st->flags;
- if (st->onfinish) flags |= CEPH_OSD_OP_ACK;
-
- MOSDOp *m = new MOSDOp(client_inc, last_tid, false,
- ex.oid, ex.layout, osdmap->get_epoch(),
- flags);
- m->stat();
- if (inc_lock > 0) {
- st->inc_lock = inc_lock;
- m->set_inc_lock(inc_lock);
- }
-
- messenger->send_message(m, osdmap->get_inst(pg.acker()));
- }
-
- return last_tid;
+ else
+ handle_osd_read_reply(m);
}
-void Objecter::handle_osd_stat_reply(MOSDOpReply *m)
-{
- // get pio
- tid_t tid = m->get_tid();
-
- if (op_stat.count(tid) == 0) {
- dout(7) << "handle_osd_stat_reply " << tid << " ... stray" << dendl;
- delete m;
- return;
- }
-
- ceph_osd_op& op = m->ops[0];
-
- dout(7) << "handle_osd_stat_reply " << tid
- << " r=" << m->get_result()
- << " size=" << op.length
- << dendl;
- OSDStat *st = op_stat[ tid ];
- op_stat.erase( tid );
-
- // remove from osd/tid maps
- PG& pg = get_pg( m->get_pg() );
- assert(pg.active_tids.count(tid));
- pg.active_tids.erase(tid);
- if (pg.active_tids.empty()) close_pg( m->get_pg() );
-
- // success?
- if (m->get_result() == -EINCLOCKED &&
- (st->flags & CEPH_OSD_OP_INCLOCK_FAIL) == 0) {
- dout(7) << " got -EINCLOCKED, resubmitting" << dendl;
- stat_submit(st);
- delete m;
- return;
- }
- if (m->get_result() == -EAGAIN) {
- dout(7) << " got -EAGAIN, resubmitting" << dendl;
- stat_submit(st);
- delete m;
- return;
- }
-
- // ok!
- if (m->get_result() < 0) {
- *st->size = 0;
- } else {
- *st->size = op.length;
- }
-
- // finish, clean up
- Context *onfinish = st->onfinish;
-
- // done
- delete st;
- if (onfinish) {
- onfinish->finish(m->get_result());
- delete onfinish;
- }
-
- delete m;
-}
// read -----------------------------------
-
-tid_t Objecter::read(object_t oid, __u64 off, size_t len, ceph_object_layout ol,
- bufferlist *bl, int flags,
- Context *onfinish)
-{
- OSDRead *rd = prepare_read(bl, flags);
- rd->extents.push_back(ObjectExtent(oid, off, len));
- rd->extents.front().layout = ol;
- readx(rd, onfinish);
- return last_tid;
-}
-
-
-tid_t Objecter::readx(OSDRead *rd, Context *onfinish)
-{
- rd->onfinish = onfinish;
-
- // issue reads
- for (list<ObjectExtent>::iterator it = rd->extents.begin();
- it != rd->extents.end();
- it++)
- readx_submit(rd, *it);
-
- return last_tid;
-}
-
-tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry)
+tid_t Objecter::read_submit(ReadOp *rd)
{
// find OSD
- PG &pg = get_pg( pg_t(ex.layout.ol_pgid) );
+ PG &pg = get_pg( pg_t(rd->layout.ol_pgid) );
// pick tid
- last_tid++;
- assert(client_inc >= 0);
-
- // add to gather set
- rd->ops[last_tid] = ex;
+ rd->tid = ++last_tid;
op_read[last_tid] = rd;
+ assert(client_inc >= 0);
+
pg.active_tids.insert(last_tid);
pg.last = g_clock.now();
// send?
- dout(10) << "readx_submit " << rd << " tid " << last_tid
- << " oid " << ex.oid << " " << ex.offset << "~" << ex.length
- << " (" << ex.buffer_extents.size() << " buffer fragments)"
- << " " << ex.layout
+ dout(10) << "read_submit " << rd << " tid " << last_tid
+ << " oid " << rd->oid
+ << " " << rd->ops
+ << " " << rd->layout
<< " osd" << pg.acker()
<< dendl;
if (pg.acker() >= 0) {
int flags = rd->flags;
- if (rd->onfinish) flags |= CEPH_OSD_OP_ACK;
+ if (rd->onfinish)
+ flags |= CEPH_OSD_OP_ACK;
MOSDOp *m = new MOSDOp(client_inc, last_tid, false,
- ex.oid, ex.layout, osdmap->get_epoch(),
+ rd->oid, rd->layout, osdmap->get_epoch(),
flags);
- m->read(ex.offset, ex.length);
+ m->ops = rd->ops;
if (inc_lock > 0) {
rd->inc_lock = inc_lock;
m->set_inc_lock(inc_lock);
}
- m->set_retry_attempt(retry);
+ m->set_retry_attempt(rd->attempts++);
int who = pg.acker();
if (rd->flags & CEPH_OSD_OP_BALANCE_READS) {
int replica = messenger->get_myname().num() % pg.acting.size();
who = pg.acting[replica];
- dout(-10) << "readx_submit reading from random replica " << replica
+ dout(-10) << "read_submit reading from random replica " << replica
<< " = osd" << who << dendl;
}
messenger->send_message(m, osdmap->get_inst(who));
}
dout(7) << "handle_osd_read_reply " << tid << dendl;
- OSDRead *rd = op_read[ tid ];
+ ReadOp *rd = op_read[ tid ];
op_read.erase( tid );
- ceph_osd_op& op = m->ops[0];
-
// remove from osd/tid maps
PG& pg = get_pg( m->get_pg() );
assert(pg.active_tids.count(tid));
pg.active_tids.erase(tid);
if (pg.active_tids.empty()) close_pg( m->get_pg() );
- // our op finished
- rd->ops.erase(tid);
-
// fail?
if (m->get_result() == -EINCLOCKED &&
rd->flags & CEPH_OSD_OP_INCLOCK_FAIL) {
if (m->get_result() == -EAGAIN ||
m->get_result() == -EINCLOCKED) {
dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl;
- readx_submit(rd, rd->ops[tid], true);
+ read_submit(rd);
delete m;
return;
}
// what buffer offset are we?
- dout(7) << " got frag from " << m->get_oid() << " "
- << op.offset << "~" << op.length
- << ", still have " << rd->ops.size() << " more ops" << dendl;
-
- if (rd->ops.empty()) {
- // all done
- size_t bytes_read = 0;
-
- if (rd->read_data.size()) {
- dout(15) << " assembling frags" << dendl;
-
- /** FIXME This doesn't handle holes efficiently.
- * It allocates zero buffers to fill whole buffer, and
- * then discards trailing ones at the end.
- *
- * Actually, this whole thing is pretty messy with temporary bufferlist*'s all over
- * the heap.
- */
-
- // we have other fragments, assemble them all... blech!
- rd->read_data[m->get_oid()] = new bufferlist;
- rd->read_data[m->get_oid()]->claim( m->get_data() );
-
- // map extents back into buffer
- map<__u64, bufferlist*> by_off; // buffer offset -> bufferlist
-
- // for each object extent...
- for (list<ObjectExtent>::iterator eit = rd->extents.begin();
- eit != rd->extents.end();
- eit++) {
- bufferlist *ox_buf = rd->read_data[eit->oid];
- unsigned ox_len = ox_buf->length();
- unsigned ox_off = 0;
- assert(ox_len <= eit->length);
-
- // for each buffer extent we're mapping into...
- for (map<__u32,__u32>::iterator bit = eit->buffer_extents.begin();
- bit != eit->buffer_extents.end();
- bit++) {
- dout(21) << " object " << eit->oid
- << " extent " << eit->offset << "~" << eit->length
- << " : ox offset " << ox_off
- << " -> buffer extent " << bit->first << "~" << bit->second << dendl;
- by_off[bit->first] = new bufferlist;
-
- if (ox_off + bit->second <= ox_len) {
- // we got the whole bx
- by_off[bit->first]->substr_of(*ox_buf, ox_off, bit->second);
- if (bytes_read < bit->first + bit->second)
- bytes_read = bit->first + bit->second;
- } else if (ox_off + bit->second > ox_len && ox_off < ox_len) {
- // we got part of this bx
- by_off[bit->first]->substr_of(*ox_buf, ox_off, (ox_len-ox_off));
- if (bytes_read < bit->first + ox_len-ox_off)
- bytes_read = bit->first + ox_len-ox_off;
-
- // zero end of bx
- dout(21) << " adding some zeros to the end " << ox_off + bit->second-ox_len << dendl;
- bufferptr z(ox_off + bit->second - ox_len);
- z.zero();
- by_off[bit->first]->append( z );
- } else {
- // we got none of this bx. zero whole thing.
- assert(ox_off >= ox_len);
- dout(21) << " adding all zeros for this bit " << bit->second << dendl;
- bufferptr z(bit->second);
- z.zero();
- by_off[bit->first]->append( z );
- }
- ox_off += bit->second;
- }
- assert(ox_off == eit->length);
- }
+ dout(7) << " got reply on " << rd->ops << dendl;
- // sort and string bits together
- for (map<__u64, bufferlist*>::iterator it = by_off.begin();
- it != by_off.end();
- it++) {
- assert(it->second->length());
- if (it->first < (__u64)bytes_read) {
- dout(21) << " concat buffer frag off " << it->first << " len " << it->second->length() << dendl;
- rd->bl->claim_append(*(it->second));
- } else {
- dout(21) << " NO concat zero buffer frag off " << it->first << " len " << it->second->length() << dendl;
- }
- delete it->second;
- }
-
- // trim trailing zeros?
- if (rd->bl->length() > bytes_read) {
- dout(10) << " trimming off trailing zeros . bytes_read=" << bytes_read
- << " len=" << rd->bl->length() << dendl;
- rd->bl->splice(bytes_read, rd->bl->length() - bytes_read);
- assert(bytes_read == rd->bl->length());
- }
-
- // hose p->read_data bufferlist*'s
- for (map<object_t, bufferlist*>::iterator it = rd->read_data.begin();
- it != rd->read_data.end();
- it++) {
- delete it->second;
- }
- } else {
- dout(15) << " only one frag" << dendl;
+ int bytes_read = m->get_data().length();
- // only one fragment, easy
- rd->bl->claim( m->get_data() );
- bytes_read = rd->bl->length();
- }
-
- // finish, clean up
- Context *onfinish = rd->onfinish;
-
- dout(7) << " " << bytes_read << " bytes "
- << rd->bl->length()
- << dendl;
-
- // done
- delete rd;
- if (onfinish) {
- onfinish->finish(bytes_read);// > 0 ? bytes_read:m->get_result());
- delete onfinish;
- }
- } else {
- // store my bufferlist for later assembling
- rd->read_data[m->get_oid()] = new bufferlist;
- rd->read_data[m->get_oid()]->claim( m->get_data() );
+ if (rd->pbl)
+ rd->pbl->claim(m->get_data());
+ if (rd->psize) {
+ ceph_osd_op& op = m->ops[0];
+ *(rd->psize) = op.length;
}
+ // finish, clean up
+ Context *onfinish = rd->onfinish;
+ dout(7) << " " << bytes_read << " bytes " << dendl;
+
+ // done
+ delete rd;
+ if (onfinish) {
+ onfinish->finish(bytes_read);// > 0 ? bytes_read:m->get_result());
+ delete onfinish;
+ }
delete m;
}
// write ------------------------------------
-tid_t Objecter::write(object_t oid, __u64 off, size_t len,
- ceph_object_layout ol, const SnapContext& snapc,
- bufferlist &bl, int flags,
- Context *onack, Context *oncommit)
-{
- OSDWrite *wr = prepare_write(snapc, bl, flags);
- wr->extents.push_back(ObjectExtent(oid, off, len));
- wr->extents.front().layout = ol;
- wr->extents.front().buffer_extents[0] = len;
- modifyx(wr, onack, oncommit);
- return last_tid;
-}
-
-tid_t Objecter::write_full(object_t oid,
- ceph_object_layout ol, const SnapContext& snapc,
- bufferlist &bl, int flags,
- Context *onack, Context *oncommit)
-{
- OSDWrite *wr = prepare_write_full(snapc, bl, flags);
- wr->extents.push_back(ObjectExtent(oid, 0, bl.length()));
- wr->extents.front().layout = ol;
- wr->extents.front().buffer_extents[0] = bl.length();
- modifyx(wr, onack, oncommit);
- return last_tid;
-}
-
-
-// zero
-
-tid_t Objecter::zero(object_t oid, __u64 off, size_t len,
- ceph_object_layout ol, const SnapContext& snapc,
- int flags,
- Context *onack, Context *oncommit)
-{
- OSDModify *z = prepare_modify(snapc, CEPH_OSD_OP_ZERO, flags);
- z->extents.push_back(ObjectExtent(oid, off, len));
- z->extents.front().layout = ol;
- modifyx(z, onack, oncommit);
- return last_tid;
-}
-
-// remove
-
-tid_t Objecter::remove(object_t oid,
- ceph_object_layout ol, const SnapContext& snapc,
- int flags,
- Context *onack, Context *oncommit)
-{
- OSDModify *z = prepare_modify(snapc, CEPH_OSD_OP_DELETE, flags);
- z->extents.push_back(ObjectExtent(oid, 0, 0));
- z->extents.front().layout = ol;
- modifyx(z, onack, oncommit);
- return last_tid;
-}
-
-
-// lock ops
-
-tid_t Objecter::lock(int op, object_t oid, int flags,
- ceph_object_layout ol,
- Context *onack, Context *oncommit)
-{
- SnapContext snapc; // null is fine
- OSDModify *l = prepare_modify(snapc, op, flags);
- l->extents.push_back(ObjectExtent(oid, 0, 0));
- l->extents.front().layout = ol;
- modifyx(l, onack, oncommit);
- return last_tid;
-}
-
-
-
-// generic modify -----------------------------------
-
-tid_t Objecter::modifyx(OSDModify *wr, Context *onack, Context *oncommit)
-{
- wr->onack = onack;
- wr->oncommit = oncommit;
-
- // issue writes/whatevers
- for (list<ObjectExtent>::iterator it = wr->extents.begin();
- it != wr->extents.end();
- it++)
- modifyx_submit(wr, *it);
-
- return last_tid;
-}
-
-
-tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
+tid_t Objecter::modify_submit(ModifyOp *wr)
{
// find
- PG &pg = get_pg( pg_t(ex.layout.ol_pgid) );
+ PG &pg = get_pg( pg_t(wr->layout.ol_pgid) );
// pick tid
- tid_t tid;
- if (usetid > 0)
- tid = usetid;
- else
- tid = ++last_tid;
+ if (!wr->tid)
+ wr->tid = ++last_tid;
assert(client_inc >= 0);
// add to gather set(s)
int flags = wr->flags;
if (wr->onack) {
flags |= CEPH_OSD_OP_ACK;
- wr->waitfor_ack[tid] = ex;
++num_unacked;
} else {
dout(20) << " note: not requesting ack" << dendl;
}
if (wr->oncommit) {
flags |= CEPH_OSD_OP_SAFE;
- wr->waitfor_commit[tid] = ex;
++num_uncommitted;
} else {
dout(20) << " note: not requesting commit" << dendl;
}
- op_modify[tid] = wr;
- pg.active_tids.insert(tid);
+ op_modify[wr->tid] = wr;
+ pg.active_tids.insert(wr->tid);
pg.last = g_clock.now();
// send?
- dout(10) << "modifyx_submit " << ceph_osd_op_name(wr->op) << " tid " << tid
- << " oid " << ex.oid
- << " " << ex.offset << "~" << ex.length
- << " " << ex.layout
+ dout(10) << "modify_submit oid " << wr->oid
+ << " " << wr->ops << " tid " << wr->tid
+ << " " << wr->layout
<< " osd" << pg.primary()
<< dendl;
if (pg.primary() >= 0) {
- MOSDOp *m = new MOSDOp(client_inc, tid, true,
- ex.oid, ex.layout, osdmap->get_epoch(),
+ MOSDOp *m = new MOSDOp(client_inc, wr->tid, true,
+ wr->oid, wr->layout, osdmap->get_epoch(),
flags);
- m->add_simple_op(wr->op, ex.offset, ex.length);
+ m->ops = wr->ops;
m->set_snap_seq(wr->snapc.seq);
m->get_snaps() = wr->snapc.snaps;
if (inc_lock > 0) {
wr->inc_lock = inc_lock;
m->set_inc_lock(inc_lock);
}
- if (usetid > 0)
- m->set_retry_attempt(true);
-
- if (wr->tid_version.count(tid))
- m->set_version(wr->tid_version[tid]); // we're replaying this op!
-
- // what type of op?
- switch (wr->op) {
- case CEPH_OSD_OP_WRITE:
- case CEPH_OSD_OP_WRITEFULL:
- {
- // map buffer segments into this extent
- // (may be fragmented bc of striping)
- bufferlist cur;
- for (map<__u32,__u32>::iterator bit = ex.buffer_extents.begin();
- bit != ex.buffer_extents.end();
- bit++)
- ((OSDWrite*)wr)->bl.copy(bit->first, bit->second, cur);
- assert(cur.length() == ex.length);
- m->set_data(cur);//.claim(cur);
- }
- break;
- }
+ m->set_retry_attempt(wr->attempts++);
+ if (wr->version != eversion_t())
+ m->set_version(wr->version); // we're replaying this op!
+
+ m->set_data(wr->bl);
+
messenger->send_message(m, osdmap->get_inst(pg.primary()));
} else
maybe_request_map();
dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
- return tid;
+ return wr->tid;
}
<< (m->is_safe() ? " commit":" ack")
<< " v " << m->get_version() << " in " << m->get_pg()
<< dendl;
- OSDModify *wr = op_modify[ tid ];
+ ModifyOp *wr = op_modify[ tid ];
Context *onack = 0;
Context *oncommit = 0;
dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl;
if (wr->onack) num_unacked--;
if (wr->oncommit) num_uncommitted--;
- if (wr->waitfor_ack.count(tid))
- modifyx_submit(wr, wr->waitfor_ack[tid]);
- else if (wr->waitfor_commit.count(tid))
- modifyx_submit(wr, wr->waitfor_commit[tid]);
- else assert(0);
+ modify_submit(wr);
delete m;
return;
}
assert(m->get_result() >= 0); // FIXME
// ack|commit -> ack
- if (wr->waitfor_ack.count(tid)) {
- wr->waitfor_ack.erase(tid);
- num_unacked--;
+ if (wr->onack) {
dout(15) << "handle_osd_modify_reply ack" << dendl;
-
- /*
- osd uses v to reorder during replay, but doesn't preserve it
- if (wr->tid_version.count(tid) &&
- wr->tid_version[tid].version != m->get_version().version) {
- dout(-10) << "handle_osd_modify_reply WARNING: replay of tid " << tid
- << " did not achieve previous ordering" << dendl;
- }
- */
- wr->tid_version[tid] = m->get_version();
-
- if (wr->waitfor_ack.empty()) {
- onack = wr->onack;
- wr->onack = 0; // only do callback once
-
- // buffer uncommitted?
- if (!g_conf.objecter_buffer_uncommitted &&
- (wr->op == CEPH_OSD_OP_WRITE || wr->op == CEPH_OSD_OP_WRITEFULL)) {
- // discard buffer!
- ((OSDWrite*)wr)->bl.clear();
- }
- } else {
- dout(15) << "handle_osd_modify_reply still need "
- << wr->waitfor_ack.size() << " acks" << dendl;
- }
+ wr->version = m->get_version();
+ onack = wr->onack;
+ wr->onack = 0; // only do callback once
+ num_unacked--;
}
- if (m->is_safe()) {
- // safe
- /*
- osd uses v to reorder during replay, but doesn't preserve it
- assert(wr->tid_version.count(tid) == 0 ||
- m->get_version() == wr->tid_version[tid]);
- */
-
- wr->waitfor_commit.erase(tid);
- num_uncommitted--;
+ if (wr->oncommit) {
dout(15) << "handle_osd_modify_reply safe" << dendl;
-
- if (wr->waitfor_commit.empty()) {
- oncommit = wr->oncommit;
- wr->oncommit = 0;
- } else {
- dout(15) << "handle_osd_modify_reply still need "
- << wr->waitfor_commit.size() << " safes" << dendl;
- }
+ oncommit = wr->oncommit;
+ wr->oncommit = 0;
+ num_uncommitted--;
}
// done?
done:
// done with this tid?
- if (wr->waitfor_commit.count(tid) == 0 &&
- wr->waitfor_ack.count(tid) == 0) {
+ if (!wr->onack && !wr->oncommit) {
assert(pg.active_tids.count(tid));
pg.active_tids.erase(tid);
- dout(15) << "handle_osd_modify_reply pg " << m->get_pg()
+ dout(15) << "handle_osd_modify_reply completed tid " << tid << ", pg " << m->get_pg()
<< " still has " << pg.active_tids << dendl;
if (pg.active_tids.empty())
close_pg( m->get_pg() );
op_modify.erase( tid );
- }
-
- // done with this overall op?
- if (wr->onack == 0 && wr->oncommit == 0) {
- dout(15) << "handle_osd_modify_reply completed" << dendl;
delete wr;
}
{
dout(10) << "dump_active" << dendl;
- for (hash_map<tid_t,OSDStat*>::iterator p = op_stat.begin(); p != op_stat.end(); p++)
- dout(10) << " stat " << p->first << dendl;
- for (hash_map<tid_t,OSDRead*>::iterator p = op_read.begin(); p != op_read.end(); p++)
+ for (hash_map<tid_t,ReadOp*>::iterator p = op_read.begin(); p != op_read.end(); p++)
dout(10) << " read " << p->first << dendl;
- for (hash_map<tid_t,OSDModify*>::iterator p = op_modify.begin(); p != op_modify.end(); p++)
+ for (hash_map<tid_t,ModifyOp*>::iterator p = op_modify.begin(); p != op_modify.end(); p++)
dout(10) << " modify " << p->first << dendl;
}
/*** track pending operations ***/
// read
public:
- class OSDOp {
- public:
- list<ObjectExtent> extents;
- int inc_lock;
- OSDOp() : inc_lock(0) {}
- virtual ~OSDOp() {}
- };
-
- class OSDRead : public OSDOp {
- public:
- bufferlist *bl;
- Context *onfinish;
- map<tid_t, ObjectExtent> ops;
- map<object_t, bufferlist*> read_data; // bits of data as they come back
+ struct ReadOp {
+ object_t oid;
+ ceph_object_layout layout;
+ vector<ceph_osd_op> ops;
+ bufferlist *pbl;
+ __u64 *psize;
int flags;
+ Context *onfinish;
+ int inc_lock;
- OSDRead(bufferlist *b, int f) : OSDOp(), bl(b), onfinish(0), flags(f) {
- if (bl)
- bl->clear();
- }
- };
-
- OSDRead *prepare_read(bufferlist *b, int f) {
- return new OSDRead(b, f);
- }
-
- class OSDStat : public OSDOp {
- public:
tid_t tid;
- __u64 *size; // where the size goes.
- int flags;
- Context *onfinish;
- OSDStat(__u64 *s, int f) : OSDOp(), tid(0), size(s), flags(f), onfinish(0) { }
+ int attempts;
+
+ ReadOp(object_t o, ceph_object_layout& ol, int f, Context *of, int n=0) :
+ oid(o), layout(ol), ops(n),
+ pbl(0), psize(0), flags(f), onfinish(of), inc_lock(-1),
+ tid(0), attempts(0) {
+ for (int i=0; i<n; i++)
+ memset(&ops[i], 0,sizeof(ops[i]));
+ }
};
- OSDStat *prepare_stat(__u64 *s, int f) {
- return new OSDStat(s, f);
- }
- // generic modify
- class OSDModify : public OSDOp {
- public:
+ struct ModifyOp {
+ object_t oid;
+ ceph_object_layout layout;
SnapContext snapc;
- int op;
- list<ObjectExtent> extents;
+ vector<ceph_osd_op> ops;
+ bufferlist bl;
int flags;
- Context *onack;
- Context *oncommit;
- map<tid_t, ObjectExtent> waitfor_ack;
- map<tid_t, eversion_t> tid_version;
- map<tid_t, ObjectExtent> waitfor_commit;
-
- OSDModify(const SnapContext& sc, int o, int f) : OSDOp(), snapc(sc), op(o), flags(f), onack(0), oncommit(0) {}
- };
+ Context *onack, *oncommit;
+ int inc_lock;
- OSDModify *prepare_modify(const SnapContext& sc, int o, int f) {
- return new OSDModify(sc, o, f);
- }
-
- // write (includes the bufferlist)
- class OSDWrite : public OSDModify {
- public:
- bufferlist bl;
- OSDWrite(int op, const SnapContext& sc, bufferlist &b, int f) : OSDModify(sc, op, f), bl(b) {}
+ tid_t tid;
+ eversion_t version;
+ int attempts;
+
+ ModifyOp(object_t o, ceph_object_layout& l, const SnapContext& sc, int f, Context *ac, Context *co, int n=0) :
+ oid(o), layout(l), snapc(sc), ops(n), flags(f), onack(ac), oncommit(co), inc_lock(-1),
+ tid(0), attempts(0) {
+ for (int i=0; i<n; i++)
+ memset(&ops[i], 0,sizeof(ops[i]));
+ }
};
- OSDWrite *prepare_write(const SnapContext& sc, bufferlist &b, int f) {
- return new OSDWrite(CEPH_OSD_OP_WRITE, sc, b, f);
- }
- OSDWrite *prepare_write_full(const SnapContext& sc, bufferlist &b, int f) {
- return new OSDWrite(CEPH_OSD_OP_WRITEFULL, sc, b, f);
- }
-
-
-
private:
// pending ops
- hash_map<tid_t,OSDStat*> op_stat;
- hash_map<tid_t,OSDRead*> op_read;
- hash_map<tid_t,OSDModify*> op_modify;
+ hash_map<tid_t,ReadOp* > op_read;
+ hash_map<tid_t,ModifyOp*> op_modify;
/**
* track pending ops by pg
public:
void dispatch(Message *m);
void handle_osd_op_reply(class MOSDOpReply *m);
- void handle_osd_stat_reply(class MOSDOpReply *m);
void handle_osd_read_reply(class MOSDOpReply *m);
void handle_osd_modify_reply(class MOSDOpReply *m);
void handle_osd_lock_reply(class MOSDOpReply *m);
void handle_osd_map(class MOSDMap *m);
private:
- tid_t readx_submit(OSDRead *rd, ObjectExtent& ex, bool retry=false);
- tid_t modifyx_submit(OSDModify *wr, ObjectExtent& ex, tid_t tid=0);
- tid_t stat_submit(OSDStat *st);
+ tid_t read_submit(ReadOp *rd);
+ tid_t modify_submit(ModifyOp *wr);
// public interface
public:
void set_inc_lock(int l) { inc_lock = l; }
- // med level
- tid_t readx(OSDRead *read, Context *onfinish);
- tid_t modifyx(OSDModify *wr, Context *onack, Context *oncommit);
+ //
+ tid_t stat(object_t oid, ceph_object_layout ol,
+ __u64 *size, int flags,
+ Context *onfinish) {
+ ReadOp *rd = new ReadOp(oid, ol, flags, onfinish, 1);
+ rd->psize = size;
+ rd->ops[0].op = CEPH_OSD_OP_STAT;
+ return read_submit(rd);
+ }
- // even lazier
- tid_t read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, bufferlist *bl, int flags,
- Context *onfinish);
- tid_t stat(object_t oid, __u64 *size, ceph_object_layout ol, int flags, Context *onfinish);
+ tid_t read(object_t oid, ceph_object_layout ol,
+ __u64 off, size_t len, bufferlist *bl, int flags,
+ Context *onfinish) {
+ ReadOp *rd = new ReadOp(oid, ol, flags, onfinish, 1);
+ rd->pbl = bl;
+ rd->ops[0].op = CEPH_OSD_OP_READ;
+ rd->ops[0].offset = off;
+ rd->ops[0].length = len;
+ return read_submit(rd);
+ }
- tid_t write(object_t oid, __u64 off, size_t len, ceph_object_layout ol, const SnapContext& snapc, bufferlist &bl, int flags,
- Context *onack, Context *oncommit);
- tid_t write_full(object_t oid, ceph_object_layout ol, const SnapContext& snapc, bufferlist &bl, int flags,
- Context *onack, Context *oncommit);
- tid_t zero(object_t oid, __u64 off, size_t len, ceph_object_layout ol, const SnapContext& snapc, int flags,
- Context *onack, Context *oncommit);
- tid_t remove(object_t oid, ceph_object_layout ol, const SnapContext& snapc, int flags,
- Context *onack, Context *oncommit);
+ tid_t write(object_t oid, ceph_object_layout ol,
+ __u64 off, size_t len, const SnapContext& snapc, bufferlist &bl, int flags,
+ Context *onack, Context *oncommit) {
+ ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1);
+ wr->bl = bl;
+ wr->ops[0].op = CEPH_OSD_OP_WRITE;
+ wr->ops[0].offset = off;
+ wr->ops[0].length = len;
+ return modify_submit(wr);
+ }
+ tid_t write_full(object_t oid, ceph_object_layout ol,
+ const SnapContext& snapc, bufferlist &bl, int flags,
+ Context *onack, Context *oncommit) {
+ ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1);
+ wr->bl = bl;
+ wr->ops[0].op = CEPH_OSD_OP_WRITEFULL;
+ wr->ops[0].offset = 0;
+ wr->ops[0].length = bl.length();
+ return modify_submit(wr);
+ }
+ tid_t zero(object_t oid, ceph_object_layout ol,
+ __u64 off, size_t len, const SnapContext& snapc, int flags,
+ Context *onack, Context *oncommit) {
+ ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1);
+ wr->ops[0].op = CEPH_OSD_OP_ZERO;
+ wr->ops[0].offset = 0;
+ wr->ops[0].length = len;
+ return modify_submit(wr);
+ }
+ tid_t remove(object_t oid, ceph_object_layout ol,
+ const SnapContext& snapc, int flags,
+ Context *onack, Context *oncommit) {
+ ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1);
+ wr->ops[0].op = CEPH_OSD_OP_DELETE;
+ return modify_submit(wr);
+ }
- // no snapc for lock ops
- tid_t lock(int op, object_t oid, int flags, ceph_object_layout ol, Context *onack, Context *oncommit);
+ tid_t lock(object_t oid, ceph_object_layout ol, int op, int flags, Context *onack, Context *oncommit) {
+ SnapContext snapc; // no snapc for lock ops
+ ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1);
+ wr->ops[0].op = op;
+ return modify_submit(wr);
+ }
void sg_read(vector<ObjectExtent>& extents, bufferlist *bl, int flags, Context *onfinish) {
if (extents.size() == 1) {
- read(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout,
+ read(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length,
bl, flags, onfinish);
} else {
C_Gather *g = new C_Gather;
vector<bufferlist> resultbl(extents.size());
int i=0;
for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
- read(p->oid, p->offset, p->length, p->layout,
+ read(p->oid, p->layout, p->offset, p->length,
&resultbl[i++], flags, onfinish);
}
g->set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
void sg_write(vector<ObjectExtent>& extents, const SnapContext& snapc, bufferlist bl,
int flags, Context *onack, Context *oncommit) {
if (extents.size() == 1) {
- write(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout,
+ write(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length,
snapc, bl, flags, onack, oncommit);
} else {
C_Gather *gack = 0, *gcom = 0;
bit++)
bl.copy(bit->first, bit->second, cur);
assert(cur.length() == p->length);
- write(p->oid, p->offset, p->length, p->layout,
+ write(p->oid, p->layout, p->offset, p->length,
snapc, cur, flags,
gack ? gack->new_sub():0,
gcom ? gcom->new_sub():0);