}
return cache_result_t::HANDLED_PROXY;
case object_manifest_t::TYPE_CHUNKED:
+ if (can_proxy_chunked_read(op, obc)) {
+ do_proxy_chunked_op(op, obc->obs.oi.soid, obc, write_ordered);
+ return cache_result_t::HANDLED_PROXY;
+ }
+ if (obc->obs.oi.size == 0) {
+ const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
+ const object_locator_t oloc = m->get_object_locator();
+ promote_object(obc, obc->obs.oi.soid, oloc, op, NULL);
+ return cache_result_t::BLOCKED_PROMOTE;
+ }
+ return cache_result_t::NOOP;
default:
assert(0 == "unrecognized manifest type");
}
}
};
+struct C_ProxyChunkRead : public Context {
+ PrimaryLogPGRef pg;
+ hobject_t oid;
+ epoch_t last_peering_reset;
+ ceph_tid_t tid;
+ PrimaryLogPG::ProxyReadOpRef prdop;
+ utime_t start;
+ ObjectOperation *obj_op;
+ int op_index;
+ uint64_t req_offset;
+ ObjectContextRef obc;
+ uint64_t req_total_len;
+ C_ProxyChunkRead(PrimaryLogPG *p, hobject_t o, epoch_t lpr,
+ const PrimaryLogPG::ProxyReadOpRef& prd)
+ : pg(p), oid(o), last_peering_reset(lpr),
+ tid(0), prdop(prd), start(ceph_clock_now()), obj_op(NULL)
+ {}
+ void finish(int r) override {
+ if (prdop->canceled)
+ return;
+ pg->lock();
+ if (prdop->canceled) {
+ pg->unlock();
+ return;
+ }
+ if (last_peering_reset == pg->get_last_peering_reset()) {
+ if (r >= 0) {
+ if (!prdop->ops[op_index].outdata.length()) {
+ assert(req_total_len);
+ bufferlist list;
+ bufferptr bptr(req_total_len);
+ list.push_back(std::move(bptr));
+ prdop->ops[op_index].outdata.append(list);
+ }
+ assert(obj_op);
+ uint64_t copy_offset;
+ if (req_offset >= prdop->ops[op_index].op.extent.offset) {
+ copy_offset = req_offset - prdop->ops[op_index].op.extent.offset;
+ } else {
+ copy_offset = 0;
+ }
+ prdop->ops[op_index].outdata.copy_in(copy_offset, obj_op->ops[0].outdata.length(),
+ obj_op->ops[0].outdata.c_str());
+ }
+
+ pg->finish_proxy_read(oid, tid, r);
+ pg->osd->logger->tinc(l_osd_tier_r_lat, ceph_clock_now() - start);
+ if (obj_op) {
+ delete obj_op;
+ }
+ }
+ pg->unlock();
+ }
+};
+
void PrimaryLogPG::do_proxy_read(OpRequestRef op, ObjectContextRef obc)
{
// NOTE: non-const here because the ProxyReadOp needs mutable refs to
oloc = object_locator_t(obc->obs.oi.manifest.redirect_target);
soid = obc->obs.oi.manifest.redirect_target;
break;
- case object_manifest_t::TYPE_CHUNKED:
default:
assert(0 == "unrecognized manifest type");
}
q->second.erase(it);
if (q->second.size() == 0) {
in_progress_proxy_ops.erase(oid);
+ } else if (std::find(q->second.begin(),
+ q->second.end(),
+ prdop->op) != q->second.end()) {
+ /* multiple read case */
+ dout(20) << __func__ << " " << oid << " is not completed " << dendl;
+ return;
}
osd->logger->inc(l_osd_tier_proxy_read);
oloc = object_locator_t(obc->obs.oi.manifest.redirect_target);
soid = obc->obs.oi.manifest.redirect_target;
break;
- case object_manifest_t::TYPE_CHUNKED:
default:
assert(0 == "unrecognized manifest type");
}
in_progress_proxy_ops[soid].push_back(op);
}
+void PrimaryLogPG::do_proxy_chunked_op(OpRequestRef op, const hobject_t& missing_oid,
+ ObjectContextRef obc, bool write_ordered)
+{
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
+ OSDOp *osd_op = NULL;
+ for (unsigned int i = 0; i < m->ops.size(); i++) {
+ osd_op = &m->ops[i];
+ uint64_t cursor = osd_op->op.extent.offset;
+ uint64_t op_length = osd_op->op.extent.offset + osd_op->op.extent.length;
+ uint64_t chunk_length = 0, chunk_index = 0, req_len = 0;
+ object_manifest_t *manifest = &obc->obs.oi.manifest;
+ map <uint64_t, map<uint64_t, uint64_t>> chunk_read;
+
+ while (cursor < op_length) {
+ chunk_index = 0;
+ chunk_length = 0;
+ /* find the right chunk position for cursor */
+ for (auto &p : manifest->chunk_map) {
+ if (p.first <= cursor && p.first + p.second.length > cursor) {
+ chunk_length = p.second.length;
+ chunk_index = p.first;
+ break;
+ }
+ }
+ /* no index */
+ if (!chunk_index && !chunk_length) {
+ if (cursor == osd_op->op.extent.offset) {
+ OpContext *ctx = new OpContext(op, m->get_reqid(), &m->ops, this);
+ ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
+ ctx->data_off = osd_op->op.extent.offset;
+ ctx->ignore_log_op_stats = true;
+ complete_read_ctx(0, ctx);
+ }
+ break;
+ }
+ uint64_t next_length = chunk_length;
+ /* the size to read -> | op length | */
+ /* | a chunk | */
+ if (cursor + next_length > op_length) {
+ next_length = op_length - cursor;
+ }
+ /* the size to read -> | op length | */
+ /* | a chunk | */
+ if (cursor + next_length > chunk_index + chunk_length) {
+ next_length = chunk_index + chunk_length - cursor;
+ }
+
+ chunk_read[cursor] = {{chunk_index, next_length}};
+ cursor += next_length;
+ }
+
+ req_len = cursor - osd_op->op.extent.offset;
+ for (auto &p : chunk_read) {
+ auto chunks = p.second.begin();
+ dout(20) << __func__ << " chunk_index: " << chunks->first
+ << " next_length: " << chunks->second << " cursor: "
+ << p.first << dendl;
+ do_proxy_chunked_read(op, obc, i, chunks->first, p.first, chunks->second, req_len);
+ }
+ }
+}
+
+void PrimaryLogPG::do_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc, int op_index,
+ uint64_t chunk_index, uint64_t req_offset, uint64_t req_length,
+ uint64_t req_total_len)
+{
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
+ object_manifest_t *manifest = &obc->obs.oi.manifest;
+ if (!manifest->chunk_map.count(chunk_index)) {
+ return;
+ }
+ uint64_t chunk_length = manifest->chunk_map[chunk_index].length;
+ hobject_t soid = manifest->chunk_map[chunk_index].oid;
+ hobject_t ori_soid = m->get_hobj();
+ object_locator_t oloc(soid);
+ unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY;
+
+ if (!chunk_length || soid == hobject_t()) {
+ return;
+ }
+
+ /* same as do_proxy_read() */
+ flags |= m->get_flags() & (CEPH_OSD_FLAG_RWORDERED |
+ CEPH_OSD_FLAG_ORDERSNAP |
+ CEPH_OSD_FLAG_ENFORCE_SNAPC |
+ CEPH_OSD_FLAG_MAP_SNAP_CLONE);
+
+ dout(10) << __func__ << " Start do chunk proxy read for " << *m
+ << " index: " << op_index << " oid: " << soid.oid.name << " req_offset: " << req_offset
+ << " req_length: " << req_length << dendl;
+
+ ProxyReadOpRef prdop(std::make_shared<ProxyReadOp>(op, ori_soid, m->ops));
+
+ ObjectOperation *pobj_op = new ObjectOperation;
+ OSDOp &osd_op = pobj_op->add_op(m->ops[op_index].op.op);
+
+ if (chunk_index <= req_offset) {
+ osd_op.op.extent.offset = manifest->chunk_map[chunk_index].offset + req_offset - chunk_index;
+ } else {
+ assert(0 == "chunk_index > req_offset");
+ }
+ osd_op.op.extent.length = req_length;
+
+ ObjectOperation obj_op;
+ obj_op.dup(pobj_op->ops);
+
+ C_ProxyChunkRead *fin = new C_ProxyChunkRead(this, ori_soid, get_last_peering_reset(),
+ prdop);
+ fin->obj_op = pobj_op;
+ fin->op_index = op_index;
+ fin->req_offset = req_offset;
+ fin->obc = obc;
+ fin->req_total_len = req_total_len;
+
+ unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
+ ceph_tid_t tid = osd->objecter->read(
+ soid.oid, oloc, obj_op,
+ m->get_snapid(), NULL,
+ flags, new C_OnFinisher(fin, osd->objecter_finishers[n]),
+ &prdop->user_version,
+ &prdop->data_offset,
+ m->get_features());
+ fin->tid = tid;
+ prdop->objecter_tid = tid;
+ proxyread_ops[tid] = prdop;
+ in_progress_proxy_ops[ori_soid].push_back(op);
+}
+
+bool PrimaryLogPG::can_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc)
+{
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
+ OSDOp *osd_op = NULL;
+ bool ret = true;
+ for (unsigned int i = 0; i < m->ops.size(); i++) {
+ osd_op = &m->ops[i];
+ ceph_osd_op op = osd_op->op;
+ switch (op.op) {
+ case CEPH_OSD_OP_READ:
+ case CEPH_OSD_OP_SYNC_READ: {
+ uint64_t cursor = osd_op->op.extent.offset;
+ uint64_t remain = osd_op->op.extent.length;
+
+ /* requested chunks exist in chunk_map ? */
+ for (auto &p : obc->obs.oi.manifest.chunk_map) {
+ if (p.first <= cursor && p.first + p.second.length > cursor) {
+ if (p.second.length >= remain) {
+ remain = 0;
+ break;
+ } else {
+ remain = remain - p.second.length;
+ }
+ cursor += p.second.length;
+ }
+ }
+
+ if (remain) {
+ dout(20) << __func__ << " requested chunks don't exist in chunk_map " << dendl;
+ return false;
+ }
+ continue;
+ }
+ default:
+ return false;
+ }
+ }
+ return ret;
+}
+
void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r)
{
dout(10) << __func__ << " " << oid << " tid " << tid
in_progress_op.erase(it);
if (in_progress_op.size() == 0) {
in_progress_proxy_ops.erase(oid);
+ } else if (std::find(in_progress_op.begin(),
+ in_progress_op.end(),
+ pwop->op) != in_progress_op.end()) {
+ if (pwop->ctx)
+ delete pwop->ctx;
+ pwop->ctx = NULL;
+ dout(20) << __func__ << " " << oid << " tid " << tid
+ << " in_progress_op size: "
+ << in_progress_op.size() << dendl;
+ return;
}
osd->logger->inc(l_osd_tier_proxy_write);
PromoteCallback *cb = new PromoteCallback(obc, this);
object_locator_t my_oloc = oloc;
my_oloc.pool = pool.info.tier_of;
+ if (obc->obs.oi.has_manifest()) {
+ if (obc->obs.oi.manifest.is_chunked()) {
+ object_locator_t chunk_oloc(obc->obs.oi.manifest.chunk_map[0].oid);
+ my_oloc = chunk_oloc;
+ }
+ }
unsigned flags = CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY |
CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE |
goto fail;
}
+ for (auto &p : oi.manifest.chunk_map) {
+ if ((p.first <= src_offset && p.first + p.second.length > src_offset) ||
+ (p.first > src_offset && p.first <= src_offset + src_length)) {
+ dout(20) << __func__ << " overlapped !! offset: " << src_offset << " length: " << src_length
+ << " chunk_info: " << p << dendl;
+ result = -EOPNOTSUPP;
+ goto fail;
+ }
+ }
+
if (!oi.manifest.is_chunked()) {
oi.manifest.clear();
}
- pg_t raw_pg;
+ pg_t raw_pg;
chunk_info_t chunk_info;
hobject_t target(tgt_name, tgt_oloc.key, snapid_t(),
raw_pg.ps(), raw_pg.pool(),