return cache_result_t::BLOCKED_PROMOTE;
}
}
+
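+ // If every chunk is dirty, opportunistically flush the whole object;
+ // the result is intentionally ignored and we still return NOOP.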
+ bool all_dirty = true;
+ for (auto& p : obc->obs.oi.manifest.chunk_map) {
+ if (p.second.flags != chunk_info_t::FLAG_DIRTY) {
+ all_dirty = false;
+ break;
+ }
+ }
+ if (all_dirty) {
+ start_flush(OpRequestRef(), obc, true, NULL, boost::none);
+ }
return cache_result_t::NOOP;
}
default:
return cache_result_t::NOOP;
}
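+// Completion context for a single chunk write issued by do_manifest_flush().
+// One is queued per dirty chunk; results are collected in the shared
+// FlushOp's io_results, keyed by the chunk's offset in the original object.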
+struct C_ManifestFlush : public Context {
+ PrimaryLogPGRef pg;
+ hobject_t oid;
+ epoch_t last_peering_reset;
+ ceph_tid_t tid;
+ utime_t start;
+ uint64_t offset;
+ uint64_t last_offset;
+ PrimaryLogPG::FlushOpRef manifest_fop;
+ C_ManifestFlush(PrimaryLogPG *p, hobject_t o, epoch_t lpr)
+ : pg(p), oid(o), last_peering_reset(lpr),
+ tid(0), start(ceph_clock_now()), offset(0), last_offset(0)
+ {}
+ void finish(int r) override {
+ if (r == -ECANCELED)
+ return;
+ pg->lock();
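+ // A previously failed chunk has already finished this flush; bail out.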
+ if (manifest_fop->rval < 0) {
+ pg->unlock();
+ return;
+ }
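+ // Record this chunk's result; the first failure fails the whole flush
+ // (an earlier failure would have set rval and returned above).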
+ manifest_fop->io_results[offset] = r;
+ for (auto &p : manifest_fop->io_results) {
+ if (p.second < 0) {
+ pg->finish_manifest_flush(oid, tid, r, manifest_fop->obc, last_offset, manifest_fop);
+ manifest_fop->rval = r;
+ pg->unlock();
+ return;
+ }
+ }
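+ // Every issued chunk write has replied; finish the flush unless a
+ // peering reset has intervened.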
+ if (manifest_fop->chunks == manifest_fop->io_results.size()) {
+ if (last_peering_reset == pg->get_last_peering_reset()) {
+ assert(manifest_fop->obc);
+ pg->finish_manifest_flush(oid, tid, r, manifest_fop->obc, last_offset, manifest_fop);
+ pg->osd->logger->tinc(l_osd_tier_flush_lat, ceph_clock_now() - start);
+ }
+ }
+ pg->unlock();
+ }
+};
+
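+// Begin flushing a chunked (manifest) object: issue the first round of
+// chunk writes and register the FlushOp. Returns -EINPROGRESS on success.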
+int PrimaryLogPG::start_manifest_flush(OpRequestRef op, ObjectContextRef obc, bool blocking,
+ boost::optional<std::function<void()>> &&on_flush)
+{
+ auto p = obc->obs.oi.manifest.chunk_map.begin();
+ FlushOpRef manifest_fop(std::make_shared<FlushOp>());
+ manifest_fop->op = op;
+ manifest_fop->obc = obc;
+ manifest_fop->flushed_version = obc->obs.oi.user_version;
+ manifest_fop->blocking = blocking;
+ manifest_fop->on_flush = std::move(on_flush);
+ int r = do_manifest_flush(op, obc, manifest_fop, p->first, blocking);
+ if (r < 0) {
+ // Clear the callback before manifest_fop is destroyed: ~FlushOp
+ // asserts that on_flush has been consumed.
+ manifest_fop->on_flush.reset();
+ return r;
+ }
+
+ flush_ops[obc->obs.oi.soid] = manifest_fop;
+ return -EINPROGRESS;
+}
+
+int PrimaryLogPG::do_manifest_flush(OpRequestRef op, ObjectContextRef obc, FlushOpRef manifest_fop,
+ uint64_t start_offset, bool block)
+{
+ struct object_manifest_t &manifest = obc->obs.oi.manifest;
+ hobject_t soid = obc->obs.oi.soid;
+ ceph_tid_t tid;
+ SnapContext snapc;
+ uint64_t max_copy_size = 0, last_offset = 0;
+
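+ // First pass: accumulate dirty-chunk bytes from start_offset until
+ // get_copy_chunk_size() is exceeded; last_offset is the last chunk
+ // offset this round will flush.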
+ map<uint64_t, chunk_info_t>::iterator iter = manifest.chunk_map.find(start_offset);
+ assert(iter != manifest.chunk_map.end());
+ for (;iter != manifest.chunk_map.end(); ++iter) {
+ if (iter->second.flags == chunk_info_t::FLAG_DIRTY) {
+ last_offset = iter->first;
+ max_copy_size += iter->second.length;
+ }
+ if (get_copy_chunk_size() < max_copy_size) {
+ break;
+ }
+ }
+
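+ // Second pass: for each dirty chunk up to last_offset, read the data
+ // locally and write it out to the chunk's target object.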
+ iter = manifest.chunk_map.find(start_offset);
+ for (;iter != manifest.chunk_map.end(); ++iter) {
+ if (iter->second.flags != chunk_info_t::FLAG_DIRTY) {
+ continue;
+ }
+ if (last_offset < iter->first) {
+ // Stop before issuing chunks past this round's budget; they would
+ // otherwise be written again when finish_manifest_flush() resumes
+ // from the first dirty chunk beyond last_offset.
+ break;
+ }
+ uint64_t tgt_length = iter->second.length;
+ uint64_t tgt_offset = iter->second.offset;
+ hobject_t tgt_soid = iter->second.oid;
+ object_locator_t oloc(tgt_soid);
+ ObjectOperation obj_op;
+ bufferlist chunk_data;
+ int r = pgbackend->objects_read_sync(
+ soid, iter->first, tgt_length, 0, &chunk_data);
+ if (r < 0) {
+ dout(0) << __func__ << " read failed, src offset: " << iter->first
+ << " len: " << tgt_length << " r: " << r << dendl;
+ return r;
+ }
+ if (!chunk_data.length()) {
+ return -ENODATA;
+ }
+ tgt_length = chunk_data.length();
+ obj_op.add_data(CEPH_OSD_OP_WRITE, tgt_offset, tgt_length, chunk_data);
+
+ unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY |
+ CEPH_OSD_FLAG_RWORDERED;
+ C_ManifestFlush *fin = new C_ManifestFlush(this, soid, get_last_peering_reset());
+ fin->offset = iter->first;
+ fin->last_offset = last_offset;
+ fin->manifest_fop = manifest_fop;
+ manifest_fop->chunks++;
+
+ unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
+ tid = osd->objecter->mutate(
+ tgt_soid.oid, oloc, obj_op, snapc,
+ ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
+ flags, new C_OnFinisher(fin, osd->objecter_finishers[n]));
+ fin->tid = tid;
+ manifest_fop->io_tids[iter->first] = tid;
+
+ dout(20) << __func__ << " offset: " << tgt_offset << " len: " << tgt_length
+ << " oid: " << tgt_soid.oid << " ori oid: " << soid.oid.name
+ << " tid: " << tid << dendl;
+ }
+
+ return 0;
+}
+
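+// Completion for one round of chunk writes: if dirty chunks remain past
+// last_offset, start the next round; otherwise finish the flush.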
+void PrimaryLogPG::finish_manifest_flush(hobject_t oid, ceph_tid_t tid, int r, ObjectContextRef obc,
+ uint64_t last_offset, FlushOpRef manifest_fop)
+{
+ dout(10) << __func__ << " " << oid << " tid " << tid
+ << " " << cpp_strerror(r) << " last_offset: " << last_offset << dendl;
+ map<uint64_t, chunk_info_t>::iterator iter = obc->obs.oi.manifest.chunk_map.find(last_offset);
+ assert(iter != obc->obs.oi.manifest.chunk_map.end());
+ for (;iter != obc->obs.oi.manifest.chunk_map.end(); ++iter) {
+ if (iter->second.flags == chunk_info_t::FLAG_DIRTY && last_offset < iter->first) {
+ do_manifest_flush(manifest_fop->op, obc, manifest_fop, iter->first, manifest_fop->blocking);
+ return;
+ }
+ }
+ finish_flush(oid, tid, r);
+}
+
void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
MOSDOpReply *orig_reply, int r)
{
sub_chunk.outdata.length(),
sub_chunk.outdata,
p.second->dest_obj_fadvise_flags);
- obs.oi.manifest.chunk_map[p.second->cursor.data_offset].flags = 0; // clean
dout(20) << __func__ << " offset: " << p.second->cursor.data_offset
<< " length: " << sub_chunk.outdata.length() << dendl;
- sub_chunk.outdata.clear();
write_update_size_and_usage(ctx->delta_stats, obs.oi, ctx->modified_ranges,
p.second->cursor.data_offset, sub_chunk.outdata.length());
+ obs.oi.manifest.chunk_map[p.second->cursor.data_offset].flags = 0; // clean
+ // Clear the buffer only after its length has been accounted above.
+ sub_chunk.outdata.clear();
}
obs.oi.clear_data_digest();
ctx->at_version = get_next_version();
cancel_flush(fop, false);
}
+ if (obc->obs.oi.has_manifest() && obc->obs.oi.manifest.is_chunked()) {
+ int r = start_manifest_flush(op, obc, blocking, std::move(on_flush));
+ if (r != -EINPROGRESS) {
+ if (blocking)
+ obc->stop_block();
+ }
+ return r;
+ }
+
/**
* In general, we need to send a delete and a copyfrom.
* Consider snapc 10:[10, 9, 8, 4, 3, 2]:[10(10, 9), 4(4,3,2)]
return;
}
FlushOpRef fop = p->second;
- if (tid != fop->objecter_tid) {
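+ // Manifest flushes track per-chunk tids in io_tids, so the single
+ // objecter_tid check does not apply to them.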
+ if (tid != fop->objecter_tid && !fop->obc->obs.oi.has_manifest()) {
dout(10) << __func__ << " tid " << tid << " != fop " << fop
<< " tid " << fop->objecter_tid << dendl;
return;
}
// successfully flushed, can we evict this object?
- if (!fop->op && agent_state->evict_mode != TierAgentState::EVICT_MODE_IDLE &&
+ if (!obc->obs.oi.has_manifest() && !fop->op && agent_state->evict_mode != TierAgentState::EVICT_MODE_IDLE &&
agent_maybe_evict(obc, true)) {
osd->logger->inc(l_osd_tier_clean);
if (fop->on_flush) {
ctx->new_obs = obc->obs;
ctx->new_obs.oi.clear_flag(object_info_t::FLAG_DIRTY);
--ctx->delta_stats.num_objects_dirty;
+ if (fop->obc->obs.oi.has_manifest()) {
+ assert(obc->obs.oi.manifest.is_chunked());
+ PGTransaction* t = ctx->op_t.get();
+ uint64_t chunks_size = 0;
+ for (auto &p : ctx->new_obs.oi.manifest.chunk_map) {
+ chunks_size += p.second.length;
+ }
+ if (ctx->new_obs.oi.is_omap() && pool.info.supports_omap()) {
+ t->omap_clear(oid);
+ ctx->new_obs.oi.clear_omap_digest();
+ ctx->new_obs.oi.clear_flag(object_info_t::FLAG_OMAP);
+ }
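+ // If the chunks cover the whole object, drop the local copy: truncate
+ // to zero and mark every chunk MISSING. Otherwise just mark the
+ // flushed chunks clean.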
+ if (obc->obs.oi.size == chunks_size) {
+ t->truncate(oid, 0);
+ ctx->new_obs.oi.size = 0;
+ ctx->new_obs.oi.new_object();
+ for (auto &p : ctx->new_obs.oi.manifest.chunk_map) {
+ p.second.flags = chunk_info_t::FLAG_MISSING;
+ ctx->delta_stats.num_bytes -= p.second.length;
+ }
+ } else {
+ for (auto &p : ctx->new_obs.oi.manifest.chunk_map) {
+ if (p.second.flags == chunk_info_t::FLAG_DIRTY) {
+ dout(20) << __func__ << " offset: " << p.second.offset
+ << " length: " << p.second.length << dendl;
+ p.second.flags = 0; // CLEAN
+ }
+ }
+ }
+ }
finish_ctx(ctx.get(), pg_log_entry_t::CLEAN);
osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED);
fop->objecter_tid = 0;
}
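+ // Also cancel any in-flight per-chunk writes from a manifest flush.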
+ if (fop->io_tids.size()) {
+ for (auto &p : fop->io_tids) {
+ osd->objecter->op_cancel(p.second, -ECANCELED);
+ p.second = 0;
+ }
+ }
if (fop->blocking) {
fop->obc->stop_block();
kick_object_context_blocked(fop->obc);
bool blocking; ///< whether we are blocking updates
bool removal; ///< we are removing the backend object
boost::optional<std::function<void()>> on_flush; ///< callback, may be null
+ // for chunked (manifest) objects
+ map<uint64_t, int> io_results; ///< per-chunk flush results, keyed by chunk offset
+ map<uint64_t, ceph_tid_t> io_tids; ///< in-flight chunk-write tids, keyed by chunk offset
+ uint64_t chunks; ///< number of chunk writes issued
FlushOp()
: flushed_version(0), objecter_tid(0), rval(0),
- blocking(false), removal(false) {}
+ blocking(false), removal(false), chunks(0) {}
~FlushOp() { assert(!on_flush); }
};
typedef ceph::shared_ptr<FlushOp> FlushOpRef;
void process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, int r, uint64_t offset);
void finish_promote_manifest(int r, CopyResults *results, ObjectContextRef obc);
void cancel_and_requeue_proxy_ops(hobject_t oid);
+ int do_manifest_flush(OpRequestRef op, ObjectContextRef obc, FlushOpRef manifest_fop,
+ uint64_t start_offset, bool block);
+ int start_manifest_flush(OpRequestRef op, ObjectContextRef obc, bool blocking,
+ boost::optional<std::function<void()>> &&on_flush);
+ void finish_manifest_flush(hobject_t oid, ceph_tid_t tid, int r, ObjectContextRef obc,
+ uint64_t last_offset, FlushOpRef manifest_fop);
friend struct C_ProxyChunkRead;
friend class PromoteManifestCallback;
friend class C_CopyChunk;
+ friend struct C_ManifestFlush;
public:
PrimaryLogPG(OSDService *o, OSDMapRef curmap,