return false;
break;
case pg_pool_t::CACHEMODE_WRITEBACK:
- if (obc.get()) {
+ if (obc.get() && obc->obs.exists) { // we have the object already
return false;
- } else {
- do_cache_redirect(op, obc);
+ } else { // try and promote!
+ promote_object(op, obc);
return true;
}
break;
do_cache_redirect(op, obc);
return true;
break;
- case pg_pool_t::CACHEMODE_READONLY:
- if (obc.get() && !r) {
+ case pg_pool_t::CACHEMODE_READONLY: // TODO: clean this case up
+ if (!obc.get() && r == -ENOENT) { // we don't have the object and op's a read
+ promote_object(op, obc);
+ return true;
+ } else if (obc.get() && obc->obs.exists) { // we have the object locally
return false;
- } else {
+ } else if (!r) { // it must be a write
do_cache_redirect(op, obc);
return true;
+ } else { // crap, there was a failure of some kind
+ return false;
}
break;
default:
return;
}
+void ReplicatedPG::promote_object(OpRequestRef op, ObjectContextRef obc)
+{
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+ if (!obc.get()) { // we need to create an ObjectContext
+ int r = find_object_context(
+ hobject_t(m->get_oid(),
+ m->get_object_locator().key,
+ m->get_snapid(),
+ m->get_pg().ps(),
+ m->get_object_locator().get_pool(),
+ m->get_object_locator().nspace),
+ &obc, true, NULL);
+ assert(r == 0); // a lookup that allows creates can't fail now
+ }
+dout(10) << __func__ << " " << obc->obs.oi.soid << dendl;
+
+ hobject_t temp_target = generate_temp_object();
+ PromoteCallback *cb = new PromoteCallback(obc, temp_target, this);
+ object_locator_t oloc(m->get_object_locator());
+ oloc.pool = pool.info.tier_of;
+ start_copy(cb, obc, obc->obs.oi.soid, oloc, 0, temp_target);
+
+ assert(obc->is_blocked());
+ wait_for_blocked_object(obc->obs.oi.soid, op);
+}
+
void ReplicatedPG::execute_ctx(OpContext *ctx)
{
dout(10) << __func__ << " " << ctx << dendl;
ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(obs.oi.size, 10);
}
+void ReplicatedPG::finish_promote(CopyResults *results, ObjectContextRef obc,
+ hobject_t& temp_obj)
+{
+ vector<OSDOp> ops;
+ tid_t rep_tid = osd->get_tid();
+ osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
+ OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, obc->ssc, this);
+ tctx->mtime = ceph_clock_now(g_ceph_context);
+ tctx->op_t.swap(results->final_tx);
+ if (results->started_temp_obj) {
+ tctx->discard_temp_oid = temp_obj;
+ }
+
+ RepGather *repop = new_repop(tctx, obc, rep_tid);
+ C_KickBlockedObject *blockedcb = new C_KickBlockedObject(obc, this);
+ repop->ondone = blockedcb;
+ object_stat_sum_t delta;
+ ++delta.num_objects;
+ obc->obs.exists = true;
+ delta.num_bytes += results->object_size;
+ obc->obs.oi.category = results->category;
+ info.stats.stats.add(delta, obc->obs.oi.category);
+ tctx->at_version.epoch = get_osdmap()->get_epoch();
+ tctx->at_version.version = pg_log.get_head().version + 1;
+ tctx->user_at_version = results->user_version;
+
+ tctx->log.push_back(pg_log_entry_t(
+ pg_log_entry_t::MODIFY,
+ obc->obs.oi.soid,
+ tctx->at_version,
+ tctx->obs->oi.version,
+ tctx->user_at_version,
+ osd_reqid_t(),
+ repop->ctx->mtime));
+ append_log(tctx->log, eversion_t(), tctx->local_t);
+ issue_repop(repop, repop->ctx->mtime);
+ eval_repop(repop);
+ repop->put();
+}
+
void ReplicatedPG::cancel_copy(CopyOpRef cop, bool requeue)
{
dout(10) << __func__ << " " << cop->obc->obs.oi.soid
};
friend class CopyFromCallback;
+ class PromoteCallback: public CopyCallback {
+ ObjectContextRef obc;
+ hobject_t temp_obj;
+ ReplicatedPG *pg;
+ public:
+ PromoteCallback(ObjectContextRef obc_, const hobject_t& temp_obj_,
+ ReplicatedPG *pg_) :
+ obc(obc_), temp_obj(temp_obj_), pg(pg_) {}
+
+ virtual void finish(CopyCallbackResults results) {
+ CopyResults* results_data = results.get<1>();
+ assert(results.get<0>() == 0); // we don't handle errors right now
+ pg->finish_promote(results_data, obc, temp_obj);
+ delete results_data;
+ }
+ };
+
boost::scoped_ptr<PGBackend> pgbackend;
PGBackend *get_pgbackend() {
return pgbackend.get();
uint64_t offset, uint64_t length, bool count_bytes);
void add_interval_usage(interval_set<uint64_t>& s, object_stat_sum_t& st);
+ /**
+ * This helper function is called from do_op if the ObjectContext lookup fails.
+ * @returns true if the caching code is handling the Op, false otherwise.
+ */
inline bool maybe_handle_cache(OpRequestRef op, ObjectContextRef obc, int r);
+ /**
+ * This helper function tells the client to redirect their request elsewhere.
+ */
void do_cache_redirect(OpRequestRef op, ObjectContextRef obc);
+ /**
+ * This function starts up a copy from
+ */
+ void promote_object(OpRequestRef op, ObjectContextRef obc);
int prepare_transaction(OpContext *ctx);
void _build_finish_copy_transaction(CopyOpRef cop,
ObjectStore::Transaction& t);
void finish_copyfrom(OpContext *ctx);
+ void finish_promote(CopyResults *results, ObjectContextRef obc,
+ hobject_t& temp_obj);
void cancel_copy(CopyOpRef cop, bool requeue);
void cancel_copy_ops(bool requeue);