return;
}
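+// Completion callback for a read proxied through the Objecter.  On
+// completion, retake the PG lock and, if there has been no intervening
+// peering reset, hand the result to ReplicatedPG::finish_proxy_read().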
+struct C_ProxyRead : public Context {
+  ReplicatedPGRef pg;
+  hobject_t oid;
+  epoch_t last_peering_reset;
+  ceph_tid_t tid;
+  ReplicatedPG::ProxyReadOpRef prdop;
+  C_ProxyRead(ReplicatedPG *p, hobject_t o, epoch_t lpr,
+              const ReplicatedPG::ProxyReadOpRef& prd)
+    : pg(p), oid(o), last_peering_reset(lpr),
+      tid(0), prdop(prd)
+  {}
+  void finish(int r) {
+    pg->lock();
+    if (last_peering_reset == pg->get_last_peering_reset()) {
+      pg->finish_proxy_read(oid, tid, r);
+    }
+    pg->unlock();
+  }
+};
+
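+// Forward a client read to the base tier: duplicate the client's ops into a
+// fresh ObjectOperation, reissue them through the Objecter with the
+// IGNORE_CACHE and IGNORE_OVERLAY flags, and record the in-flight request so
+// the reply can be matched up in finish_proxy_read().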
+void ReplicatedPG::do_proxy_read(OpRequestRef op)
+{
+  MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+  object_locator_t oloc(m->get_object_locator());
+  oloc.pool = pool.info.tier_of;
+
+  hobject_t soid(m->get_oid(),
+                 m->get_object_locator().key,
+                 m->get_snapid(),
+                 m->get_pg().ps(),
+                 m->get_object_locator().get_pool(),
+                 m->get_object_locator().nspace);
+  unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY;
+  dout(10) << __func__ << " Start proxy read for " << *m << dendl;
+
+  ProxyReadOpRef prdop(new ProxyReadOp(op, soid, m->ops));
+
+  ObjectOperation obj_op;
+  obj_op.dup(prdop->ops);
+
+  C_ProxyRead *fin = new C_ProxyRead(this, soid, get_last_peering_reset(), prdop);
+  ceph_tid_t tid = osd->objecter->read(soid.oid, oloc, obj_op,
+                                       m->get_snapid(), NULL,
+                                       flags, fin,
+                                       &prdop->user_version, &prdop->data_offset);
+  fin->tid = tid;
+  prdop->objecter_tid = tid;
+  proxyread_ops[tid] = prdop;
+  in_progress_proxy_reads[soid].push_back(op);
+}
+
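+// Completion path for a proxied read: look up the in-flight ProxyReadOp by
+// tid, drop it from the tracking maps, and reply to the client via
+// complete_read_ctx() using the version and data offset returned by the
+// base tier.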
+void ReplicatedPG::finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r)
+{
+  dout(10) << __func__ << " " << oid << " tid " << tid
+           << " " << cpp_strerror(r) << dendl;
+
+  map<ceph_tid_t, ProxyReadOpRef>::iterator p = proxyread_ops.find(tid);
+  if (p == proxyread_ops.end()) {
+    dout(10) << __func__ << " no proxyread_op found" << dendl;
+    return;
+  }
+  ProxyReadOpRef prdop = p->second;
+  if (tid != prdop->objecter_tid) {
+    dout(10) << __func__ << " tid " << tid << " != prdop " << prdop
+             << " tid " << prdop->objecter_tid << dendl;
+    return;
+  }
+  if (oid != prdop->soid) {
+    dout(10) << __func__ << " oid " << oid << " != prdop " << prdop
+             << " soid " << prdop->soid << dendl;
+    return;
+  }
+  proxyread_ops.erase(tid);
+
+  map<hobject_t, list<OpRequestRef> >::iterator q = in_progress_proxy_reads.find(oid);
+  if (q == in_progress_proxy_reads.end()) {
+    dout(10) << __func__ << " no in_progress_proxy_reads found" << dendl;
+    return;
+  }
+  assert(q->second.size());
+  OpRequestRef op = q->second.front();
+  assert(op == prdop->op);
+  q->second.pop_front();
+  if (q->second.size() == 0) {
+    in_progress_proxy_reads.erase(oid);
+  }
+
+  MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+  OpContext *ctx = new OpContext(op, m->get_reqid(), prdop->ops, this);
+  ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
+  ctx->user_at_version = prdop->user_version;
+  ctx->data_off = prdop->data_offset;
+  complete_read_ctx(r, ctx);
+}
+
class PromoteCallback: public ReplicatedPG::CopyCallback {
ObjectContextRef obc;
ReplicatedPG *pg;
publish_stats_to_osd();
// on read, return the current object version
-    reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version);
+    if (ctx->obs) {
+      reply->set_reply_versions(eversion_t(), ctx->obs->oi.user_version);
+    } else {
+      reply->set_reply_versions(eversion_t(), ctx->user_at_version);
+    }
} else if (result == -ENOENT) {
// on ENOENT, set a floor for what the next user version will be.
reply->set_enoent_reply_versions(info.last_update, info.last_user_version);
friend class CopyFromCallback;
friend class PromoteCallback;
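+  /*
+   * ProxyReadOp tracks a read that has been proxied to the base tier:
+   * the originating client op, the target object, the objecter tid, and
+   * the user version and data offset reported back with the reply.
+   */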
+  struct ProxyReadOp {
+    OpRequestRef op;
+    hobject_t soid;
+    ceph_tid_t objecter_tid;
+    vector<OSDOp> &ops;
+    version_t user_version;
+    int data_offset;
+
+    ProxyReadOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops)
+      : op(_op), soid(oid),
+        objecter_tid(0), ops(_ops),
+        user_version(0), data_offset(0) { }
+  };
+  typedef boost::shared_ptr<ProxyReadOp> ProxyReadOpRef;
+
struct FlushOp {
ObjectContextRef obc; ///< obc we are flushing
OpRequestRef op; ///< initiating op
snapset = &obc->ssc->snapset;
}
}
+    OpContext(OpRequestRef _op, osd_reqid_t _reqid,
+              vector<OSDOp>& _ops, ReplicatedPG *_pg) :
+      op(_op), reqid(_reqid), ops(_ops), obs(NULL), snapset(0),
+      modify(false), user_modify(false), undirty(false), cache_evict(false),
+      bytes_written(0), bytes_read(0), user_at_version(0),
+      current_osd_subop_num(0),
+      op_t(NULL),
+      data_off(0), reply(NULL), pg(_pg),
+      num_read(0),
+      num_write(0),
+      copy_cb(NULL),
+      async_read_result(0),
+      inflightreads(0),
+      lock_to_release(NONE),
+      on_finish(NULL),
+      release_snapset_obc(false) { }
void reset_obs(ObjectContextRef obc) {
new_obs = ObjectState(obc->obs.oi, obc->obs.exists);
if (obc->ssc) {
bool pgls_filter(PGLSFilter *filter, hobject_t& sobj, bufferlist& outdata);
int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);
+  // -- proxyread --
+  map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;  ///< proxy reads in flight, by objecter tid
+  map<hobject_t, list<OpRequestRef> > in_progress_proxy_reads; ///< client ops being proxied, by object
+
+  void do_proxy_read(OpRequestRef op);
+  void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
+
+  friend struct C_ProxyRead;
+
public:
ReplicatedPG(OSDService *o, OSDMapRef curmap,
const PGPool &_pool, spg_t p);
*op->objver = m->get_user_version();
if (op->reply_epoch)
*op->reply_epoch = m->get_map_epoch();
+  // hand back the data offset carried in the reply header, if requested
+  if (op->data_offset)
+    *op->data_offset = m->get_header().data_off;
// per-op result demuxing
vector<OSDOp> out_ops;
// sure older osds don't trip over an unsupported opcode.
set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
}
+
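+  // Make this operation a duplicate of an existing op vector: copy the ops
+  // and point each per-op out_bl/out_rval slot back at the source OSDOp so
+  // results are written into the original vector.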
+  void dup(vector<OSDOp>& sops) {
+    ops = sops;
+    out_bl.resize(sops.size());
+    out_handler.resize(sops.size());
+    out_rval.resize(sops.size());
+    for (uint32_t i = 0; i < sops.size(); i++) {
+      out_bl[i] = &sops[i].outdata;
+      out_handler[i] = NULL;
+      out_rval[i] = &sops[i].rval;
+    }
+  }
};
/// the very first OP of the series and released upon receiving the last OP reply.
bool ctx_budgeted;
+    int *data_offset;   ///< if non-NULL, receives the reply's data offset
+
Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
- int f, Context *ac, Context *co, version_t *ov) :
+ int f, Context *ac, Context *co, version_t *ov, int *offset = NULL) :
session(NULL), incarnation(0),
target(o, ol, f),
con(NULL),
map_dne_bound(0),
budgeted(false),
should_resend(true),
- ctx_budgeted(false) {
+ ctx_budgeted(false),
+ data_offset(offset) {
ops.swap(op);
/* initialize out_* to match op vector */
Op *prepare_read_op(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
snapid_t snapid, bufferlist *pbl, int flags,
- Context *onack, version_t *objver = NULL) {
- Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onack, NULL, objver);
+ Context *onack, version_t *objver = NULL, int *data_offset = NULL) {
+ Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onack, NULL, objver, data_offset);
o->priority = op.priority;
o->snapid = snapid;
o->outbl = pbl;
ceph_tid_t read(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
snapid_t snapid, bufferlist *pbl, int flags,
- Context *onack, version_t *objver = NULL) {
- Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver);
+ Context *onack, version_t *objver = NULL, int *data_offset = NULL) {
+ Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver, data_offset);
return op_submit(o);
}
ceph_tid_t pg_read(uint32_t hash, object_locator_t oloc,
return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, objver);
}
-
+
// writes
ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc,
vector<OSDOp>& ops, utime_t mtime,