for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p) {
OSDOp& osd_op = *p;
ceph_osd_op& op = osd_op.op;
- if (op.op == CEPH_OSD_OP_SET_REDIRECT) {
+ if (op.op == CEPH_OSD_OP_SET_REDIRECT ||
+ op.op == CEPH_OSD_OP_SET_CHUNK) {
return cache_result_t::NOOP;
}
}
do_proxy_chunked_op(op, obc->obs.oi.soid, obc, write_ordered);
return cache_result_t::HANDLED_PROXY;
}
- if (obc->obs.oi.size == 0) {
- const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
- const object_locator_t oloc = m->get_object_locator();
- promote_object(obc, obc->obs.oi.soid, oloc, op, NULL);
- return cache_result_t::BLOCKED_PROMOTE;
+
+ for (auto& p : obc->obs.oi.manifest.chunk_map) {
+ if (p.second.flags == chunk_info_t::FLAG_MISSING) {
+ const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
+ const object_locator_t oloc = m->get_object_locator();
+ promote_object(obc, obc->obs.oi.soid, oloc, op, NULL);
+ return cache_result_t::BLOCKED_PROMOTE;
+ }
}
return cache_result_t::NOOP;
default:
}
};
+/**
+ * Copy-completion callback used when promoting a chunked-manifest object.
+ *
+ * On completion it forwards the copy outcome to
+ * PrimaryLogPG::finish_promote_manifest() and then records the time elapsed
+ * since this callback was constructed under l_osd_tier_promote_lat.
+ */
+class PromoteManifestCallback: public PrimaryLogPG::CopyCallback {
+  ObjectContextRef obc;  ///< object context passed on to finish_promote_manifest()
+  PrimaryLogPG *pg;      ///< PG whose finish_promote_manifest() is invoked
+  utime_t start;         ///< clock value sampled at construction
+public:
+  PromoteManifestCallback(ObjectContextRef obc_, PrimaryLogPG *pg_)
+    : obc(obc_), pg(pg_), start(ceph_clock_now())
+  {}
+
+  /// Element 0 of @p results is the return code, element 1 the CopyResults.
+  void finish(PrimaryLogPG::CopyCallbackResults results) override {
+    const int rc = results.get<0>();
+    PrimaryLogPG::CopyResults *copy_results = results.get<1>();
+
+    pg->finish_promote_manifest(rc, copy_results, obc);
+    // latency is sampled only after the PG has processed the result
+    pg->osd->logger->tinc(l_osd_tier_promote_lat, ceph_clock_now() - start);
+  }
+};
+
void PrimaryLogPG::promote_object(ObjectContextRef obc,
const hobject_t& missing_oid,
const object_locator_t& oloc,
src_fadvise_flags |= LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
}
- PromoteCallback *cb = new PromoteCallback(obc, this);
+ CopyCallback *cb;
object_locator_t my_oloc = oloc;
my_oloc.pool = pool.info.tier_of;
- if (obc->obs.oi.has_manifest()) {
+ if (!obc->obs.oi.has_manifest()) {
+ cb = new PromoteCallback(obc, this);
+ } else {
if (obc->obs.oi.manifest.is_chunked()) {
- object_locator_t chunk_oloc(obc->obs.oi.manifest.chunk_map[0].oid);
- my_oloc = chunk_oloc;
+ cb = new PromoteManifestCallback(obc, this);
+ } else {
+ assert(0);
}
}
pg_t raw_pg;
chunk_info_t chunk_info;
+ get_osdmap()->object_locator_to_pg(tgt_name, tgt_oloc, raw_pg);
hobject_t target(tgt_name, tgt_oloc.key, snapid_t(),
raw_pg.ps(), raw_pg.pool(),
tgt_oloc.nspace);
- get_osdmap()->object_locator_to_pg(tgt_name, tgt_oloc, raw_pg);
chunk_info.flags = chunk_info_t::FLAG_MISSING;
chunk_info.oid = target;
chunk_info.offset = tgt_offset;
}
};
+/**
+ * Completion context for one chunk read of a manifest copy.
+ *
+ * When the read finishes it takes the PG lock and, provided the PG has not
+ * gone through a peering reset since the read was issued, forwards the
+ * result to PrimaryLogPG::process_copy_chunk_manifest().  A result of
+ * -ECANCELED is dropped without touching the PG.
+ *
+ * `tid` and `offset` are not constructor arguments; the issuer assigns them
+ * after construction.  Both start at 0 so that neither is ever read
+ * uninitialised.
+ */
+struct C_CopyChunk : public Context {
+  PrimaryLogPGRef pg;
+  hobject_t oid;                 ///< object the copy op is registered under
+  epoch_t last_peering_reset;    ///< PG peering-reset value at issue time
+  ceph_tid_t tid;                ///< objecter tid of the read (set by issuer)
+  PrimaryLogPG::CopyOpRef cop;
+  uint64_t offset;               ///< chunk offset handed back to the PG (set by issuer)
+  C_CopyChunk(PrimaryLogPG *p, hobject_t o, epoch_t lpr,
+              const PrimaryLogPG::CopyOpRef& c)
+    : pg(p), oid(o), last_peering_reset(lpr),
+      tid(0), cop(c), offset(0)
+  {}
+  void finish(int r) override {
+    if (r == -ECANCELED)
+      return;
+    pg->lock();
+    // ignore completions that belong to an earlier peering interval
+    if (last_peering_reset == pg->get_last_peering_reset()) {
+      pg->process_copy_chunk_manifest(oid, tid, r, offset);
+    }
+    pg->unlock();
+  }
+};
+
int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::iterator& bp,
OSDOp& osd_op, ObjectContextRef &obc)
{
copy_ops[dest] = cop;
obc->start_block();
- _copy_some(obc, cop);
+ if (!obc->obs.oi.has_manifest()) {
+ _copy_some(obc, cop);
+ } else {
+ auto p = obc->obs.oi.manifest.chunk_map.begin();
+ _copy_some_manifest(obc, cop, p->first);
+ }
}
void PrimaryLogPG::_copy_some(ObjectContextRef obc, CopyOpRef cop)
gather.activate();
}
+/**
+ * Issue the next batch of chunk reads for a copy of a manifest object.
+ *
+ * Starting at the chunk keyed by @p start_offset, chunks are counted until
+ * their combined length exceeds get_copy_chunk_size() or the chunk map ends.
+ * For every counted chunk a sub CopyOp is created, stored in
+ * cop->chunk_cops under the chunk's offset, and an objecter read is sent
+ * whose completion is delivered through C_CopyChunk.
+ *
+ * @param obc           object context holding the manifest (obs.oi.manifest)
+ * @param cop           parent copy op; num_chunk, start_offset, last_offset
+ *                      and chunk_cops are updated here
+ * @param start_offset  chunk_map key of the first chunk of this batch
+ */
+void PrimaryLogPG::_copy_some_manifest(ObjectContextRef obc, CopyOpRef cop, uint64_t start_offset)
+{
+  dout(10) << __func__ << " " << obc << " " << cop << dendl;
+
+  // translate copy-from flags into the matching objecter op flags
+  unsigned flags = 0;
+  if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_FLUSH)
+    flags |= CEPH_OSD_FLAG_FLUSH;
+  if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE)
+    flags |= CEPH_OSD_FLAG_IGNORE_CACHE;
+  if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY)
+    flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
+  if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE)
+    flags |= CEPH_OSD_FLAG_MAP_SNAP_CLONE;
+  if (cop->flags & CEPH_OSD_COPY_FROM_FLAG_RWORDERED)
+    flags |= CEPH_OSD_FLAG_RWORDERED;
+
+  // First pass: decide how many chunks belong to this batch.
+  int num_chunks = 0;
+  uint64_t last_offset = 0, chunks_size = 0;
+  object_manifest_t *manifest = &obc->obs.oi.manifest;
+  map<uint64_t, chunk_info_t>::iterator iter = manifest->chunk_map.find(start_offset);
+  for (;iter != manifest->chunk_map.end(); ++iter) {
+    num_chunks++;
+    chunks_size += iter->second.length;
+    last_offset = iter->first;
+    if (get_copy_chunk_size() < chunks_size) {
+      break;
+    }
+  }
+
+  cop->num_chunk = num_chunks;
+  cop->start_offset = start_offset;
+  cop->last_offset = last_offset;
+  dout(20) << __func__ << " oid " << obc->obs.oi.soid << " num_chunks: " << num_chunks
+           << " start_offset: " << start_offset << " chunks_size: " << chunks_size
+           << " last_offset: " << last_offset << dendl;
+
+  // Second pass: send exactly one read per chunk counted above.
+  for (iter = manifest->chunk_map.find(start_offset);
+       iter != manifest->chunk_map.end(); ++iter) {
+    const uint64_t obj_offset = iter->first;
+    // use the iterator's value directly instead of re-looking-up the key
+    const chunk_info_t& chunk = iter->second;
+    const uint64_t length = chunk.length;
+    const hobject_t& soid = chunk.oid;
+    object_locator_t oloc(soid);
+    CopyOpRef sub_cop(std::make_shared<CopyOp>(cop->cb, cop->obc, cop->src, oloc,
+                      cop->results.user_version, cop->flags, cop->mirror_snapset,
+                      cop->src_obj_fadvise_flags, cop->dest_obj_fadvise_flags));
+    sub_cop->cursor.data_offset = obj_offset;
+    cop->chunk_cops[obj_offset] = sub_cop;
+
+    int s = sub_cop->chunk_ops.size();
+    sub_cop->chunk_ops.resize(s+1);
+    sub_cop->chunk_ops[s].op.op = CEPH_OSD_OP_READ;
+    sub_cop->chunk_ops[s].op.extent.offset = chunk.offset;
+    sub_cop->chunk_ops[s].op.extent.length = length;
+
+    ObjectOperation op;
+    op.dup(sub_cop->chunk_ops);
+
+    dout(20) << __func__ << " tgt_oid: " << soid.oid << " tgt_offset: "
+             << chunk.offset
+             << " length: " << length << " pool id: " << oloc.pool << dendl;
+
+    if (cop->results.user_version) {
+      op.assert_version(cop->results.user_version);
+    } else {
+      // we should learn the version after the first chunk, if we didn't know
+      // it already!
+      assert(cop->cursor.is_initial());
+    }
+    op.set_last_op_flags(cop->src_obj_fadvise_flags);
+
+    C_CopyChunk *fin = new C_CopyChunk(this, obc->obs.oi.soid,
+                                       get_last_peering_reset(), cop);
+    fin->offset = obj_offset;
+    unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
+
+    ceph_tid_t tid = osd->objecter->read(soid.oid, oloc, op,
+                                         sub_cop->src.snap, NULL,
+                                         flags,
+                                         new C_OnFinisher(fin, osd->objecter_finishers[n]),
+                                         // discover the object version if we don't know it yet
+                                         sub_cop->results.user_version ? NULL : &sub_cop->results.user_version);
+    fin->tid = tid;
+    sub_cop->objecter_tid = tid;
+    // Stop once the last counted chunk has been issued.  Testing with '<'
+    // here would only break after a read for the following chunk had
+    // already been sent, i.e. one more read than cop->num_chunk accounts for.
+    if (last_offset <= obj_offset) {
+      break;
+    }
+  }
+}
+
void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
{
dout(10) << __func__ << " " << oid << " tid " << tid
// cancel and requeue proxy ops on this object
if (!r) {
- for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
- it != proxyread_ops.end();) {
- if (it->second->soid == cobc->obs.oi.soid) {
- cancel_proxy_read((it++)->second);
- } else {
- ++it;
- }
+ cancel_and_requeue_proxy_ops(cobc->obs.oi.soid);
+ }
+
+ kick_object_context_blocked(cobc);
+}
+
+/**
+ * Handle completion of one chunk read issued by _copy_some_manifest().
+ *
+ * Returns without completing the copy when no copy op is registered for
+ * @p oid, when @p tid does not match the chunk's outstanding read, when the
+ * copy has already been marked failed, when other chunk reads of the batch
+ * are still outstanding, or when a follow-up batch has been issued.
+ *
+ * Once the last read of the batch arrives, the data of every entry in
+ * chunk_cops is written to the object in one PROMOTE transaction and the
+ * corresponding chunk_map flags are reset to 0.
+ *
+ * On error (r < 0, omap data present, or an empty read) the copy is marked
+ * failed.  On both error and final success the copy callback is completed,
+ * the op is removed from copy_ops and the object is unblocked.
+ *
+ * @param oid     object the copy op is registered under in copy_ops
+ * @param tid     objecter tid of the completed read
+ * @param r       result of the read
+ * @param offset  key of the chunk's sub op in chunk_cops
+ */
+void PrimaryLogPG::process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, int r, uint64_t offset)
+{
+  dout(10) << __func__ << " " << oid << " tid " << tid
+           << " " << cpp_strerror(r) << dendl;
+  map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid);
+  if (p == copy_ops.end()) {
+    dout(10) << __func__ << " no copy_op found" << dendl;
+    return;
+  }
+  CopyOpRef obj_cop = p->second;
+  CopyOpRef chunk_cop = obj_cop->chunk_cops[offset];
+
+  if (tid != chunk_cop->objecter_tid) {
+    dout(10) << __func__ << " tid " << tid << " != cop " << chunk_cop
+             << " tid " << chunk_cop->objecter_tid << dendl;
+    return;
+  }
+
+  if (chunk_cop->omap_data.length() || chunk_cop->omap_header.length()) {
+    r = -EOPNOTSUPP;
+  }
+
+  chunk_cop->objecter_tid = 0;
+  chunk_cop->objecter_tid2 = 0;  // assume this ordered before us (if it happened)
+  ObjectContextRef& cobc = obj_cop->obc;
+  OSDOp &chunk_data = chunk_cop->chunk_ops[0];
+
+  if (r < 0) {
+    obj_cop->failed = true;
+    goto out;
+  }
+
+  if (obj_cop->failed) {
+    return;
+  }
+
+  if (!chunk_data.outdata.length()) {
+    r = -EIO;
+    obj_cop->failed = true;
+    goto out;
+  }
+
+  obj_cop->num_chunk--;
+
+  /* check all of the copyop are completed */
+  if (obj_cop->num_chunk) {
+    dout(20) << __func__ << " num_chunk: " << obj_cop->num_chunk << dendl;
+    return;
+  }
+
+  {
+    OpContextUPtr ctx = simple_opc_create(obj_cop->obc);
+    PGTransaction *t = ctx->op_t.get();
+    ObjectState& obs = ctx->new_obs;
+    for (auto& p : obj_cop->chunk_cops) {
+      OSDOp &sub_chunk = p.second->chunk_ops[0];
+      t->write(cobc->obs.oi.soid,
+               p.second->cursor.data_offset,
+               sub_chunk.outdata.length(),
+               sub_chunk.outdata,
+               p.second->dest_obj_fadvise_flags);
+      obs.oi.manifest.chunk_map[p.second->cursor.data_offset].flags = 0; // clean
+      dout(20) << __func__ << " offset: " << p.second->cursor.data_offset
+               << " length: " << sub_chunk.outdata.length() << dendl;
+      // account for the write while the buffer still reports its length;
+      // clearing first would always record a zero-length write
+      write_update_size_and_usage(ctx->delta_stats, obs.oi, ctx->modified_ranges,
+                                  p.second->cursor.data_offset, sub_chunk.outdata.length());
+      sub_chunk.outdata.clear();
     }
-    for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
-         it != proxywrite_ops.end();) {
-      if (it->second->soid == cobc->obs.oi.soid) {
-        cancel_proxy_write((it++)->second);
-      } else {
-        ++it;
+    obs.oi.clear_data_digest();
+    ctx->at_version = get_next_version();
+    finish_ctx(ctx.get(), pg_log_entry_t::PROMOTE);
+    simple_opc_submit(std::move(ctx));
+
+    // look at the last chunk of the map; end() itself must not be dereferenced
+    auto p = cobc->obs.oi.manifest.chunk_map.rbegin();
+    /* check remaining work */
+    // NOTE(review): last_offset is assigned from a chunk_map key, so this
+    // comparison looks like it can only hold for a zero-length last chunk;
+    // "last_offset < p->first" may have been intended -- confirm.
+    if (p != cobc->obs.oi.manifest.chunk_map.rend() &&
+        obj_cop->last_offset >= p->first + p->second.length) {
+      for (auto &en : cobc->obs.oi.manifest.chunk_map) {
+        if (obj_cop->last_offset < en.first) {
+          _copy_some_manifest(cobc, obj_cop, en.first);
+          return;
+        }
       }
     }
-    kick_proxy_ops_blocked(cobc->obs.oi.soid);
+  }
+
+ out:
+  dout(20) << __func__ << " complete r = " << cpp_strerror(r) << dendl;
+  CopyCallbackResults results(r, &obj_cop->results);
+  obj_cop->cb->complete(results);
+
+  copy_ops.erase(cobc->obs.oi.soid);
+  cobc->stop_block();
+
+  // cancel and requeue proxy ops on this object
+  if (!r) {
+    cancel_and_requeue_proxy_ops(cobc->obs.oi.soid);
   }
   kick_object_context_blocked(cobc);
 }
+/**
+ * Cancel every proxied read and every proxied write whose soid equals
+ * @p oid, then call kick_proxy_ops_blocked() for that object.
+ */
+void PrimaryLogPG::cancel_and_requeue_proxy_ops(hobject_t oid) {
+  // In both loops the iterator is stepped past the current entry before the
+  // cancel call is made, so it never refers to the entry being cancelled.
+  auto rd = proxyread_ops.begin();
+  while (rd != proxyread_ops.end()) {
+    auto cur = rd++;
+    if (cur->second->soid == oid) {
+      cancel_proxy_read(cur->second);
+    }
+  }
+
+  auto wr = proxywrite_ops.begin();
+  while (wr != proxywrite_ops.end()) {
+    auto cur = wr++;
+    if (cur->second->soid == oid) {
+      cancel_proxy_write(cur->second);
+    }
+  }
+
+  kick_proxy_ops_blocked(oid);
+}
+
void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t)
{
dout(20) << __func__ << " " << cop
agent_choose_mode();
}
+/**
+ * Completion handler for the promote of a manifest object; called from
+ * PromoteManifestCallback::finish().
+ *
+ * - r == -ECANCELED: nothing is done.
+ * - r <  0: the error is logged and sent as the reply to every op queued in
+ *   waiting_for_blocked_object for this object; the queue entry is removed.
+ * - otherwise: the promote is accounted (promote_finish, l_osd_tier_promote)
+ *   and agent_choose_mode() is called if an idle agent_state exists.
+ *
+ * @param r        result of the copy
+ * @param results  copy results; user_version and object_size are read
+ * @param obc      object context of the promoted object
+ */
+void PrimaryLogPG::finish_promote_manifest(int r, CopyResults *results,
+                                           ObjectContextRef obc)
+{
+  const hobject_t& soid = obc->obs.oi.soid;
+  dout(10) << __func__ << " " << soid << " r=" << r
+           << " uv" << results->user_version << dendl;
+
+  if (r == -ECANCELED) {
+    return;
+  }
+
+  if (r >= 0) {
+    // success: account for the promote and give the agent a chance to run
+    osd->promote_finish(results->object_size);
+    osd->logger->inc(l_osd_tier_promote);
+    if (agent_state && agent_state->is_idle()) {
+      agent_choose_mode();
+    }
+    return;
+  }
+
+  derr << __func__ << " unexpected promote error " << cpp_strerror(r) << dendl;
+  // pass error to everyone blocked on this object
+  // FIXME: this is pretty sloppy, but at this point we got
+  // something unexpected and don't have many other options.
+  auto waiters = waiting_for_blocked_object.find(soid);
+  if (waiters == waiting_for_blocked_object.end()) {
+    return;
+  }
+  auto& pending = waiters->second;
+  while (!pending.empty()) {
+    osd->reply_op_error(pending.front(), r);
+    pending.pop_front();
+  }
+  waiting_for_blocked_object.erase(waiters);
+}
+
void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
{
dout(10) << __func__ << " " << cop->obc->obs.oi.soid