From: myoungwon oh Date: Fri, 2 Jun 2017 13:47:23 +0000 (+0900) Subject: osd: code to proxy read() to the chunked object. X-Git-Tag: v13.0.1~59^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7118574367db9ced8074726f5faadf58d5b0a8b8;p=ceph.git osd: code to proxy read() to the chunked object. Signed-off-by: Myoungwon Oh --- diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 8aa077ffe95..273acf6df22 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -2343,6 +2343,17 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail( } return cache_result_t::HANDLED_PROXY; case object_manifest_t::TYPE_CHUNKED: + if (can_proxy_chunked_read(op, obc)) { + do_proxy_chunked_op(op, obc->obs.oi.soid, obc, write_ordered); + return cache_result_t::HANDLED_PROXY; + } + if (obc->obs.oi.size == 0) { + const MOSDOp *m = static_cast(op->get_req()); + const object_locator_t oloc = m->get_object_locator(); + promote_object(obc, obc->obs.oi.soid, oloc, op, NULL); + return cache_result_t::BLOCKED_PROMOTE; + } + return cache_result_t::NOOP; default: assert(0 == "unrecognized manifest type"); } @@ -2715,6 +2726,61 @@ struct C_ProxyRead : public Context { } }; +struct C_ProxyChunkRead : public Context { + PrimaryLogPGRef pg; + hobject_t oid; + epoch_t last_peering_reset; + ceph_tid_t tid; + PrimaryLogPG::ProxyReadOpRef prdop; + utime_t start; + ObjectOperation *obj_op; + int op_index; + uint64_t req_offset; + ObjectContextRef obc; + uint64_t req_total_len; + C_ProxyChunkRead(PrimaryLogPG *p, hobject_t o, epoch_t lpr, + const PrimaryLogPG::ProxyReadOpRef& prd) + : pg(p), oid(o), last_peering_reset(lpr), + tid(0), prdop(prd), start(ceph_clock_now()), obj_op(NULL) + {} + void finish(int r) override { + if (prdop->canceled) + return; + pg->lock(); + if (prdop->canceled) { + pg->unlock(); + return; + } + if (last_peering_reset == pg->get_last_peering_reset()) { + if (r >= 0) { + if (!prdop->ops[op_index].outdata.length()) { + assert(req_total_len); + bufferlist list; + bufferptr bptr(req_total_len); + list.push_back(std::move(bptr)); + prdop->ops[op_index].outdata.append(list); + } + assert(obj_op); + uint64_t copy_offset; + if (req_offset >= prdop->ops[op_index].op.extent.offset) { + copy_offset = req_offset - prdop->ops[op_index].op.extent.offset; + } else { + copy_offset = 0; + } + prdop->ops[op_index].outdata.copy_in(copy_offset, obj_op->ops[0].outdata.length(), + obj_op->ops[0].outdata.c_str()); + } + + pg->finish_proxy_read(oid, tid, r); + pg->osd->logger->tinc(l_osd_tier_r_lat, ceph_clock_now() - start); + if (obj_op) { + delete obj_op; + } + } + pg->unlock(); + } +}; + void PrimaryLogPG::do_proxy_read(OpRequestRef op, ObjectContextRef obc) { // NOTE: non-const here because the ProxyReadOp needs mutable refs to @@ -2729,7 +2795,6 @@ void PrimaryLogPG::do_proxy_read(OpRequestRef op, ObjectContextRef obc) oloc = object_locator_t(obc->obs.oi.manifest.redirect_target); soid = obc->obs.oi.manifest.redirect_target; break; - case object_manifest_t::TYPE_CHUNKED: default: assert(0 == "unrecognized manifest type"); } @@ -2826,6 +2891,12 @@ void PrimaryLogPG::finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r) q->second.erase(it); if (q->second.size() == 0) { in_progress_proxy_ops.erase(oid); + } else if (std::find(q->second.begin(), + q->second.end(), + prdop->op) != q->second.end()) { + /* multiple read case */ + dout(20) << __func__ << " " << oid << " is not completed " << dendl; + return; } osd->logger->inc(l_osd_tier_proxy_read); @@ -2938,7 +3009,6 @@ void PrimaryLogPG::do_proxy_write(OpRequestRef op, ObjectContextRef obc) oloc = object_locator_t(obc->obs.oi.manifest.redirect_target); soid = obc->obs.oi.manifest.redirect_target; break; - case object_manifest_t::TYPE_CHUNKED: default: assert(0 == "unrecognized manifest type"); } @@ -2976,6 +3046,174 @@ void PrimaryLogPG::do_proxy_write(OpRequestRef op, ObjectContextRef obc) in_progress_proxy_ops[soid].push_back(op); } +void PrimaryLogPG::do_proxy_chunked_op(OpRequestRef op, const hobject_t& missing_oid, + ObjectContextRef obc, bool write_ordered) +{ + MOSDOp *m = static_cast(op->get_nonconst_req()); + OSDOp *osd_op = NULL; + for (unsigned int i = 0; i < m->ops.size(); i++) { + osd_op = &m->ops[i]; + uint64_t cursor = osd_op->op.extent.offset; + uint64_t op_length = osd_op->op.extent.offset + osd_op->op.extent.length; + uint64_t chunk_length = 0, chunk_index = 0, req_len = 0; + object_manifest_t *manifest = &obc->obs.oi.manifest; + map > chunk_read; + + while (cursor < op_length) { + chunk_index = 0; + chunk_length = 0; + /* find the right chunk position for cursor */ + for (auto &p : manifest->chunk_map) { + if (p.first <= cursor && p.first + p.second.length > cursor) { + chunk_length = p.second.length; + chunk_index = p.first; + break; + } + } + /* no index */ + if (!chunk_index && !chunk_length) { + if (cursor == osd_op->op.extent.offset) { + OpContext *ctx = new OpContext(op, m->get_reqid(), &m->ops, this); + ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false); + ctx->data_off = osd_op->op.extent.offset; + ctx->ignore_log_op_stats = true; + complete_read_ctx(0, ctx); + } + break; + } + uint64_t next_length = chunk_length; + /* the size to read -> | op length | */ + /* | a chunk | */ + if (cursor + next_length > op_length) { + next_length = op_length - cursor; + } + /* the size to read -> | op length | */ + /* | a chunk | */ + if (cursor + next_length > chunk_index + chunk_length) { + next_length = chunk_index + chunk_length - cursor; + } + + chunk_read[cursor] = {{chunk_index, next_length}}; + cursor += next_length; + } + + req_len = cursor - osd_op->op.extent.offset; + for (auto &p : chunk_read) { + auto chunks = p.second.begin(); + dout(20) << __func__ << " chunk_index: " << chunks->first + << " next_length: " << chunks->second << " cursor: " + << p.first << dendl; + do_proxy_chunked_read(op, obc, i, chunks->first, p.first, chunks->second, req_len); + } + } +} + +void PrimaryLogPG::do_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc, int op_index, + uint64_t chunk_index, uint64_t req_offset, uint64_t req_length, + uint64_t req_total_len) +{ + MOSDOp *m = static_cast(op->get_nonconst_req()); + object_manifest_t *manifest = &obc->obs.oi.manifest; + if (!manifest->chunk_map.count(chunk_index)) { + return; + } + uint64_t chunk_length = manifest->chunk_map[chunk_index].length; + hobject_t soid = manifest->chunk_map[chunk_index].oid; + hobject_t ori_soid = m->get_hobj(); + object_locator_t oloc(soid); + unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY; + + if (!chunk_length || soid == hobject_t()) { + return; + } + + /* same as do_proxy_read() */ + flags |= m->get_flags() & (CEPH_OSD_FLAG_RWORDERED | + CEPH_OSD_FLAG_ORDERSNAP | + CEPH_OSD_FLAG_ENFORCE_SNAPC | + CEPH_OSD_FLAG_MAP_SNAP_CLONE); + + dout(10) << __func__ << " Start do chunk proxy read for " << *m + << " index: " << op_index << " oid: " << soid.oid.name << " req_offset: " << req_offset + << " req_length: " << req_length << dendl; + + ProxyReadOpRef prdop(std::make_shared(op, ori_soid, m->ops)); + + ObjectOperation *pobj_op = new ObjectOperation; + OSDOp &osd_op = pobj_op->add_op(m->ops[op_index].op.op); + + if (chunk_index <= req_offset) { + osd_op.op.extent.offset = manifest->chunk_map[chunk_index].offset + req_offset - chunk_index; + } else { + assert(0 == "chunk_index > req_offset"); + } + osd_op.op.extent.length = req_length; + + ObjectOperation obj_op; + obj_op.dup(pobj_op->ops); + + C_ProxyChunkRead *fin = new C_ProxyChunkRead(this, ori_soid, get_last_peering_reset(), + prdop); + fin->obj_op = pobj_op; + fin->op_index = op_index; + fin->req_offset = req_offset; + fin->obc = obc; + fin->req_total_len = req_total_len; + + unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers); + ceph_tid_t tid = osd->objecter->read( + soid.oid, oloc, obj_op, + m->get_snapid(), NULL, + flags, new C_OnFinisher(fin, osd->objecter_finishers[n]), + &prdop->user_version, + &prdop->data_offset, + m->get_features()); + fin->tid = tid; + prdop->objecter_tid = tid; + proxyread_ops[tid] = prdop; + in_progress_proxy_ops[ori_soid].push_back(op); +} + +bool PrimaryLogPG::can_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc) +{ + MOSDOp *m = static_cast(op->get_nonconst_req()); + OSDOp *osd_op = NULL; + bool ret = true; + for (unsigned int i = 0; i < m->ops.size(); i++) { + osd_op = &m->ops[i]; + ceph_osd_op op = osd_op->op; + switch (op.op) { + case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_SYNC_READ: { + uint64_t cursor = osd_op->op.extent.offset; + uint64_t remain = osd_op->op.extent.length; + + /* requested chunks exist in chunk_map ? */ + for (auto &p : obc->obs.oi.manifest.chunk_map) { + if (p.first <= cursor && p.first + p.second.length > cursor) { + if (p.second.length >= remain) { + remain = 0; + break; + } else { + remain = remain - p.second.length; + } + cursor += p.second.length; + } + } + + if (remain) { + dout(20) << __func__ << " requested chunks don't exist in chunk_map " << dendl; + return false; + } + continue; + } + default: + return false; + } + } + return ret; +} + void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r) { dout(10) << __func__ << " " << oid << " tid " << tid @@ -3008,6 +3246,16 @@ void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r) in_progress_op.erase(it); if (in_progress_op.size() == 0) { in_progress_proxy_ops.erase(oid); + } else if (std::find(in_progress_op.begin(), + in_progress_op.end(), + pwop->op) != in_progress_op.end()) { + if (pwop->ctx) + delete pwop->ctx; + pwop->ctx = NULL; + dout(20) << __func__ << " " << oid << " tid " << tid + << " in_progress_op size: " + << in_progress_op.size() << dendl; + return; } osd->logger->inc(l_osd_tier_proxy_write); @@ -3110,6 +3358,12 @@ void PrimaryLogPG::promote_object(ObjectContextRef obc, PromoteCallback *cb = new PromoteCallback(obc, this); object_locator_t my_oloc = oloc; my_oloc.pool = pool.info.tier_of; + if (obc->obs.oi.has_manifest()) { + if (obc->obs.oi.manifest.is_chunked()) { + object_locator_t chunk_oloc(obc->obs.oi.manifest.chunk_map[0].oid); + my_oloc = chunk_oloc; + } + } unsigned flags = CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY | CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE | @@ -6141,11 +6395,21 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector& ops) goto fail; } + for (auto &p : oi.manifest.chunk_map) { + if ((p.first <= src_offset && p.first + p.second.length > src_offset) || + (p.first > src_offset && p.first <= src_offset + src_length)) { + dout(20) << __func__ << " overlapped !! offset: " << src_offset << " length: " << src_length + << " chunk_info: " << p << dendl; + result = -EOPNOTSUPP; + goto fail; + } + } + if (!oi.manifest.is_chunked()) { oi.manifest.clear(); } - pg_t raw_pg; + pg_t raw_pg; chunk_info_t chunk_info; hobject_t target(tgt_name, tgt_oloc.key, snapid_t(), raw_pg.ps(), raw_pg.pool(), diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index 8606fc4f694..e5badb887a6 100644 --- a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -1378,6 +1378,16 @@ protected: friend struct C_ProxyWrite_Commit; + // -- chunkop -- + void do_proxy_chunked_op(OpRequestRef op, const hobject_t& missing_oid, + ObjectContextRef obc, bool write_ordered); + void do_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc, int op_index, + uint64_t chunk_index, uint64_t req_offset, uint64_t req_length, + uint64_t req_total_len); + bool can_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc); + + friend struct C_ProxyChunkRead; + public: PrimaryLogPG(OSDService *o, OSDMapRef curmap, const PGPool &_pool, spg_t p);