From: Samuel Just <sam.just@inktank.com> Date: Fri, 6 Dec 2013 21:54:04 +0000 (-0800) Subject: ReplicatedPG: support async reads on ec pools X-Git-Tag: v0.78~286^2~18 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0bba79b7222371bc4ff568edfdf3667f55c36012;p=ceph.git ReplicatedPG: support async reads on ec pools Signed-off-by: Samuel Just <sam.just@inktank.com> --- diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index 0b750420519a..cda7db383f37 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -425,9 +425,8 @@ virtual void objects_read_async( const hobject_t &hoid, - uint64_t off, - uint64_t len, - bufferlist *bl, + const list<pair<pair<uint64_t, uint64_t>, + pair<bufferlist*, Context*> > > &to_read, Context *on_complete) = 0; }; diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index 602365416255..1d1809e794e9 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -347,12 +347,26 @@ struct AsyncReadCallback : public GenContext<ThreadPool::TPHandle&> { }; void ReplicatedBackend::objects_read_async( const hobject_t &hoid, - uint64_t off, - uint64_t len, - bufferlist *bl, + const list<pair<pair<uint64_t, uint64_t>, + pair<bufferlist*, Context*> > > &to_read, Context *on_complete) { - int r = osd->store->read(coll, hoid, off, len, *bl); + int r = 0; + for (list<pair<pair<uint64_t, uint64_t>, + pair<bufferlist*, Context*> > >::const_iterator i = + to_read.begin(); + i != to_read.end() && r >= 0; + ++i) { + int _r = osd->store->read(coll, hoid, i->first.first, + i->first.second, *(i->second.first)); + if (i->second.second) { + osd->gen_wq.queue( + get_parent()->bless_gencontext( + new AsyncReadCallback(_r, i->second.second))); + } + if (_r < 0) + r = _r; + } osd->gen_wq.queue( get_parent()->bless_gencontext( new AsyncReadCallback(r, on_complete))); diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index e967d7dbcbd5..4f4ae6454af6 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -184,9 +184,8 @@ public: void objects_read_async( const hobject_t &hoid, - uint64_t off, - uint64_t len, - bufferlist *bl, + const list<pair<pair<uint64_t, uint64_t>, + pair<bufferlist*, Context*> > > &to_read, Context
*on_complete); private: diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 013b350b9a6c..03bd4e4057a9 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -101,6 +101,41 @@ static void log_subop_stats( osd->logger->tinc(tag_lat, latency); } +struct OnReadComplete : public Context { + ReplicatedPG *pg; + ReplicatedPG::OpContext *opcontext; + OnReadComplete( + ReplicatedPG *pg, + ReplicatedPG::OpContext *ctx) : pg(pg), opcontext(ctx) {} + void finish(int r) { + if (r < 0) + opcontext->async_read_result = r; + opcontext->finish_read(pg); + } + ~OnReadComplete() {} +}; + +// OpContext +void ReplicatedPG::OpContext::start_async_reads(ReplicatedPG *pg) +{ + inflightreads = 1; + pg->pgbackend->objects_read_async( + obc->obs.oi.soid, + pending_async_reads, + new OnReadComplete(pg, this)); + pending_async_reads.clear(); +} +void ReplicatedPG::OpContext::finish_read(ReplicatedPG *pg) +{ + assert(inflightreads > 0); + --inflightreads; + if (async_reads_complete()) { + set<OpContext*>::iterator iter = pg->in_progress_async_reads.find(this); + assert(iter != pg->in_progress_async_reads.end()); + pg->in_progress_async_reads.erase(iter); + pg->complete_read_ctx(async_read_result, this); + } +} class CopyFromCallback: public ReplicatedPG::CopyCallback { public: @@ -146,8 +181,6 @@ public: } }; - - // ====================== // PGBackend::Listener @@ -1621,11 +1654,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) // possible to construct an operation that does a read, does a guard // check (e.g., CMPXATTR), and then a write. Then we either succeed // with the write, or return a CMPXATTR and the read value. - if ((ctx->op_t->empty() && !ctx->modify) || result < 0) { - // read. - ctx->reply->claim_op_out_data(ctx->ops); - ctx->reply->get_header().data_off = ctx->data_off; - } else { + if (!((ctx->op_t->empty() && !ctx->modify) || result < 0)) { // write. normalize the result code.
if (result > 0) { dout(20) << " zeroing write result code " << result << dendl; @@ -1636,23 +1665,12 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) // read or error? if (ctx->op_t->empty() || result < 0) { - MOSDOpReply *reply = ctx->reply; - ctx->reply = NULL; - - if (result >= 0) { - log_op_stats(ctx); - publish_stats_to_osd(); - - // on read, return the current object version - reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version); - } else if (result == -ENOENT) { - // on ENOENT, set a floor for what the next user version will be. - reply->set_enoent_reply_versions(info.last_update, info.last_user_version); + if (ctx->pending_async_reads.empty()) { + complete_read_ctx(result, ctx); + } else { + in_progress_async_reads.insert(ctx); + ctx->start_async_reads(this); } - - reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - osd->send_message_osd_client(reply, m->get_connection()); - close_op_ctx(ctx); return; } @@ -2718,6 +2736,16 @@ static int check_offset_and_length(uint64_t offset, uint64_t length, uint64_t ma return 0; } +struct FillInExtent : public Context { + ceph_le64 *r; + FillInExtent(ceph_le64 *r) : r(r) {} + void finish(int _r) { + if (_r >= 0) { + *r = _r; + } + } +}; + int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) { int result = 0; @@ -2802,25 +2830,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) case CEPH_OSD_OP_READ: ++ctx->num_read; { - // read into a buffer - bufferlist bl; - int r = pgbackend->objects_read_sync( - soid, op.extent.offset, op.extent.length, &bl); - if (first_read) { - first_read = false; - ctx->data_off = op.extent.offset; - } - osd_op.outdata.claim_append(bl); - if (r >= 0) - op.extent.length = r; - else { - result = r; - op.extent.length = 0; - } - ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10); - ctx->delta_stats.num_rd++; - dout(10) << " read got " << r << " / " << op.extent.length << " bytes from obj " << soid << dendl; - __u32 seq = oi.truncate_seq; //
are we beyond truncate_size? if ( (seq < op.extent.truncate_seq) && @@ -2832,15 +2841,35 @@ unsigned trim = to-from; op.extent.length = op.extent.length - trim; + } - bufferlist keep; - - // keep first part of osd_op.outdata; trim at truncation point - dout(10) << " obj " << soid << " seq " << seq - << ": trimming overlap " << from << "~" << trim << dendl; - keep.substr_of(osd_op.outdata, 0, osd_op.outdata.length() - trim); - osd_op.outdata.claim(keep); + // read into a buffer + bufferlist bl; + if (pool.info.ec_pool()) { + ctx->pending_async_reads.push_back( + make_pair( + make_pair(op.extent.offset, op.extent.length), + make_pair(&osd_op.outdata, new FillInExtent(&op.extent.length)))); + dout(10) << " async_read noted for " << soid << dendl; + } else { + int r = pgbackend->objects_read_sync( + soid, op.extent.offset, op.extent.length, &osd_op.outdata); + if (r >= 0) + op.extent.length = r; + else { + result = r; + op.extent.length = 0; + } + dout(10) << " read got " << r << " / " << op.extent.length + << " bytes from obj " << soid << dendl; + } + if (first_read) { + first_read = false; + ctx->data_off = op.extent.offset; } + ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10); + ctx->delta_stats.num_rd++; + } break; @@ -4971,6 +5000,32 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type) } } +void ReplicatedPG::complete_read_ctx(int result, OpContext *ctx) +{ + MOSDOp *m = static_cast<MOSDOp*>(ctx->op->get_req()); + assert(ctx->async_reads_complete()); + ctx->reply->claim_op_out_data(ctx->ops); + ctx->reply->get_header().data_off = ctx->data_off; + + MOSDOpReply *reply = ctx->reply; + ctx->reply = NULL; + + if (result >= 0) { + log_op_stats(ctx); + publish_stats_to_osd(); + + // on read, return the current object version + reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version); + } else if (result == -ENOENT) { + // on ENOENT, set a floor for what the next user version will
be. + reply->set_enoent_reply_versions(info.last_update, info.last_user_version); + } + + reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + osd->send_message_osd_client(reply, m->get_connection()); + close_op_ctx(ctx); +} + // ======================================================================== // copyfrom @@ -8600,6 +8655,12 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t) context_registry_on_change(); + for (set<OpContext*>::iterator i = in_progress_async_reads.begin(); + i != in_progress_async_reads.end(); + in_progress_async_reads.erase(i++)) { + close_op_ctx(*i); + } + cancel_copy_ops(is_primary()); cancel_flush_ops(is_primary()); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 6cf4af74fe38..cb1c7327a0a1 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -449,6 +449,18 @@ public: pending_attrs.clear(); } + // pending async reads <off, len> -> <outbl, outr> + list<pair<pair<uint64_t, uint64_t>, + pair<bufferlist*, Context*> > > pending_async_reads; + int async_read_result; + unsigned inflightreads; + friend struct OnReadComplete; + void start_async_reads(ReplicatedPG *pg); + void finish_read(ReplicatedPG *pg); + bool async_reads_complete() { + return inflightreads == 0; + } + ObjectModDesc mod_desc; enum { W_LOCK, R_LOCK, NONE } lock_to_release; @@ -469,6 +481,8 @@ public: num_read(0), num_write(0), copy_cb(NULL), + async_read_result(0), + inflightreads(0), lock_to_release(NONE) { if (_ssc) { new_snapset = _ssc->snapset; @@ -487,8 +501,16 @@ public: assert(lock_to_release == NONE); if (reply) reply->put(); + for (list<pair<pair<uint64_t, uint64_t>, + pair<bufferlist*, Context*> > >::iterator i = + pending_async_reads.begin(); + i != pending_async_reads.end(); + pending_async_reads.erase(i++)) { + delete i->second.second; + } } }; + friend class OpContext; /* * State on the PG primary associated with the replicated mutation @@ -869,6 +891,8 @@ protected: bool can_skip_promote(OpRequestRef op, ObjectContextRef obc); int prepare_transaction(OpContext *ctx); + set<OpContext*> in_progress_async_reads; + void complete_read_ctx(int result, OpContext *ctx);
// pg on-disk content void check_local();