  osd->logger->tinc(tag_lat, latency);
}
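+// Completion callback for async reads: record a read error, if any, on
+// the issuing OpContext and hand control back to it.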
+struct OnReadComplete : public Context {
+  ReplicatedPG *pg;
+  ReplicatedPG::OpContext *opcontext;
+  OnReadComplete(
+    ReplicatedPG *pg,
+    ReplicatedPG::OpContext *ctx) : pg(pg), opcontext(ctx) {}
+  void finish(int r) {
+    if (r < 0)
+      opcontext->async_read_result = r;
+    opcontext->finish_read(pg);
+  }
+  ~OnReadComplete() {}
+};
+
+// OpContext
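+// Batch all reads queued on this context into a single
+// objects_read_async call; one OnReadComplete fires for the whole
+// batch, so inflightreads is simply set to 1.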
+void ReplicatedPG::OpContext::start_async_reads(ReplicatedPG *pg)
+{
+  inflightreads = 1;
+  pg->pgbackend->objects_read_async(
+    obc->obs.oi.soid,
+    pending_async_reads,
+    new OnReadComplete(pg, this));
+  pending_async_reads.clear();
+}
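+// Invoked via OnReadComplete when the batched read finishes: once no
+// reads remain in flight, drop this context from the PG's in-progress
+// set and send the client its reply.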
+void ReplicatedPG::OpContext::finish_read(ReplicatedPG *pg)
+{
+  assert(inflightreads > 0);
+  --inflightreads;
+  if (async_reads_complete()) {
+    set<OpContext*>::iterator iter = pg->in_progress_async_reads.find(this);
+    assert(iter != pg->in_progress_async_reads.end());
+    pg->in_progress_async_reads.erase(iter);
+    pg->complete_read_ctx(async_read_result, this);
+  }
+}
class CopyFromCallback: public ReplicatedPG::CopyCallback {
public:
  }
};
-
-
// ======================
// PGBackend::Listener
  // possible to construct an operation that does a read, does a guard
  // check (e.g., CMPXATTR), and then a write.  Then we either succeed
  // with the write, or return a CMPXATTR and the read value.
-  if ((ctx->op_t->empty() && !ctx->modify) || result < 0) {
-    // read.
-    ctx->reply->claim_op_out_data(ctx->ops);
-    ctx->reply->get_header().data_off = ctx->data_off;
-  } else {
+  if (!((ctx->op_t->empty() && !ctx->modify) || result < 0)) {
    // write.  normalize the result code.
    if (result > 0) {
      dout(20) << " zeroing write result code " << result << dendl;
  // read or error?
  if (ctx->op_t->empty() || result < 0) {
-    MOSDOpReply *reply = ctx->reply;
-    ctx->reply = NULL;
-
-    if (result >= 0) {
-      log_op_stats(ctx);
-      publish_stats_to_osd();
-
-      // on read, return the current object version
-      reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version);
-    } else if (result == -ENOENT) {
-      // on ENOENT, set a floor for what the next user version will be.
-      reply->set_enoent_reply_versions(info.last_update, info.last_user_version);
+    if (ctx->pending_async_reads.empty()) {
+      complete_read_ctx(result, ctx);
+    } else {
+      in_progress_async_reads.insert(ctx);
+      ctx->start_async_reads(this);
    }
-
-    reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
-    osd->send_message_osd_client(reply, m->get_connection());
-    close_op_ctx(ctx);
    return;
  }
  return 0;
}
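+// Fills an extent's on-wire length field with the byte count the async
+// read actually returned; on error the length is left untouched and the
+// error is surfaced through OnReadComplete/async_read_result instead.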
+struct FillInExtent : public Context {
+  ceph_le64 *r;
+  FillInExtent(ceph_le64 *r) : r(r) {}
+  void finish(int _r) {
+    if (_r >= 0) {
+      *r = _r;
+    }
+  }
+};
+
int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
{
  int result = 0;
  case CEPH_OSD_OP_READ:
    ++ctx->num_read;
    {
-      // read into a buffer
-      bufferlist bl;
-      int r = pgbackend->objects_read_sync(
-        soid, op.extent.offset, op.extent.length, &bl);
-      if (first_read) {
-        first_read = false;
-        ctx->data_off = op.extent.offset;
-      }
-      osd_op.outdata.claim_append(bl);
-      if (r >= 0)
-        op.extent.length = r;
-      else {
-        result = r;
-        op.extent.length = 0;
-      }
-      ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
-      ctx->delta_stats.num_rd++;
-      dout(10) << " read got " << r << " / " << op.extent.length << " bytes from obj " << soid << dendl;
-
      __u32 seq = oi.truncate_seq;
      // are we beyond truncate_size?
      if ( (seq < op.extent.truncate_seq) &&
        unsigned trim = to-from;
        op.extent.length = op.extent.length - trim;
+      }
-        bufferlist keep;
-
-        // keep first part of osd_op.outdata; trim at truncation point
-        dout(10) << " obj " << soid << " seq " << seq
-                 << ": trimming overlap " << from << "~" << trim << dendl;
-        keep.substr_of(osd_op.outdata, 0, osd_op.outdata.length() - trim);
-        osd_op.outdata.claim(keep);
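+      // read into osd_op.outdata: an EC object's data is striped across
+      // shards, so the read has to take the async path; queue the extent
+      // here and let start_async_reads() issue the whole batch once the
+      // ops have been prepared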
+      if (pool.info.ec_pool()) {
+        ctx->pending_async_reads.push_back(
+          make_pair(
+            make_pair(op.extent.offset, op.extent.length),
+            make_pair(&osd_op.outdata, new FillInExtent(&op.extent.length))));
+        dout(10) << " async_read noted for " << soid << dendl;
+      } else {
+        int r = pgbackend->objects_read_sync(
+          soid, op.extent.offset, op.extent.length, &osd_op.outdata);
+        if (r >= 0)
+          op.extent.length = r;
+        else {
+          result = r;
+          op.extent.length = 0;
+        }
+        dout(10) << " read got " << r << " / " << op.extent.length
+                 << " bytes from obj " << soid << dendl;
+      }
+      if (first_read) {
+        first_read = false;
+        ctx->data_off = op.extent.offset;
      }
+      ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
+      ctx->delta_stats.num_rd++;
+
    }
    break;
  }
}
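+// Build and send the MOSDOpReply for a read (or error) once any async
+// reads have completed; this is the tail of the old inline reply path
+// in execute_ctx, factored out so it can also run from finish_read().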
+void ReplicatedPG::complete_read_ctx(int result, OpContext *ctx)
+{
+  MOSDOp *m = static_cast<MOSDOp*>(ctx->op->get_req());
+  assert(ctx->async_reads_complete());
+  ctx->reply->claim_op_out_data(ctx->ops);
+  ctx->reply->get_header().data_off = ctx->data_off;
+
+  MOSDOpReply *reply = ctx->reply;
+  ctx->reply = NULL;
+
+  if (result >= 0) {
+    log_op_stats(ctx);
+    publish_stats_to_osd();
+
+    // on read, return the current object version
+    reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version);
+  } else if (result == -ENOENT) {
+    // on ENOENT, set a floor for what the next user version will be.
+    reply->set_enoent_reply_versions(info.last_update, info.last_user_version);
+  }
+
+  reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+  osd->send_message_osd_client(reply, m->get_connection());
+  close_op_ctx(ctx);
+}
+
// ========================================================================
// copyfrom
  context_registry_on_change();
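+  // the PG's interval has changed: abandon any reads still in flight
+  // and close out their op contexts before cancelling copy/flush ops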
+  for (set<OpContext*>::iterator i = in_progress_async_reads.begin();
+       i != in_progress_async_reads.end();
+       in_progress_async_reads.erase(i++)) {
+    close_op_ctx(*i);
+  }
+
  cancel_copy_ops(is_primary());
  cancel_flush_ops(is_primary());
  pending_attrs.clear();
}
+    // pending async reads <off, len> -> <outbl, outr>
+    list<pair<pair<uint64_t, uint64_t>,
+              pair<bufferlist*, Context*> > > pending_async_reads;
+    int async_read_result;
+    unsigned inflightreads;
+    friend struct OnReadComplete;
+    void start_async_reads(ReplicatedPG *pg);
+    void finish_read(ReplicatedPG *pg);
+    bool async_reads_complete() {
+      return inflightreads == 0;
+    }
+
    ObjectModDesc mod_desc;
    enum { W_LOCK, R_LOCK, NONE } lock_to_release;
      num_read(0),
      num_write(0),
      copy_cb(NULL),
+      async_read_result(0),
+      inflightreads(0),
      lock_to_release(NONE) {
      if (_ssc) {
        new_snapset = _ssc->snapset;
      assert(lock_to_release == NONE);
      if (reply)
        reply->put();
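+      // reads that were never issued still own their fill-in Contexts;
+      // delete them so tearing down the op context does not leak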
+      for (list<pair<pair<uint64_t, uint64_t>,
+                     pair<bufferlist*, Context*> > >::iterator i =
+             pending_async_reads.begin();
+           i != pending_async_reads.end();
+           pending_async_reads.erase(i++)) {
+        delete i->second.second;
+      }
    }
  };
+  friend struct OpContext;
  /*
   * State on the PG primary associated with the replicated mutation
  bool can_skip_promote(OpRequestRef op, ObjectContextRef obc);
  int prepare_transaction(OpContext *ctx);
+  set<OpContext*> in_progress_async_reads;
+  void complete_read_ctx(int result, OpContext *ctx);
  // pg on-disk content
  void check_local();