From 1a0958f01fd3203b012a7ff6c4c707d90b0b1cb0 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sun, 9 Nov 2008 15:52:02 -0800 Subject: [PATCH] objecter: simplify objecter (no scatter/gather, generic ops vector) --- src/client/SyntheticClient.cc | 14 +- src/mds/CDir.cc | 2 +- src/mds/MDSTable.cc | 2 +- src/mds/SessionMap.cc | 2 +- src/osdc/Filer.cc | 2 +- src/osdc/Filer.h | 4 +- src/osdc/ObjectCacher.cc | 19 +- src/osdc/Objecter.cc | 614 +++++----------------------------- src/osdc/Objecter.h | 195 ++++++----- 9 files changed, 206 insertions(+), 648 deletions(-) diff --git a/src/client/SyntheticClient.cc b/src/client/SyntheticClient.cc index 3a094f285f708..ad30ecf878bff 100644 --- a/src/client/SyntheticClient.cc +++ b/src/client/SyntheticClient.cc @@ -1345,7 +1345,7 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only) lock.Lock(); ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP, 2, 0); __u64 size; - client->objecter->stat(oid, &size, layout, 0, new C_SafeCond(&lock, &cond, &ack)); + client->objecter->stat(oid, layout, &size, 0, new C_SafeCond(&lock, &cond, &ack)); while (!ack) cond.Wait(lock); lock.Unlock(); } @@ -1358,7 +1358,7 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only) lock.Lock(); ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP, 2, 0); bufferlist bl; - client->objecter->read(oid, off, len, layout, &bl, 0, new C_SafeCond(&lock, &cond, &ack)); + client->objecter->read(oid, layout, off, len, &bl, 0, new C_SafeCond(&lock, &cond, &ack)); while (!ack) cond.Wait(lock); lock.Unlock(); } @@ -1374,7 +1374,7 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only) bufferlist bl; bl.push_back(bp); SnapContext snapc; - client->objecter->write(oid, off, len, layout, snapc, bl, 0, + client->objecter->write(oid, layout, off, len, snapc, bl, 0, new C_SafeCond(&lock, &cond, &ack), safeg->new_sub()); while (!ack) cond.Wait(lock); @@ -1389,7 +1389,7 @@ int SyntheticClient::play_trace(Trace& t, string& prefix, bool metadata_only) lock.Lock(); ceph_object_layout layout = client->osdmap->make_object_layout(oid, pg_t::TYPE_REP, 2, 0); SnapContext snapc; - client->objecter->zero(oid, off, len, layout, snapc, 0, + client->objecter->zero(oid, layout, off, len, snapc, 0, new C_SafeCond(&lock, &cond, &ack), safeg->new_sub()); while (!ack) cond.Wait(lock); @@ -2157,7 +2157,7 @@ int SyntheticClient::create_objects(int nobj, int osize, int inflight) starts.push_back(g_clock.now()); client->client_lock.Lock(); - client->objecter->write(oid, 0, osize, layout, snapc, bl, 0, + client->objecter->write(oid, layout, 0, osize, snapc, bl, 0, new C_Ref(lock, cond, &unack), new C_Ref(lock, cond, &unsafe)); client->client_lock.Unlock(); @@ -2258,13 +2258,13 @@ int SyntheticClient::object_rw(int nobj, int osize, int wrpc, utime_t start = g_clock.now(); if (write) { dout(10) << "write to " << oid << dendl; - client->objecter->write(oid, 0, osize, layout, snapc, bl, 0, + client->objecter->write(oid, layout, 0, osize, snapc, bl, 0, new C_Ref(lock, cond, &unack), new C_Ref(lock, cond, &unsafe)); } else { dout(10) << "read from " << oid << dendl; bufferlist inbl; - client->objecter->read(oid, 0, osize, layout, &inbl, 0, + client->objecter->read(oid, layout, 0, osize, &inbl, 0, new C_Ref(lock, cond, &unack)); } client->client_lock.Unlock(); diff --git a/src/mds/CDir.cc b/src/mds/CDir.cc index 093de51c22060..8980b3cbcf12d 100644 --- a/src/mds/CDir.cc +++ b/src/mds/CDir.cc @@ -1044,9 +1044,9 @@ void CDir::fetch(Context *c, bool ignore_authpinnability) // start by reading the first hunk of it C_Dir_Fetch *fin = new C_Dir_Fetch(this); cache->mds->objecter->read( get_ondisk_object(), - 0, 0, // whole object cache->mds->objecter->osdmap->file_to_object_layout( get_ondisk_object(), g_default_mds_dir_layout ), + 0, 0, // whole object &fin->bl, 0, fin ); } diff --git a/src/mds/MDSTable.cc b/src/mds/MDSTable.cc index a4c9bc107f6a5..92f4be5cbe6f9 100644 --- a/src/mds/MDSTable.cc +++ b/src/mds/MDSTable.cc @@ -121,9 +121,9 @@ void MDSTable::load(Context *onfinish) C_MT_Load *c = new C_MT_Load(this, onfinish); object_t oid(ino, 0); mds->objecter->read(oid, - 0, 0, // whole object mds->objecter->osdmap->file_to_object_layout(oid, g_default_mds_dir_layout), + 0, 0, // whole object &c->bl, 0, c); } diff --git a/src/mds/SessionMap.cc b/src/mds/SessionMap.cc index 5b27dc1ab4506..05a8540c6bdb8 100644 --- a/src/mds/SessionMap.cc +++ b/src/mds/SessionMap.cc @@ -67,9 +67,9 @@ void SessionMap::load(Context *onload) C_SM_Load *c = new C_SM_Load(this); object_t oid(inode.ino, 0); mds->objecter->read(oid, - 0, 0, // whole object mds->objecter->osdmap->file_to_object_layout(oid, g_default_mds_dir_layout), + 0, 0, // whole object &c->bl, 0, c); diff --git a/src/osdc/Filer.cc b/src/osdc/Filer.cc index 85c886987e4b4..309bc97db66d5 100644 --- a/src/osdc/Filer.cc +++ b/src/osdc/Filer.cc @@ -95,7 +95,7 @@ void Filer::_probe(Probe *probe) p++) { dout(10) << "_probe probing " << p->oid << dendl; C_Probe *c = new C_Probe(this, probe, p->oid); - probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, probe->flags, c); + probe->ops[p->oid] = objecter->stat(p->oid, p->layout, &c->size, probe->flags, c); } } diff --git a/src/osdc/Filer.h b/src/osdc/Filer.h index b2bbc06e6e497..7073a26cad71f 100644 --- a/src/osdc/Filer.h +++ b/src/osdc/Filer.h @@ -138,7 +138,7 @@ class Filer { vector extents; file_to_extents(ino, layout, CEPH_NOSNAP, offset, len, extents); if (extents.size() == 1) { - objecter->zero(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout, + objecter->zero(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length, snapc, flags, onack, oncommit); } else { C_Gather *gack = 0, *gcom = 0; @@ -147,7 +147,7 @@ class Filer { if (oncommit) gcom = new C_Gather(oncommit); for (vector::iterator p = extents.begin(); p != extents.end(); p++) { - objecter->zero(p->oid, p->offset, p->length, p->layout, + objecter->zero(p->oid, p->layout, p->offset, p->length, snapc, flags, gack ? gack->new_sub():0, gcom ? gcom->new_sub():0); diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index 164182da61f2b..9a879900fcf01 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -404,7 +404,8 @@ void ObjectCacher::bh_read(BufferHead *bh) C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_oid(), bh->start(), bh->length()); // go - objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), + objecter->read(bh->ob->get_oid(), bh->ob->get_layout(), + bh->start(), bh->length(), &onfinish->bl, 0, onfinish); } @@ -494,7 +495,8 @@ void ObjectCacher::bh_write(BufferHead *bh) C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->get_oid(), bh->start(), bh->length()); // go - tid_t tid = objecter->write(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), + tid_t tid = objecter->write(bh->ob->get_oid(), bh->ob->get_layout(), + bh->start(), bh->length(), bh->snapc, bh->bl, 0, onack, oncommit); @@ -1031,8 +1033,9 @@ int ObjectCacher::atomic_sync_readx(OSDRead *rd, inodeno_t ino, Mutex& lock) Cond cond; bool done = false; //objecter->readx(rd, new C_SafeCond(&lock, &cond, &done)); - objecter->read(rd->extents[0].oid, rd->extents[0].offset, rd->extents[0].length, - rd->extents[0].layout, rd->bl, 0, + objecter->read(rd->extents[0].oid, rd->extents[0].layout, + rd->extents[0].offset, rd->extents[0].length, + rd->bl, 0, new C_SafeCond(&lock, &cond, &done)); // block @@ -1169,7 +1172,7 @@ void ObjectCacher::rdlock(Object *o) commit->tid = ack->tid = - o->last_write_tid = objecter->lock(CEPH_OSD_OP_RDLOCK, o->get_oid(), 0, o->get_layout(), ack, commit); + o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), CEPH_OSD_OP_RDLOCK, 0, ack, commit); } // stake our claim. @@ -1212,7 +1215,7 @@ void ObjectCacher::wrlock(Object *o) commit->tid = ack->tid = - o->last_write_tid = objecter->lock(op, o->get_oid(), 0, o->get_layout(), ack, commit); + o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), op, 0, ack, commit); } // stake our claim. @@ -1255,7 +1258,7 @@ void ObjectCacher::rdunlock(Object *o) C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0); commit->tid = lockack->tid = - o->last_write_tid = objecter->lock(CEPH_OSD_OP_RDUNLOCK, o->get_oid(), 0, o->get_layout(), lockack, commit); + o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), CEPH_OSD_OP_RDUNLOCK, 0, lockack, commit); } void ObjectCacher::wrunlock(Object *o) @@ -1287,7 +1290,7 @@ void ObjectCacher::wrunlock(Object *o) C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0); commit->tid = lockack->tid = - o->last_write_tid = objecter->lock(op, o->get_oid(), 0, o->get_layout(), lockack, commit); + o->last_write_tid = objecter->lock(o->get_oid(), o->get_layout(), op, 0, lockack, commit); } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 296a067eb5cd0..8dbbd98be6882 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -248,7 +248,7 @@ void Objecter::kick_requests(set& changed_pgs) tid_t tid = *p; if (op_modify.count(tid)) { - OSDModify *wr = op_modify[tid]; + ModifyOp *wr = op_modify[tid]; op_modify.erase(tid); if (wr->onack) @@ -257,49 +257,25 @@ void Objecter::kick_requests(set& changed_pgs) num_uncommitted--; // WRITE - if (wr->waitfor_ack.count(tid)) { + if (wr->onack) { dout(3) << "kick_requests missing ack, resub write " << tid << dendl; - modifyx_submit(wr, wr->waitfor_ack[tid], tid); + modify_submit(wr); } else { - assert(wr->waitfor_commit.count(tid)); - - if (wr->tid_version.count(tid)) { - if ((wr->op == CEPH_OSD_OP_WRITE || wr->op == CEPH_OSD_OP_WRITEFULL) && - !g_conf.objecter_buffer_uncommitted) { - dout(0) << "kick_requests missing commit, cannot replay: objecter_buffer_uncommitted == FALSE" << dendl; - assert(0); // crap. fixme. - } else { - dout(3) << "kick_requests missing commit, replay write " << tid - << " v " << wr->tid_version[tid] << dendl; - } - } else { - dout(3) << "kick_requests missing commit, resub write " << tid << dendl; - } - modifyx_submit(wr, wr->waitfor_commit[tid], tid); + assert(wr->oncommit); + dout(3) << "kick_requests missing commit, resub write " << tid << dendl; + modify_submit(wr); } } else if (op_read.count(tid)) { // READ - OSDRead *rd = op_read[tid]; + ReadOp *rd = op_read[tid]; op_read.erase(tid); dout(3) << "kick_requests resub read " << tid << dendl; // resubmit - readx_submit(rd, rd->ops[tid], true); - rd->ops.erase(tid); + read_submit(rd); } - - else if (op_stat.count(tid)) { - OSDStat *st = op_stat[tid]; - op_stat.erase(tid); - - dout(3) << "kick_requests resub stat " << tid << dendl; - - // resubmit - stat_submit(st); - } - else assert(0); } @@ -338,215 +314,57 @@ void Objecter::tick() void Objecter::handle_osd_op_reply(MOSDOpReply *m) { - assert(m->ops.size() >= 1); - int op = m->ops[0].op; - - // read or modify? - switch (op) { - case CEPH_OSD_OP_READ: - handle_osd_read_reply(m); - break; - - case CEPH_OSD_OP_STAT: - handle_osd_stat_reply(m); - break; - - default: - assert(m->is_modify()); + if (m->is_modify()) handle_osd_modify_reply(m); - break; - } -} - - - -// stat ----------------------------------- - -tid_t Objecter::stat(object_t oid, __u64 *size, ceph_object_layout ol, int flags, Context *onfinish) -{ - OSDStat *st = prepare_stat(size, flags); - st->extents.push_back(ObjectExtent(oid, 0, 0)); - st->extents.front().layout = ol; - st->onfinish = onfinish; - - return stat_submit(st); -} - -tid_t Objecter::stat_submit(OSDStat *st) -{ - // find OSD - ObjectExtent &ex = st->extents.front(); - PG &pg = get_pg( pg_t(ex.layout.ol_pgid) ); - - // pick tid - last_tid++; - assert(client_inc >= 0); - - // add to gather set - st->tid = last_tid; - op_stat[last_tid] = st; - - pg.active_tids.insert(last_tid); - - // send? - - dout(10) << "stat_submit " << st << " tid " << last_tid - << " oid " << ex.oid - << " " << ex.layout - << " osd" << pg.acker() - << dendl; - - if (pg.acker() >= 0) { - int flags = st->flags; - if (st->onfinish) flags |= CEPH_OSD_OP_ACK; - - MOSDOp *m = new MOSDOp(client_inc, last_tid, false, - ex.oid, ex.layout, osdmap->get_epoch(), - flags); - m->stat(); - if (inc_lock > 0) { - st->inc_lock = inc_lock; - m->set_inc_lock(inc_lock); - } - - messenger->send_message(m, osdmap->get_inst(pg.acker())); - } - - return last_tid; + else + handle_osd_read_reply(m); } -void Objecter::handle_osd_stat_reply(MOSDOpReply *m) -{ - // get pio - tid_t tid = m->get_tid(); - - if (op_stat.count(tid) == 0) { - dout(7) << "handle_osd_stat_reply " << tid << " ... stray" << dendl; - delete m; - return; - } - - ceph_osd_op& op = m->ops[0]; - - dout(7) << "handle_osd_stat_reply " << tid - << " r=" << m->get_result() - << " size=" << op.length - << dendl; - OSDStat *st = op_stat[ tid ]; - op_stat.erase( tid ); - - // remove from osd/tid maps - PG& pg = get_pg( m->get_pg() ); - assert(pg.active_tids.count(tid)); - pg.active_tids.erase(tid); - if (pg.active_tids.empty()) close_pg( m->get_pg() ); - - // success? - if (m->get_result() == -EINCLOCKED && - (st->flags & CEPH_OSD_OP_INCLOCK_FAIL) == 0) { - dout(7) << " got -EINCLOCKED, resubmitting" << dendl; - stat_submit(st); - delete m; - return; - } - if (m->get_result() == -EAGAIN) { - dout(7) << " got -EAGAIN, resubmitting" << dendl; - stat_submit(st); - delete m; - return; - } - - // ok! - if (m->get_result() < 0) { - *st->size = 0; - } else { - *st->size = op.length; - } - - // finish, clean up - Context *onfinish = st->onfinish; - - // done - delete st; - if (onfinish) { - onfinish->finish(m->get_result()); - delete onfinish; - } - - delete m; -} // read ----------------------------------- - -tid_t Objecter::read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, - bufferlist *bl, int flags, - Context *onfinish) -{ - OSDRead *rd = prepare_read(bl, flags); - rd->extents.push_back(ObjectExtent(oid, off, len)); - rd->extents.front().layout = ol; - readx(rd, onfinish); - return last_tid; -} - - -tid_t Objecter::readx(OSDRead *rd, Context *onfinish) -{ - rd->onfinish = onfinish; - - // issue reads - for (list::iterator it = rd->extents.begin(); - it != rd->extents.end(); - it++) - readx_submit(rd, *it); - - return last_tid; -} - -tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry) +tid_t Objecter::read_submit(ReadOp *rd) { // find OSD - PG &pg = get_pg( pg_t(ex.layout.ol_pgid) ); + PG &pg = get_pg( pg_t(rd->layout.ol_pgid) ); // pick tid - last_tid++; - assert(client_inc >= 0); - - // add to gather set - rd->ops[last_tid] = ex; + rd->tid = ++last_tid; op_read[last_tid] = rd; + assert(client_inc >= 0); + pg.active_tids.insert(last_tid); pg.last = g_clock.now(); // send? - dout(10) << "readx_submit " << rd << " tid " << last_tid - << " oid " << ex.oid << " " << ex.offset << "~" << ex.length - << " (" << ex.buffer_extents.size() << " buffer fragments)" - << " " << ex.layout + dout(10) << "read_submit " << rd << " tid " << last_tid + << " oid " << rd->oid + << " " << rd->ops + << " " << rd->layout << " osd" << pg.acker() << dendl; if (pg.acker() >= 0) { int flags = rd->flags; - if (rd->onfinish) flags |= CEPH_OSD_OP_ACK; + if (rd->onfinish) + flags |= CEPH_OSD_OP_ACK; MOSDOp *m = new MOSDOp(client_inc, last_tid, false, - ex.oid, ex.layout, osdmap->get_epoch(), + rd->oid, rd->layout, osdmap->get_epoch(), flags); - m->read(ex.offset, ex.length); + m->ops = rd->ops; if (inc_lock > 0) { rd->inc_lock = inc_lock; m->set_inc_lock(inc_lock); } - m->set_retry_attempt(retry); + m->set_retry_attempt(rd->attempts++); int who = pg.acker(); if (rd->flags & CEPH_OSD_OP_BALANCE_READS) { int replica = messenger->get_myname().num() % pg.acting.size(); who = pg.acting[replica]; - dout(-10) << "readx_submit reading from random replica " << replica + dout(-10) << "read_submit reading from random replica " << replica << " = osd" << who << dendl; } messenger->send_message(m, osdmap->get_inst(who)); @@ -569,20 +387,15 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) } dout(7) << "handle_osd_read_reply " << tid << dendl; - OSDRead *rd = op_read[ tid ]; + ReadOp *rd = op_read[ tid ]; op_read.erase( tid ); - ceph_osd_op& op = m->ops[0]; - // remove from osd/tid maps PG& pg = get_pg( m->get_pg() ); assert(pg.active_tids.count(tid)); pg.active_tids.erase(tid); if (pg.active_tids.empty()) close_pg( m->get_pg() ); - // our op finished - rd->ops.erase(tid); - // fail? if (m->get_result() == -EINCLOCKED && rd->flags & CEPH_OSD_OP_INCLOCK_FAIL) { @@ -600,141 +413,33 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) if (m->get_result() == -EAGAIN || m->get_result() == -EINCLOCKED) { dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl; - readx_submit(rd, rd->ops[tid], true); + read_submit(rd); delete m; return; } // what buffer offset are we? - dout(7) << " got frag from " << m->get_oid() << " " - << op.offset << "~" << op.length - << ", still have " << rd->ops.size() << " more ops" << dendl; - - if (rd->ops.empty()) { - // all done - size_t bytes_read = 0; - - if (rd->read_data.size()) { - dout(15) << " assembling frags" << dendl; - - /** FIXME This doesn't handle holes efficiently. - * It allocates zero buffers to fill whole buffer, and - * then discards trailing ones at the end. - * - * Actually, this whole thing is pretty messy with temporary bufferlist*'s all over - * the heap. - */ - - // we have other fragments, assemble them all... blech! - rd->read_data[m->get_oid()] = new bufferlist; - rd->read_data[m->get_oid()]->claim( m->get_data() ); - - // map extents back into buffer - map<__u64, bufferlist*> by_off; // buffer offset -> bufferlist - - // for each object extent... - for (list::iterator eit = rd->extents.begin(); - eit != rd->extents.end(); - eit++) { - bufferlist *ox_buf = rd->read_data[eit->oid]; - unsigned ox_len = ox_buf->length(); - unsigned ox_off = 0; - assert(ox_len <= eit->length); - - // for each buffer extent we're mapping into... - for (map<__u32,__u32>::iterator bit = eit->buffer_extents.begin(); - bit != eit->buffer_extents.end(); - bit++) { - dout(21) << " object " << eit->oid - << " extent " << eit->offset << "~" << eit->length - << " : ox offset " << ox_off - << " -> buffer extent " << bit->first << "~" << bit->second << dendl; - by_off[bit->first] = new bufferlist; - - if (ox_off + bit->second <= ox_len) { - // we got the whole bx - by_off[bit->first]->substr_of(*ox_buf, ox_off, bit->second); - if (bytes_read < bit->first + bit->second) - bytes_read = bit->first + bit->second; - } else if (ox_off + bit->second > ox_len && ox_off < ox_len) { - // we got part of this bx - by_off[bit->first]->substr_of(*ox_buf, ox_off, (ox_len-ox_off)); - if (bytes_read < bit->first + ox_len-ox_off) - bytes_read = bit->first + ox_len-ox_off; - - // zero end of bx - dout(21) << " adding some zeros to the end " << ox_off + bit->second-ox_len << dendl; - bufferptr z(ox_off + bit->second - ox_len); - z.zero(); - by_off[bit->first]->append( z ); - } else { - // we got none of this bx. zero whole thing. - assert(ox_off >= ox_len); - dout(21) << " adding all zeros for this bit " << bit->second << dendl; - bufferptr z(bit->second); - z.zero(); - by_off[bit->first]->append( z ); - } - ox_off += bit->second; - } - assert(ox_off == eit->length); - } + dout(7) << " got reply on " << rd->ops << dendl; - // sort and string bits together - for (map<__u64, bufferlist*>::iterator it = by_off.begin(); - it != by_off.end(); - it++) { - assert(it->second->length()); - if (it->first < (__u64)bytes_read) { - dout(21) << " concat buffer frag off " << it->first << " len " << it->second->length() << dendl; - rd->bl->claim_append(*(it->second)); - } else { - dout(21) << " NO concat zero buffer frag off " << it->first << " len " << it->second->length() << dendl; - } - delete it->second; - } - - // trim trailing zeros? - if (rd->bl->length() > bytes_read) { - dout(10) << " trimming off trailing zeros . bytes_read=" << bytes_read - << " len=" << rd->bl->length() << dendl; - rd->bl->splice(bytes_read, rd->bl->length() - bytes_read); - assert(bytes_read == rd->bl->length()); - } - - // hose p->read_data bufferlist*'s - for (map::iterator it = rd->read_data.begin(); - it != rd->read_data.end(); - it++) { - delete it->second; - } - } else { - dout(15) << " only one frag" << dendl; + int bytes_read = m->get_data().length(); - // only one fragment, easy - rd->bl->claim( m->get_data() ); - bytes_read = rd->bl->length(); - } - - // finish, clean up - Context *onfinish = rd->onfinish; - - dout(7) << " " << bytes_read << " bytes " - << rd->bl->length() - << dendl; - - // done - delete rd; - if (onfinish) { - onfinish->finish(bytes_read);// > 0 ? bytes_read:m->get_result()); - delete onfinish; - } - } else { - // store my bufferlist for later assembling - rd->read_data[m->get_oid()] = new bufferlist; - rd->read_data[m->get_oid()]->claim( m->get_data() ); + if (rd->pbl) + rd->pbl->claim(m->get_data()); + if (rd->psize) { + ceph_osd_op& op = m->ops[0]; + *(rd->psize) = op.length; } + // finish, clean up + Context *onfinish = rd->onfinish; + dout(7) << " " << bytes_read << " bytes " << dendl; + + // done + delete rd; + if (onfinish) { + onfinish->finish(bytes_read);// > 0 ? bytes_read:m->get_result()); + delete onfinish; + } delete m; } @@ -742,177 +447,65 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) // write ------------------------------------ -tid_t Objecter::write(object_t oid, __u64 off, size_t len, - ceph_object_layout ol, const SnapContext& snapc, - bufferlist &bl, int flags, - Context *onack, Context *oncommit) -{ - OSDWrite *wr = prepare_write(snapc, bl, flags); - wr->extents.push_back(ObjectExtent(oid, off, len)); - wr->extents.front().layout = ol; - wr->extents.front().buffer_extents[0] = len; - modifyx(wr, onack, oncommit); - return last_tid; -} - -tid_t Objecter::write_full(object_t oid, - ceph_object_layout ol, const SnapContext& snapc, - bufferlist &bl, int flags, - Context *onack, Context *oncommit) -{ - OSDWrite *wr = prepare_write_full(snapc, bl, flags); - wr->extents.push_back(ObjectExtent(oid, 0, bl.length())); - wr->extents.front().layout = ol; - wr->extents.front().buffer_extents[0] = bl.length(); - modifyx(wr, onack, oncommit); - return last_tid; -} - - -// zero - -tid_t Objecter::zero(object_t oid, __u64 off, size_t len, - ceph_object_layout ol, const SnapContext& snapc, - int flags, - Context *onack, Context *oncommit) -{ - OSDModify *z = prepare_modify(snapc, CEPH_OSD_OP_ZERO, flags); - z->extents.push_back(ObjectExtent(oid, off, len)); - z->extents.front().layout = ol; - modifyx(z, onack, oncommit); - return last_tid; -} - -// remove - -tid_t Objecter::remove(object_t oid, - ceph_object_layout ol, const SnapContext& snapc, - int flags, - Context *onack, Context *oncommit) -{ - OSDModify *z = prepare_modify(snapc, CEPH_OSD_OP_DELETE, flags); - z->extents.push_back(ObjectExtent(oid, 0, 0)); - z->extents.front().layout = ol; - modifyx(z, onack, oncommit); - return last_tid; -} - - -// lock ops - -tid_t Objecter::lock(int op, object_t oid, int flags, - ceph_object_layout ol, - Context *onack, Context *oncommit) -{ - SnapContext snapc; // null is fine - OSDModify *l = prepare_modify(snapc, op, flags); - l->extents.push_back(ObjectExtent(oid, 0, 0)); - l->extents.front().layout = ol; - modifyx(l, onack, oncommit); - return last_tid; -} - - - -// generic modify ----------------------------------- - -tid_t Objecter::modifyx(OSDModify *wr, Context *onack, Context *oncommit) -{ - wr->onack = onack; - wr->oncommit = oncommit; - - // issue writes/whatevers - for (list::iterator it = wr->extents.begin(); - it != wr->extents.end(); - it++) - modifyx_submit(wr, *it); - - return last_tid; -} - - -tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) +tid_t Objecter::modify_submit(ModifyOp *wr) { // find - PG &pg = get_pg( pg_t(ex.layout.ol_pgid) ); + PG &pg = get_pg( pg_t(wr->layout.ol_pgid) ); // pick tid - tid_t tid; - if (usetid > 0) - tid = usetid; - else - tid = ++last_tid; + if (!wr->tid) + wr->tid = ++last_tid; assert(client_inc >= 0); // add to gather set(s) int flags = wr->flags; if (wr->onack) { flags |= CEPH_OSD_OP_ACK; - wr->waitfor_ack[tid] = ex; ++num_unacked; } else { dout(20) << " note: not requesting ack" << dendl; } if (wr->oncommit) { flags |= CEPH_OSD_OP_SAFE; - wr->waitfor_commit[tid] = ex; ++num_uncommitted; } else { dout(20) << " note: not requesting commit" << dendl; } - op_modify[tid] = wr; - pg.active_tids.insert(tid); + op_modify[wr->tid] = wr; + pg.active_tids.insert(wr->tid); pg.last = g_clock.now(); // send? - dout(10) << "modifyx_submit " << ceph_osd_op_name(wr->op) << " tid " << tid - << " oid " << ex.oid - << " " << ex.offset << "~" << ex.length - << " " << ex.layout + dout(10) << "modify_submit oid " << wr->oid + << " " << wr->ops << " tid " << wr->tid + << " " << wr->layout << " osd" << pg.primary() << dendl; if (pg.primary() >= 0) { - MOSDOp *m = new MOSDOp(client_inc, tid, true, - ex.oid, ex.layout, osdmap->get_epoch(), + MOSDOp *m = new MOSDOp(client_inc, wr->tid, true, + wr->oid, wr->layout, osdmap->get_epoch(), flags); - m->add_simple_op(wr->op, ex.offset, ex.length); + m->ops = wr->ops; m->set_snap_seq(wr->snapc.seq); m->get_snaps() = wr->snapc.snaps; if (inc_lock > 0) { wr->inc_lock = inc_lock; m->set_inc_lock(inc_lock); } - if (usetid > 0) - m->set_retry_attempt(true); - - if (wr->tid_version.count(tid)) - m->set_version(wr->tid_version[tid]); // we're replaying this op! - - // what type of op? - switch (wr->op) { - case CEPH_OSD_OP_WRITE: - case CEPH_OSD_OP_WRITEFULL: - { - // map buffer segments into this extent - // (may be fragmented bc of striping) - bufferlist cur; - for (map<__u32,__u32>::iterator bit = ex.buffer_extents.begin(); - bit != ex.buffer_extents.end(); - bit++) - ((OSDWrite*)wr)->bl.copy(bit->first, bit->second, cur); - assert(cur.length() == ex.length); - m->set_data(cur);//.claim(cur); - } - break; - } + m->set_retry_attempt(wr->attempts++); + if (wr->version != eversion_t()) + m->set_version(wr->version); // we're replaying this op! + + m->set_data(wr->bl); + messenger->send_message(m, osdmap->get_inst(pg.primary())); } else maybe_request_map(); dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl; - return tid; + return wr->tid; } @@ -934,7 +527,7 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) << (m->is_safe() ? " commit":" ack") << " v " << m->get_version() << " in " << m->get_pg() << dendl; - OSDModify *wr = op_modify[ tid ]; + ModifyOp *wr = op_modify[ tid ]; Context *onack = 0; Context *oncommit = 0; @@ -970,11 +563,7 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl; if (wr->onack) num_unacked--; if (wr->oncommit) num_uncommitted--; - if (wr->waitfor_ack.count(tid)) - modifyx_submit(wr, wr->waitfor_ack[tid]); - else if (wr->waitfor_commit.count(tid)) - modifyx_submit(wr, wr->waitfor_commit[tid]); - else assert(0); + modify_submit(wr); delete m; return; } @@ -982,75 +571,32 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m) assert(m->get_result() >= 0); // FIXME // ack|commit -> ack - if (wr->waitfor_ack.count(tid)) { - wr->waitfor_ack.erase(tid); - num_unacked--; + if (wr->onack) { dout(15) << "handle_osd_modify_reply ack" << dendl; - - /* - osd uses v to reorder during replay, but doesn't preserve it - if (wr->tid_version.count(tid) && - wr->tid_version[tid].version != m->get_version().version) { - dout(-10) << "handle_osd_modify_reply WARNING: replay of tid " << tid - << " did not achieve previous ordering" << dendl; - } - */ - wr->tid_version[tid] = m->get_version(); - - if (wr->waitfor_ack.empty()) { - onack = wr->onack; - wr->onack = 0; // only do callback once - - // buffer uncommitted? - if (!g_conf.objecter_buffer_uncommitted && - (wr->op == CEPH_OSD_OP_WRITE || wr->op == CEPH_OSD_OP_WRITEFULL)) { - // discard buffer! - ((OSDWrite*)wr)->bl.clear(); - } - } else { - dout(15) << "handle_osd_modify_reply still need " - << wr->waitfor_ack.size() << " acks" << dendl; - } + wr->version = m->get_version(); + onack = wr->onack; + wr->onack = 0; // only do callback once + num_unacked--; } - if (m->is_safe()) { - // safe - /* - osd uses v to reorder during replay, but doesn't preserve it - assert(wr->tid_version.count(tid) == 0 || - m->get_version() == wr->tid_version[tid]); - */ - - wr->waitfor_commit.erase(tid); - num_uncommitted--; + if (wr->oncommit) { dout(15) << "handle_osd_modify_reply safe" << dendl; - - if (wr->waitfor_commit.empty()) { - oncommit = wr->oncommit; - wr->oncommit = 0; - } else { - dout(15) << "handle_osd_modify_reply still need " - << wr->waitfor_commit.size() << " safes" << dendl; - } + oncommit = wr->oncommit; + wr->oncommit = 0; + num_uncommitted--; } // done? done: // done with this tid? - if (wr->waitfor_commit.count(tid) == 0 && - wr->waitfor_ack.count(tid) == 0) { + if (!wr->onack && !wr->oncommit) { assert(pg.active_tids.count(tid)); pg.active_tids.erase(tid); - dout(15) << "handle_osd_modify_reply pg " << m->get_pg() + dout(15) << "handle_osd_modify_reply completed tid " << tid << ", pg " << m->get_pg() << " still has " << pg.active_tids << dendl; if (pg.active_tids.empty()) close_pg( m->get_pg() ); op_modify.erase( tid ); - } - - // done with this overall op? - if (wr->onack == 0 && wr->oncommit == 0) { - dout(15) << "handle_osd_modify_reply completed" << dendl; delete wr; } @@ -1215,11 +761,9 @@ void Objecter::dump_active() { dout(10) << "dump_active" << dendl; - for (hash_map::iterator p = op_stat.begin(); p != op_stat.end(); p++) - dout(10) << " stat " << p->first << dendl; - for (hash_map::iterator p = op_read.begin(); p != op_read.end(); p++) + for (hash_map::iterator p = op_read.begin(); p != op_read.end(); p++) dout(10) << " read " << p->first << dendl; - for (hash_map::iterator p = op_modify.begin(); p != op_modify.end(); p++) + for (hash_map::iterator p = op_modify.begin(); p != op_modify.end(); p++) dout(10) << " modify " << p->first << dendl; } diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 11ad8c18a5e7e..c00fa4c824012 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -68,86 +68,55 @@ class Objecter { /*** track pending operations ***/ // read public: - class OSDOp { - public: - list extents; - int inc_lock; - OSDOp() : inc_lock(0) {} - virtual ~OSDOp() {} - }; - - class OSDRead : public OSDOp { - public: - bufferlist *bl; - Context *onfinish; - map ops; - map read_data; // bits of data as they come back + struct ReadOp { + object_t oid; + ceph_object_layout layout; + vector ops; + bufferlist *pbl; + __u64 *psize; int flags; + Context *onfinish; + int inc_lock; - OSDRead(bufferlist *b, int f) : OSDOp(), bl(b), onfinish(0), flags(f) { - if (bl) - bl->clear(); - } - }; - - OSDRead *prepare_read(bufferlist *b, int f) { - return new OSDRead(b, f); - } - - class OSDStat : public OSDOp { - public: tid_t tid; - __u64 *size; // where the size goes. - int flags; - Context *onfinish; - OSDStat(__u64 *s, int f) : OSDOp(), tid(0), size(s), flags(f), onfinish(0) { } + int attempts; + + ReadOp(object_t o, ceph_object_layout& ol, int f, Context *of, int n=0) : + oid(o), layout(ol), ops(n), + pbl(0), psize(0), flags(f), onfinish(of), inc_lock(-1), + tid(0), attempts(0) { + for (int i=0; i extents; + vector ops; + bufferlist bl; int flags; - Context *onack; - Context *oncommit; - map waitfor_ack; - map tid_version; - map waitfor_commit; - - OSDModify(const SnapContext& sc, int o, int f) : OSDOp(), snapc(sc), op(o), flags(f), onack(0), oncommit(0) {} - }; + Context *onack, *oncommit; + int inc_lock; - OSDModify *prepare_modify(const SnapContext& sc, int o, int f) { - return new OSDModify(sc, o, f); - } - - // write (includes the bufferlist) - class OSDWrite : public OSDModify { - public: - bufferlist bl; - OSDWrite(int op, const SnapContext& sc, bufferlist &b, int f) : OSDModify(sc, op, f), bl(b) {} + tid_t tid; + eversion_t version; + int attempts; + + ModifyOp(object_t o, ceph_object_layout& l, const SnapContext& sc, int f, Context *ac, Context *co, int n=0) : + oid(o), layout(l), snapc(sc), ops(n), flags(f), onack(ac), oncommit(co), inc_lock(-1), + tid(0), attempts(0) { + for (int i=0; i op_stat; - hash_map op_read; - hash_map op_modify; + hash_map op_read; + hash_map op_modify; /** * track pending ops by pg @@ -207,16 +176,14 @@ class Objecter { public: void dispatch(Message *m); void handle_osd_op_reply(class MOSDOpReply *m); - void handle_osd_stat_reply(class MOSDOpReply *m); void handle_osd_read_reply(class MOSDOpReply *m); void handle_osd_modify_reply(class MOSDOpReply *m); void handle_osd_lock_reply(class MOSDOpReply *m); void handle_osd_map(class MOSDMap *m); private: - tid_t readx_submit(OSDRead *rd, ObjectExtent& ex, bool retry=false); - tid_t modifyx_submit(OSDModify *wr, ObjectExtent& ex, tid_t tid=0); - tid_t stat_submit(OSDStat *st); + tid_t read_submit(ReadOp *rd); + tid_t modify_submit(ModifyOp *wr); // public interface public: @@ -232,26 +199,70 @@ class Objecter { void set_inc_lock(int l) { inc_lock = l; } - // med level - tid_t readx(OSDRead *read, Context *onfinish); - tid_t modifyx(OSDModify *wr, Context *onack, Context *oncommit); + // + tid_t stat(object_t oid, ceph_object_layout ol, + __u64 *size, int flags, + Context *onfinish) { + ReadOp *rd = new ReadOp(oid, ol, flags, onfinish, 1); + rd->psize = size; + rd->ops[0].op = CEPH_OSD_OP_STAT; + return read_submit(rd); + } - // even lazier - tid_t read(object_t oid, __u64 off, size_t len, ceph_object_layout ol, bufferlist *bl, int flags, - Context *onfinish); - tid_t stat(object_t oid, __u64 *size, ceph_object_layout ol, int flags, Context *onfinish); + tid_t read(object_t oid, ceph_object_layout ol, + __u64 off, size_t len, bufferlist *bl, int flags, + Context *onfinish) { + ReadOp *rd = new ReadOp(oid, ol, flags, onfinish, 1); + rd->pbl = bl; + rd->ops[0].op = CEPH_OSD_OP_READ; + rd->ops[0].offset = off; + rd->ops[0].length = len; + return read_submit(rd); + } - tid_t write(object_t oid, __u64 off, size_t len, ceph_object_layout ol, const SnapContext& snapc, bufferlist &bl, int flags, - Context *onack, Context *oncommit); - tid_t write_full(object_t oid, ceph_object_layout ol, const SnapContext& snapc, bufferlist &bl, int flags, - Context *onack, Context *oncommit); - tid_t zero(object_t oid, __u64 off, size_t len, ceph_object_layout ol, const SnapContext& snapc, int flags, - Context *onack, Context *oncommit); - tid_t remove(object_t oid, ceph_object_layout ol, const SnapContext& snapc, int flags, - Context *onack, Context *oncommit); + tid_t write(object_t oid, ceph_object_layout ol, + __u64 off, size_t len, const SnapContext& snapc, bufferlist &bl, int flags, + Context *onack, Context *oncommit) { + ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1); + wr->bl = bl; + wr->ops[0].op = CEPH_OSD_OP_WRITE; + wr->ops[0].offset = off; + wr->ops[0].length = len; + return modify_submit(wr); + } + tid_t write_full(object_t oid, ceph_object_layout ol, + const SnapContext& snapc, bufferlist &bl, int flags, + Context *onack, Context *oncommit) { + ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1); + wr->bl = bl; + wr->ops[0].op = CEPH_OSD_OP_WRITEFULL; + wr->ops[0].offset = 0; + wr->ops[0].length = bl.length(); + return modify_submit(wr); + } + tid_t zero(object_t oid, ceph_object_layout ol, + __u64 off, size_t len, const SnapContext& snapc, int flags, + Context *onack, Context *oncommit) { + ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1); + wr->ops[0].op = CEPH_OSD_OP_ZERO; + wr->ops[0].offset = 0; + wr->ops[0].length = len; + return modify_submit(wr); + } + tid_t remove(object_t oid, ceph_object_layout ol, + const SnapContext& snapc, int flags, + Context *onack, Context *oncommit) { + ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1); + wr->ops[0].op = CEPH_OSD_OP_DELETE; + return modify_submit(wr); + } - // no snapc for lock ops - tid_t lock(int op, object_t oid, int flags, ceph_object_layout ol, Context *onack, Context *oncommit); + tid_t lock(object_t oid, ceph_object_layout ol, int op, int flags, Context *onack, Context *oncommit) { + SnapContext snapc; // no snapc for lock ops + ModifyOp *wr = new ModifyOp(oid, ol, snapc, flags, onack, oncommit, 1); + wr->ops[0].op = op; + return modify_submit(wr); + } @@ -280,14 +291,14 @@ class Objecter { void sg_read(vector& extents, bufferlist *bl, int flags, Context *onfinish) { if (extents.size() == 1) { - read(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout, + read(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length, bl, flags, onfinish); } else { C_Gather *g = new C_Gather; vector resultbl(extents.size()); int i=0; for (vector::iterator p = extents.begin(); p != extents.end(); p++) { - read(p->oid, p->offset, p->length, p->layout, + read(p->oid, p->layout, p->offset, p->length, &resultbl[i++], flags, onfinish); } g->set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish)); @@ -298,7 +309,7 @@ class Objecter { void sg_write(vector& extents, const SnapContext& snapc, bufferlist bl, int flags, Context *onack, Context *oncommit) { if (extents.size() == 1) { - write(extents[0].oid, extents[0].offset, extents[0].length, extents[0].layout, + write(extents[0].oid, extents[0].layout, extents[0].offset, extents[0].length, snapc, bl, flags, onack, oncommit); } else { C_Gather *gack = 0, *gcom = 0; @@ -313,7 +324,7 @@ class Objecter { bit++) bl.copy(bit->first, bit->second, cur); assert(cur.length() == p->length); - write(p->oid, p->offset, p->length, p->layout, + write(p->oid, p->layout, p->offset, p->length, snapc, cur, flags, gack ? gack->new_sub():0, gcom ? gcom->new_sub():0); -- 2.39.5