From: myoungwon oh Date: Tue, 10 Oct 2017 11:50:37 +0000 (+0900) Subject: osd: add promote_object() for chunked objects. X-Git-Tag: v13.0.1~59^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b9f9d2095e6bc13664122e9751d309d076255ae0;p=ceph.git osd: add promote_object() for chunked objects. Signed-off-by: Myoungwon Oh --- diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 273acf6df22..ca6ae5915b6 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -2329,7 +2329,8 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail( for (vector::iterator p = ops.begin(); p != ops.end(); ++p) { OSDOp& osd_op = *p; ceph_osd_op& op = osd_op.op; - if (op.op == CEPH_OSD_OP_SET_REDIRECT) { + if (op.op == CEPH_OSD_OP_SET_REDIRECT || + op.op == CEPH_OSD_OP_SET_CHUNK) { return cache_result_t::NOOP; } } @@ -2347,11 +2348,14 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail( do_proxy_chunked_op(op, obc->obs.oi.soid, obc, write_ordered); return cache_result_t::HANDLED_PROXY; } - if (obc->obs.oi.size == 0) { - const MOSDOp *m = static_cast(op->get_req()); - const object_locator_t oloc = m->get_object_locator(); - promote_object(obc, obc->obs.oi.soid, oloc, op, NULL); - return cache_result_t::BLOCKED_PROMOTE; + + for (auto& p : obc->obs.oi.manifest.chunk_map) { + if (p.second.flags == chunk_info_t::FLAG_MISSING) { + const MOSDOp *m = static_cast(op->get_req()); + const object_locator_t oloc = m->get_object_locator(); + promote_object(obc, obc->obs.oi.soid, oloc, op, NULL); + return cache_result_t::BLOCKED_PROMOTE; + } } return cache_result_t::NOOP; default: @@ -3316,6 +3320,24 @@ public: } }; +class PromoteManifestCallback: public PrimaryLogPG::CopyCallback { + ObjectContextRef obc; + PrimaryLogPG *pg; + utime_t start; +public: + PromoteManifestCallback(ObjectContextRef obc_, PrimaryLogPG *pg_) + : obc(obc_), + pg(pg_), + start(ceph_clock_now()) {} + + void finish(PrimaryLogPG::CopyCallbackResults results) override { + PrimaryLogPG::CopyResults *results_data = results.get<1>(); + int r = results.get<0>(); + pg->finish_promote_manifest(r, results_data, obc); + pg->osd->logger->tinc(l_osd_tier_promote_lat, ceph_clock_now() - start); + } +}; + void PrimaryLogPG::promote_object(ObjectContextRef obc, const hobject_t& missing_oid, const object_locator_t& oloc, @@ -3355,13 +3377,16 @@ void PrimaryLogPG::promote_object(ObjectContextRef obc, src_fadvise_flags |= LIBRADOS_OP_FLAG_FADVISE_DONTNEED; } - PromoteCallback *cb = new PromoteCallback(obc, this); + CopyCallback *cb; object_locator_t my_oloc = oloc; my_oloc.pool = pool.info.tier_of; - if (obc->obs.oi.has_manifest()) { + if (!obc->obs.oi.has_manifest()) { + cb = new PromoteCallback(obc, this); + } else { if (obc->obs.oi.manifest.is_chunked()) { - object_locator_t chunk_oloc(obc->obs.oi.manifest.chunk_map[0].oid); - my_oloc = chunk_oloc; + cb = new PromoteManifestCallback(obc, this); + } else { + assert(0); } } @@ -6411,10 +6436,10 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector& ops) pg_t raw_pg; chunk_info_t chunk_info; + get_osdmap()->object_locator_to_pg(tgt_name, tgt_oloc, raw_pg); hobject_t target(tgt_name, tgt_oloc.key, snapid_t(), raw_pg.ps(), raw_pg.pool(), tgt_oloc.nspace); - get_osdmap()->object_locator_to_pg(tgt_name, tgt_oloc, raw_pg); chunk_info.flags = chunk_info_t::FLAG_MISSING; chunk_info.oid = target; chunk_info.offset = tgt_offset; @@ -7969,6 +7994,29 @@ struct C_CopyFrom_AsyncReadCb : public Context { } }; +struct C_CopyChunk : public Context { + PrimaryLogPGRef pg; + hobject_t oid; + epoch_t last_peering_reset; + ceph_tid_t tid; + PrimaryLogPG::CopyOpRef cop; + uint64_t offset; + C_CopyChunk(PrimaryLogPG *p, hobject_t o, epoch_t lpr, + const PrimaryLogPG::CopyOpRef& c) + : pg(p), oid(o), last_peering_reset(lpr), + tid(0), cop(c) + {} + void finish(int r) override { + if (r == -ECANCELED) + return; + pg->lock(); + if (last_peering_reset == pg->get_last_peering_reset()) { + pg->process_copy_chunk_manifest(oid, tid, r, offset); + } + pg->unlock(); + } +}; + int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op, ObjectContextRef &obc) { @@ -8193,7 +8241,12 @@ void PrimaryLogPG::start_copy(CopyCallback *cb, ObjectContextRef obc, copy_ops[dest] = cop; obc->start_block(); - _copy_some(obc, cop); + if (!obc->obs.oi.has_manifest()) { + _copy_some(obc, cop); + } else { + auto p = obc->obs.oi.manifest.chunk_map.begin(); + _copy_some_manifest(obc, cop, p->first); + } } void PrimaryLogPG::_copy_some(ObjectContextRef obc, CopyOpRef cop) @@ -8264,6 +8317,95 @@ void PrimaryLogPG::_copy_some(ObjectContextRef obc, CopyOpRef cop) gather.activate(); } +void PrimaryLogPG::_copy_some_manifest(ObjectContextRef obc, CopyOpRef cop, uint64_t start_offset) +{ + dout(10) << __func__ << " " << obc << " " << cop << dendl; + + unsigned flags = 0; + if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_FLUSH) + flags |= CEPH_OSD_FLAG_FLUSH; + if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE) + flags |= CEPH_OSD_FLAG_IGNORE_CACHE; + if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY) + flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY; + if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE) + flags |= CEPH_OSD_FLAG_MAP_SNAP_CLONE; + if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_RWORDERED) + flags |= CEPH_OSD_FLAG_RWORDERED; + + int num_chunks = 0; + uint64_t last_offset = 0, chunks_size = 0; + object_manifest_t *manifest = &obc->obs.oi.manifest; + map::iterator iter = manifest->chunk_map.find(start_offset); + for (;iter != manifest->chunk_map.end(); ++iter) { + num_chunks++; + chunks_size += iter->second.length; + last_offset = iter->first; + if (get_copy_chunk_size() < chunks_size) { + break; + } + } + + cop->num_chunk = num_chunks; + cop->start_offset = start_offset; + cop->last_offset = last_offset; + dout(20) << __func__ << " oid " << obc->obs.oi.soid << " num_chunks: " << num_chunks + << " start_offset: " << start_offset << " chunks_size: " << chunks_size + << " last_offset: " << last_offset << dendl; + + iter = manifest->chunk_map.find(start_offset); + for (;iter != manifest->chunk_map.end(); ++iter) { + uint64_t obj_offset = iter->first; + uint64_t length = manifest->chunk_map[iter->first].length; + hobject_t soid = manifest->chunk_map[iter->first].oid; + object_locator_t oloc(soid); + CopyOpRef sub_cop(std::make_shared(cop->cb, cop->obc, cop->src, oloc, + cop->results.user_version, cop->flags, cop->mirror_snapset, + cop->src_obj_fadvise_flags, cop->dest_obj_fadvise_flags)); + sub_cop->cursor.data_offset = obj_offset; + cop->chunk_cops[obj_offset] = sub_cop; + + int s = sub_cop->chunk_ops.size(); + sub_cop->chunk_ops.resize(s+1); + sub_cop->chunk_ops[s].op.op = CEPH_OSD_OP_READ; + sub_cop->chunk_ops[s].op.extent.offset = manifest->chunk_map[iter->first].offset; + sub_cop->chunk_ops[s].op.extent.length = length; + + ObjectOperation op; + op.dup(sub_cop->chunk_ops); + + dout(20) << __func__ << " tgt_oid: " << soid.oid << " tgt_offset: " + << manifest->chunk_map[iter->first].offset + << " length: " << length << " pool id: " << oloc.pool << dendl; + + if (cop->results.user_version) { + op.assert_version(cop->results.user_version); + } else { + // we should learn the version after the first chunk, if we didn't know + // it already! + assert(cop->cursor.is_initial()); + } + op.set_last_op_flags(cop->src_obj_fadvise_flags); + + C_CopyChunk *fin = new C_CopyChunk(this, obc->obs.oi.soid, + get_last_peering_reset(), cop); + fin->offset = obj_offset; + unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers); + + ceph_tid_t tid = osd->objecter->read(soid.oid, oloc, op, + sub_cop->src.snap, NULL, + flags, + new C_OnFinisher(fin, osd->objecter_finishers[n]), + // discover the object version if we don't know it yet + sub_cop->results.user_version ? NULL : &sub_cop->results.user_version); + fin->tid = tid; + sub_cop->objecter_tid = tid; + if (last_offset < iter->first) { + break; + } + } +} + void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r) { dout(10) << __func__ << " " << oid << " tid " << tid @@ -8449,28 +8591,133 @@ void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r) // cancel and requeue proxy ops on this object if (!r) { - for (map::iterator it = proxyread_ops.begin(); - it != proxyread_ops.end();) { - if (it->second->soid == cobc->obs.oi.soid) { - cancel_proxy_read((it++)->second); - } else { - ++it; - } + cancel_and_requeue_proxy_ops(cobc->obs.oi.soid); + } + + kick_object_context_blocked(cobc); +} + +void PrimaryLogPG::process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, int r, uint64_t offset) +{ + dout(10) << __func__ << " " << oid << " tid " << tid + << " " << cpp_strerror(r) << dendl; + map::iterator p = copy_ops.find(oid); + if (p == copy_ops.end()) { + dout(10) << __func__ << " no copy_op found" << dendl; + return; + } + CopyOpRef obj_cop = p->second; + CopyOpRef chunk_cop = obj_cop->chunk_cops[offset]; + + if (tid != chunk_cop->objecter_tid) { + dout(10) << __func__ << " tid " << tid << " != cop " << chunk_cop + << " tid " << chunk_cop->objecter_tid << dendl; + return; + } + + if (chunk_cop->omap_data.length() || chunk_cop->omap_header.length()) { + r = -EOPNOTSUPP; + } + + chunk_cop->objecter_tid = 0; + chunk_cop->objecter_tid2 = 0; // assume this ordered before us (if it happened) + ObjectContextRef& cobc = obj_cop->obc; + OSDOp &chunk_data = chunk_cop->chunk_ops[0]; + + if (r < 0) { + obj_cop->failed = true; + goto out; + } + + if (obj_cop->failed) { + return; + } + + if (!chunk_data.outdata.length()) { + r = -EIO; + obj_cop->failed = true; + goto out; + } + + obj_cop->num_chunk--; + + /* check all of the copyop are completed */ + if (obj_cop->num_chunk) { + dout(20) << __func__ << " num_chunk: " << obj_cop->num_chunk << dendl; + return; + } + + { + OpContextUPtr ctx = simple_opc_create(obj_cop->obc); + PGTransaction *t = ctx->op_t.get(); + ObjectState& obs = ctx->new_obs; + for (auto p : obj_cop->chunk_cops) { + OSDOp &sub_chunk = p.second->chunk_ops[0]; + t->write(cobc->obs.oi.soid, + p.second->cursor.data_offset, + sub_chunk.outdata.length(), + sub_chunk.outdata, + p.second->dest_obj_fadvise_flags); + obs.oi.manifest.chunk_map[p.second->cursor.data_offset].flags = 0; // clean + dout(20) << __func__ << " offset: " << p.second->cursor.data_offset + << " length: " << sub_chunk.outdata.length() << dendl; + sub_chunk.outdata.clear(); + write_update_size_and_usage(ctx->delta_stats, obs.oi, ctx->modified_ranges, + p.second->cursor.data_offset, sub_chunk.outdata.length()); } - for (map::iterator it = proxywrite_ops.begin(); - it != proxywrite_ops.end();) { - if (it->second->soid == cobc->obs.oi.soid) { - cancel_proxy_write((it++)->second); - } else { - ++it; + obs.oi.clear_data_digest(); + ctx->at_version = get_next_version(); + finish_ctx(ctx.get(), pg_log_entry_t::PROMOTE); + simple_opc_submit(std::move(ctx)); + + auto p = cobc->obs.oi.manifest.chunk_map.end(); + /* check remaining work */ + if (obj_cop->last_offset >= p->first + p->second.length) { + for (auto &en : cobc->obs.oi.manifest.chunk_map) { + if (obj_cop->last_offset < en.first) { + _copy_some_manifest(cobc, obj_cop, en.first); + return; + } } } - kick_proxy_ops_blocked(cobc->obs.oi.soid); + } + + out: + dout(20) << __func__ << " complete r = " << cpp_strerror(r) << dendl; + CopyCallbackResults results(r, &obj_cop->results); + obj_cop->cb->complete(results); + + copy_ops.erase(cobc->obs.oi.soid); + cobc->stop_block(); + + // cancel and requeue proxy ops on this object + if (!r) { + cancel_and_requeue_proxy_ops(cobc->obs.oi.soid); } kick_object_context_blocked(cobc); } +void PrimaryLogPG::cancel_and_requeue_proxy_ops(hobject_t oid) { + for (map::iterator it = proxyread_ops.begin(); + it != proxyread_ops.end();) { + if (it->second->soid == oid) { + cancel_proxy_read((it++)->second); + } else { + ++it; + } + } + for (map::iterator it = proxywrite_ops.begin(); + it != proxywrite_ops.end();) { + if (it->second->soid == oid) { + cancel_proxy_write((it++)->second); + } else { + ++it; + } + } + kick_proxy_ops_blocked(oid); +} + void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t) { dout(20) << __func__ << " " << cop @@ -8814,6 +9061,42 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results, agent_choose_mode(); } +void PrimaryLogPG::finish_promote_manifest(int r, CopyResults *results, + ObjectContextRef obc) +{ + const hobject_t& soid = obc->obs.oi.soid; + dout(10) << __func__ << " " << soid << " r=" << r + << " uv" << results->user_version << dendl; + + if (r == -ECANCELED) { + return; + } + + if (r < 0) { + derr << __func__ << " unexpected promote error " << cpp_strerror(r) << dendl; + // pass error to everyone blocked on this object + // FIXME: this is pretty sloppy, but at this point we got + // something unexpected and don't have many other options. + map>::iterator blocked_iter = + waiting_for_blocked_object.find(soid); + if (blocked_iter != waiting_for_blocked_object.end()) { + while (!blocked_iter->second.empty()) { + osd->reply_op_error(blocked_iter->second.front(), r); + blocked_iter->second.pop_front(); + } + waiting_for_blocked_object.erase(blocked_iter); + } + return; + } + + osd->promote_finish(results->object_size); + osd->logger->inc(l_osd_tier_promote); + + if (agent_state && + agent_state->is_idle()) + agent_choose_mode(); +} + void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue) { dout(10) << __func__ << " " << cop->obc->obs.oi.soid diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index e5badb887a6..bda615b2de5 100644 --- a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -117,6 +117,9 @@ public: {} }; + struct CopyOp; + typedef ceph::shared_ptr CopyOpRef; + struct CopyOp { CopyCallback *cb; ObjectContextRef obc; @@ -149,6 +152,13 @@ public: unsigned src_obj_fadvise_flags; unsigned dest_obj_fadvise_flags; + map chunk_cops; + int num_chunk; + bool failed; + uint64_t start_offset; + uint64_t last_offset; + vector chunk_ops; + CopyOp(CopyCallback *cb_, ObjectContextRef _obc, hobject_t s, object_locator_t l, version_t v, @@ -162,13 +172,14 @@ public: objecter_tid2(0), rval(-1), src_obj_fadvise_flags(src_obj_fadvise_flags), - dest_obj_fadvise_flags(dest_obj_fadvise_flags) + dest_obj_fadvise_flags(dest_obj_fadvise_flags), + num_chunk(0), + failed(false) { results.user_version = v; results.mirror_snapset = mirror_snapset; } }; - typedef ceph::shared_ptr CopyOpRef; /** * The CopyCallback class defines an interface for completions to the @@ -1385,8 +1396,14 @@ protected: uint64_t chunk_index, uint64_t req_offset, uint64_t req_length, uint64_t req_total_len); bool can_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc); - + void _copy_some_manifest(ObjectContextRef obc, CopyOpRef cop, uint64_t start_offset); + void process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, int r, uint64_t offset); + void finish_promote_manifest(int r, CopyResults *results, ObjectContextRef obc); + void cancel_and_requeue_proxy_ops(hobject_t oid); + friend struct C_ProxyChunkRead; + friend class PromoteManifestCallback; + friend class C_CopyChunk; public: PrimaryLogPG(OSDService *o, OSDMapRef curmap,