From 2e3af00ab1b8803ab144f4e45442ed1b3e4489eb Mon Sep 17 00:00:00 2001 From: myoungwon oh Date: Tue, 20 Jun 2017 20:38:18 +0900 Subject: [PATCH] osd: add flush() for the chunked object. If all chunks are dirty, the chunked object will be flushed Signed-off-by: Myoungwon Oh --- src/osd/PrimaryLogPG.cc | 209 +++++++++++++++++++++++++++++++++++++++- src/osd/PrimaryLogPG.h | 13 ++- 2 files changed, 218 insertions(+), 4 deletions(-) diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 0f57703eadd7a..682ec073491cd 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -2412,6 +2412,16 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail( return cache_result_t::BLOCKED_PROMOTE; } } + + bool all_dirty = true; + for (auto& p : obc->obs.oi.manifest.chunk_map) { + if (p.second.flags != chunk_info_t::FLAG_DIRTY) { + all_dirty = false; + } + } + if (all_dirty) { + start_flush(OpRequestRef(), obc, true, NULL, boost::none); + } return cache_result_t::NOOP; } default: @@ -2421,6 +2431,154 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail( return cache_result_t::NOOP; } +struct C_ManifestFlush : public Context { + PrimaryLogPGRef pg; + hobject_t oid; + epoch_t last_peering_reset; + ceph_tid_t tid; + utime_t start; + uint64_t offset; + uint64_t last_offset; + PrimaryLogPG::FlushOpRef manifest_fop; + C_ManifestFlush(PrimaryLogPG *p, hobject_t o, epoch_t lpr) + : pg(p), oid(o), last_peering_reset(lpr), + tid(0), start(ceph_clock_now()) + {} + void finish(int r) override { + if (r == -ECANCELED) + return; + pg->lock(); + if (manifest_fop->rval < 0) { + pg->unlock(); + return; + } + manifest_fop->io_results[offset] = r; + for (auto &p : manifest_fop->io_results) { + if (p.second < 0) { + pg->finish_manifest_flush(oid, tid, r, manifest_fop->obc, last_offset, manifest_fop); + manifest_fop->rval = r; + pg->unlock(); + return; + } + } + if (manifest_fop->chunks == manifest_fop->io_results.size()) { + if 
(last_peering_reset == pg->get_last_peering_reset()) { + assert(manifest_fop->obc); + pg->finish_manifest_flush(oid, tid, r, manifest_fop->obc, last_offset, manifest_fop); + pg->osd->logger->tinc(l_osd_tier_flush_lat, ceph_clock_now() - start); + } + } + pg->unlock(); + } +}; + +int PrimaryLogPG::start_manifest_flush(OpRequestRef op, ObjectContextRef obc, bool blocking, + boost::optional> &&on_flush) +{ + auto p = obc->obs.oi.manifest.chunk_map.begin(); + FlushOpRef manifest_fop(std::make_shared()); + manifest_fop->op = op; + manifest_fop->obc = obc; + manifest_fop->flushed_version = obc->obs.oi.user_version; + manifest_fop->blocking = blocking; + manifest_fop->on_flush = std::move(on_flush); + int r = do_manifest_flush(op, obc, manifest_fop, p->first, blocking); + if (r < 0) { + return r; + } + + flush_ops[obc->obs.oi.soid] = manifest_fop; + return -EINPROGRESS; +} + +int PrimaryLogPG::do_manifest_flush(OpRequestRef op, ObjectContextRef obc, FlushOpRef manifest_fop, + uint64_t start_offset, bool block) +{ + struct object_manifest_t &manifest = obc->obs.oi.manifest; + hobject_t soid = obc->obs.oi.soid; + ceph_tid_t tid; + SnapContext snapc; + uint64_t max_copy_size = 0, last_offset = 0; + + map::iterator iter = manifest.chunk_map.find(start_offset); + assert(iter != manifest.chunk_map.end()); + for (;iter != manifest.chunk_map.end(); ++iter) { + if (iter->second.flags == chunk_info_t::FLAG_DIRTY) { + last_offset = iter->first; + max_copy_size += iter->second.length; + } + if (get_copy_chunk_size() < max_copy_size) { + break; + } + } + + iter = manifest.chunk_map.find(start_offset); + for (;iter != manifest.chunk_map.end(); ++iter) { + if (iter->second.flags != chunk_info_t::FLAG_DIRTY) { + continue; + } + uint64_t tgt_length = iter->second.length; + uint64_t tgt_offset= iter->second.offset; + hobject_t tgt_soid = iter->second.oid; + object_locator_t oloc(tgt_soid); + ObjectOperation obj_op; + bufferlist chunk_data; + int r = pgbackend->objects_read_sync( + soid, 
iter->first, tgt_length, 0, &chunk_data); + if (r < 0) { + dout(0) << __func__ << " read fail " << " offset: " << tgt_offset + << " len: " << tgt_length << " r: " << r << dendl; + return r; + } + if (!chunk_data.length()) { + return -ENODATA; + } + tgt_length = chunk_data.length(); + obj_op.add_data(CEPH_OSD_OP_WRITE, tgt_offset, tgt_length, chunk_data); + + unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY | + CEPH_OSD_FLAG_RWORDERED ; + C_ManifestFlush *fin = new C_ManifestFlush(this, soid, get_last_peering_reset()); + fin->offset = iter->first; + fin->last_offset = last_offset; + fin->manifest_fop = manifest_fop; + manifest_fop->chunks++; + + unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers); + tid = osd->objecter->mutate( + tgt_soid.oid, oloc, obj_op, snapc, + ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime), + flags, new C_OnFinisher(fin, osd->objecter_finishers[n])); + fin->tid = tid; + manifest_fop->io_tids[iter->first] = tid; + + dout(20) << __func__ << " offset: " << tgt_offset << " len: " << tgt_length + << " oid: " << tgt_soid.oid << " ori oid: " << soid.oid.name + << " tid: " << tid << dendl; + if (last_offset < iter->first) { + break; + } + } + + return 0; +} + +void PrimaryLogPG::finish_manifest_flush(hobject_t oid, ceph_tid_t tid, int r, ObjectContextRef obc, + uint64_t last_offset, FlushOpRef manifest_fop) +{ + dout(10) << __func__ << " " << oid << " tid " << tid + << " " << cpp_strerror(r) << " last_offset: " << last_offset << dendl; + map::iterator iter = obc->obs.oi.manifest.chunk_map.find(last_offset); + assert(iter != obc->obs.oi.manifest.chunk_map.end()); + for (;iter != obc->obs.oi.manifest.chunk_map.end(); ++iter) { + if (iter->second.flags == chunk_info_t::FLAG_DIRTY && last_offset < iter->first) { + do_manifest_flush(manifest_fop->op, obc, manifest_fop, iter->first, manifest_fop->blocking); + return; + } + } + finish_flush(oid, tid, r); +} + void PrimaryLogPG::record_write_error(OpRequestRef 
op, const hobject_t &soid, MOSDOpReply *orig_reply, int r) { @@ -8745,12 +8903,12 @@ void PrimaryLogPG::process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, in sub_chunk.outdata.length(), sub_chunk.outdata, p.second->dest_obj_fadvise_flags); - obs.oi.manifest.chunk_map[p.second->cursor.data_offset].flags = 0; // clean dout(20) << __func__ << " offset: " << p.second->cursor.data_offset << " length: " << sub_chunk.outdata.length() << dendl; sub_chunk.outdata.clear(); write_update_size_and_usage(ctx->delta_stats, obs.oi, ctx->modified_ranges, p.second->cursor.data_offset, sub_chunk.outdata.length()); + obs.oi.manifest.chunk_map[p.second->cursor.data_offset].flags = 0; // clean } obs.oi.clear_data_digest(); ctx->at_version = get_next_version(); @@ -9359,6 +9517,15 @@ int PrimaryLogPG::start_flush( cancel_flush(fop, false); } + if (obc->obs.oi.has_manifest() && obc->obs.oi.manifest.is_chunked()) { + int r = start_manifest_flush(op, obc, blocking, std::move(on_flush)); + if (r != -EINPROGRESS) { + if (blocking) + obc->stop_block(); + } + return r; + } + /** * In general, we need to send a delete and a copyfrom. * Consider snapc 10:[10, 9, 8, 4, 3, 2]:[10(10, 9), 4(4,3,2)] @@ -9475,7 +9642,7 @@ void PrimaryLogPG::finish_flush(hobject_t oid, ceph_tid_t tid, int r) return; } FlushOpRef fop = p->second; - if (tid != fop->objecter_tid) { + if (tid != fop->objecter_tid && !fop->obc->obs.oi.has_manifest()) { dout(10) << __func__ << " tid " << tid << " != fop " << fop << " tid " << fop->objecter_tid << dendl; return; @@ -9559,7 +9726,7 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop) } // successfully flushed, can we evict this object? 
- if (!fop->op && agent_state->evict_mode != TierAgentState::EVICT_MODE_IDLE && + if (!obc->obs.oi.has_manifest() && !fop->op && agent_state->evict_mode != TierAgentState::EVICT_MODE_IDLE && agent_maybe_evict(obc, true)) { osd->logger->inc(l_osd_tier_clean); if (fop->on_flush) { @@ -9606,6 +9773,36 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop) ctx->new_obs = obc->obs; ctx->new_obs.oi.clear_flag(object_info_t::FLAG_DIRTY); --ctx->delta_stats.num_objects_dirty; + if (fop->obc->obs.oi.has_manifest()) { + assert(obc->obs.oi.manifest.is_chunked()); + PGTransaction* t = ctx->op_t.get(); + uint64_t chunks_size = 0; + for (auto &p : ctx->new_obs.oi.manifest.chunk_map) { + chunks_size += p.second.length; + } + if (ctx->new_obs.oi.is_omap() && pool.info.supports_omap()) { + t->omap_clear(oid); + ctx->new_obs.oi.clear_omap_digest(); + ctx->new_obs.oi.clear_flag(object_info_t::FLAG_OMAP); + } + if (obc->obs.oi.size == chunks_size) { + t->truncate(oid, 0); + ctx->new_obs.oi.size = 0; + ctx->new_obs.oi.new_object(); + for (auto &p : ctx->new_obs.oi.manifest.chunk_map) { + p.second.flags = chunk_info_t::FLAG_MISSING; + ctx->delta_stats.num_bytes -= p.second.length; + } + } else { + for (auto &p : ctx->new_obs.oi.manifest.chunk_map) { + if (p.second.flags == chunk_info_t::FLAG_DIRTY) { + dout(20) << __func__ << " offset: " << p.second.offset + << " length: " << p.second.length << dendl; + p.second.flags = 0; // CLEAN + } + } + } + } finish_ctx(ctx.get(), pg_log_entry_t::CLEAN); @@ -9640,6 +9837,12 @@ void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue) osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED); fop->objecter_tid = 0; } + if (fop->io_tids.size()) { + for (auto &p : fop->io_tids) { + osd->objecter->op_cancel(p.second, -ECANCELED); + p.second = 0; + } + } if (fop->blocking) { fop->obc->stop_block(); kick_object_context_blocked(fop->obc); diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index 36cc06fc96803..838053bd2f804 100644 --- 
a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -243,10 +243,14 @@ public: bool blocking; ///< whether we are blocking updates bool removal; ///< we are removing the backend object boost::optional> on_flush; ///< callback, may be null + // for chunked object + map io_results; + map io_tids; + uint64_t chunks; FlushOp() : flushed_version(0), objecter_tid(0), rval(0), - blocking(false), removal(false) {} + blocking(false), removal(false), chunks(0) {} ~FlushOp() { assert(!on_flush); } }; typedef ceph::shared_ptr FlushOpRef; @@ -1413,10 +1417,17 @@ protected: void process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, int r, uint64_t offset); void finish_promote_manifest(int r, CopyResults *results, ObjectContextRef obc); void cancel_and_requeue_proxy_ops(hobject_t oid); + int do_manifest_flush(OpRequestRef op, ObjectContextRef obc, FlushOpRef manifest_fop, + uint64_t start_offset, bool block); + int start_manifest_flush(OpRequestRef op, ObjectContextRef obc, bool blocking, + boost::optional> &&on_flush); + void finish_manifest_flush(hobject_t oid, ceph_tid_t tid, int r, ObjectContextRef obc, + uint64_t last_offset, FlushOpRef manifest_fop); friend struct C_ProxyChunkRead; friend class PromoteManifestCallback; friend class C_CopyChunk; + friend struct C_ManifestFlush; public: PrimaryLogPG(OSDService *o, OSDMapRef curmap, -- 2.39.5