From: Greg Farnum Date: Thu, 26 Jan 2012 01:30:07 +0000 (-0800) Subject: PG: switch op passing interface to use OpRequest X-Git-Tag: v0.44~99^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ba392e3d28bb52c2e4bd5286188fb15aaaf7910b;p=ceph.git PG: switch op passing interface to use OpRequest This is all the PG/ReplicatedPG internals and the few remaining OSD callers. Signed-off-by: Greg Farnum --- diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 2c6a3716905e..7dfb07f6e22f 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -5624,19 +5624,19 @@ void OSD::dequeue_op(PG *pg) break; case MSG_OSD_SUBOP: - pg->do_sub_op((MOSDSubOp*)op); + pg->do_sub_op(op); break; case MSG_OSD_SUBOPREPLY: - pg->do_sub_op_reply((MOSDSubOpReply*)op); + pg->do_sub_op_reply(op); break; case MSG_OSD_PG_SCAN: - pg->do_scan((MOSDPGScan*)op); + pg->do_scan(op); break; case MSG_OSD_PG_BACKFILL: - pg->do_backfill((MOSDPGBackfill*)op); + pg->do_backfill(op); break; default: diff --git a/src/osd/PG.cc b/src/osd/PG.cc index a0186583ee8a..6b0361bfd19d 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -15,6 +15,7 @@ #include "PG.h" #include "common/config.h" #include "OSD.h" +#include "OpRequest.h" #include "common/Timer.h" @@ -550,7 +551,7 @@ bool PG::search_for_missing(const Info &oinfo, const Missing *omissing, map >::iterator ml = missing_loc.find(soid); if (ml == missing_loc.end()) { - map >::iterator wmo = + map >::iterator wmo = waiting_for_missing_object.find(soid); if (wmo != waiting_for_missing_object.end()) { osd->requeue_ops(this, wmo->second); @@ -1381,11 +1382,11 @@ void PG::replay_queued_ops() { assert(is_replay() && is_active()); eversion_t c = info.last_update; - list replay; + list replay; dout(10) << "replay_queued_ops" << dendl; state_clear(PG_STATE_REPLAY); - for (map::iterator p = replay_queue.begin(); + for (map::iterator p = replay_queue.begin(); p != replay_queue.end(); p++) { if (p->first.version != c.version+1) { @@ -1395,7 +1396,8 @@ void PG::replay_queued_ops() << dendl; c = p->first; } - dout(10) << "activate replay " << p->first << " " << *p->second << dendl; + dout(10) << "activate replay " << p->first << " " + << *p->second->request << dendl; replay.push_back(p->second); } replay_queue.clear(); @@ -2257,9 +2259,9 @@ void PG::adjust_local_snaps() } } -void PG::requeue_object_waiters(map >& m) +void PG::requeue_object_waiters(map >& m) { - for (map >::iterator it = m.begin(); + for (map >::iterator it = m.begin(); it != m.end(); it++) osd->requeue_ops(this, it->second); @@ -2337,21 +2339,23 @@ bool PG::sched_scrub() } -void PG::sub_op_scrub_map(MOSDSubOp *op) +void PG::sub_op_scrub_map(OpRequest *op) { + MOSDSubOp *m = (MOSDSubOp *)op->request; + assert(m->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_map" << dendl; - if (op->map_epoch < info.history.same_interval_since) { + if (m->map_epoch < info.history.same_interval_since) { dout(10) << "sub_op_scrub discarding old sub_op from " - << op->map_epoch << " < " << info.history.same_interval_since << dendl; + << m->map_epoch << " < " << info.history.same_interval_since << dendl; op->put(); return; } - int from = op->get_source().num(); + int from = m->get_source().num(); dout(10) << " got osd." << from << " scrub map" << dendl; - bufferlist::iterator p = op->get_data().begin(); + bufferlist::iterator p = m->get_data().begin(); if (scrub_received_maps.count(from)) { ScrubMap incoming; incoming.decode(p); @@ -2407,8 +2411,10 @@ void PG::_request_scrub_map(int replica, eversion_t version) get_osdmap()->get_cluster_inst(replica)); } -void PG::sub_op_scrub_reserve(MOSDSubOp *op) +void PG::sub_op_scrub_reserve(OpRequest *op) { + MOSDSubOp *m = (MOSDSubOp*)op->request; + assert(m->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_reserve" << dendl; if (scrub_reserved) { @@ -2419,15 +2425,17 @@ void PG::sub_op_scrub_reserve(MOSDSubOp *op) scrub_reserved = osd->inc_scrubs_pending(); - MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); ::encode(scrub_reserved, reply->get_data()); - osd->cluster_messenger->send_message(reply, op->get_connection()); + osd->cluster_messenger->send_message(reply, m->get_connection()); op->put(); } -void PG::sub_op_scrub_reserve_reply(MOSDSubOpReply *op) +void PG::sub_op_scrub_reserve_reply(OpRequest *op) { + MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request; + assert(reply->get_header().type == MSG_OSD_SUBOPREPLY); dout(7) << "sub_op_scrub_reserve_reply" << dendl; if (!scrub_reserved) { @@ -2436,8 +2444,8 @@ void PG::sub_op_scrub_reserve_reply(MOSDSubOpReply *op) return; } - int from = op->get_source().num(); - bufferlist::iterator p = op->get_data().begin(); + int from = reply->get_source().num(); + bufferlist::iterator p = reply->get_data().begin(); bool reserved; ::decode(reserved, p); @@ -2458,8 +2466,9 @@ void PG::sub_op_scrub_reserve_reply(MOSDSubOpReply *op) op->put(); } -void PG::sub_op_scrub_unreserve(MOSDSubOp *op) +void PG::sub_op_scrub_unreserve(OpRequest *op) { + assert(op->request->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_unreserve" << dendl; clear_scrub_reserved(); @@ -2467,15 +2476,17 @@ void PG::sub_op_scrub_unreserve(MOSDSubOp *op) op->put(); } -void PG::sub_op_scrub_stop(MOSDSubOp *op) +void PG::sub_op_scrub_stop(OpRequest *op) { + MOSDSubOp *m = (MOSDSubOp*)op->request; + assert(m->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_stop" << dendl; // see comment in sub_op_scrub_reserve scrub_reserved = false; - MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); - osd->cluster_messenger->send_message(reply, op->get_connection()); + MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + osd->cluster_messenger->send_message(reply, m->get_connection()); op->put(); } @@ -3370,8 +3381,8 @@ void PG::start_peering_interval(const OSDMapRef lastmap, clear_stats(); // take replay queue waiters - list ls; - for (map::iterator it = replay_queue.begin(); + list ls; + for (map::iterator it = replay_queue.begin(); it != replay_queue.end(); it++) ls.push_back(it->second); diff --git a/src/osd/PG.h b/src/osd/PG.h index 965614d71e77..d3fee3d68462 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -52,6 +52,7 @@ using namespace __gnu_cxx; class OSD; +class OpRequest; class MOSDOp; class MOSDSubOp; class MOSDSubOpReply; @@ -842,7 +843,7 @@ public: } - list op_queue; // op queue + list op_queue; // op queue bool dirty_info, dirty_log; @@ -1477,14 +1478,14 @@ protected: // pg waiters - list waiting_for_active; - list waiting_for_all_missing; - map > waiting_for_missing_object, + list waiting_for_active; + list waiting_for_all_missing; + map > waiting_for_missing_object, waiting_for_degraded_object; - map > waiting_for_ondisk; - map replay_queue; + map > waiting_for_ondisk; + map replay_queue; - void requeue_object_waiters(map >& m); + void requeue_object_waiters(map >& m); // stats Mutex pg_stats_lock; @@ -1638,11 +1639,11 @@ public: bool sched_scrub(); void replica_scrub(class MOSDRepScrub *op); - void sub_op_scrub_map(class MOSDSubOp *op); - void sub_op_scrub_reserve(class MOSDSubOp *op); - void sub_op_scrub_reserve_reply(class MOSDSubOpReply *op); - void sub_op_scrub_unreserve(class MOSDSubOp *op); - void sub_op_scrub_stop(class MOSDSubOp *op); + void sub_op_scrub_map(OpRequest *op); + void sub_op_scrub_reserve(OpRequest *op); + void sub_op_scrub_reserve_reply(OpRequest *op); + void sub_op_scrub_unreserve(OpRequest *op); + void sub_op_scrub_stop(OpRequest *op); public: PG(OSD *o, PGPool *_pool, pg_t p, const hobject_t& loid, const hobject_t& ioid) : @@ -1790,11 +1791,11 @@ public: void on_removal(); // abstract bits - virtual void do_op(MOSDOp *op) = 0; - virtual void do_sub_op(MOSDSubOp *op) = 0; - virtual void do_sub_op_reply(MOSDSubOpReply *op) = 0; - virtual void do_scan(MOSDPGScan *op) = 0; - virtual void do_backfill(MOSDPGBackfill *op) = 0; + virtual void do_op(OpRequest *op) = 0; + virtual void do_sub_op(OpRequest *op) = 0; + virtual void do_sub_op_reply(OpRequest *op) = 0; + virtual void do_scan(OpRequest *op) = 0; + virtual void do_backfill(OpRequest *op) = 0; virtual bool snap_trimmer() = 0; virtual bool same_for_read_since(epoch_t e) = 0; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index b81a03848bbe..d40ade61070b 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -14,6 +14,7 @@ #include "PG.h" #include "ReplicatedPG.h" #include "OSD.h" +#include "OpRequest.h" #include "PGLS.h" #include "common/errno.h" @@ -98,7 +99,7 @@ bool ReplicatedPG::is_missing_object(const hobject_t& soid) return missing.missing.count(soid); } -void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, Message *m) +void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequest *op) { assert(is_missing_object(soid)); @@ -118,12 +119,12 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, Message *m) dout(7) << "missing " << soid << " v " << v << ", pulling." << dendl; pull(soid, v); } - waiting_for_missing_object[soid].push_back(m); + waiting_for_missing_object[soid].push_back(op); } -void ReplicatedPG::wait_for_all_missing(Message *m) +void ReplicatedPG::wait_for_all_missing(OpRequest *op) { - waiting_for_all_missing.push_back(m); + waiting_for_all_missing.push_back(op); } bool ReplicatedPG::is_degraded_object(const hobject_t& soid) @@ -147,7 +148,7 @@ bool ReplicatedPG::is_degraded_object(const hobject_t& soid) return false; } -void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, Message *m) +void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequest *op) { assert(is_degraded_object(soid)); @@ -173,7 +174,7 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, Message *m) } recover_object_replicas(soid, v); } - waiting_for_degraded_object[soid].push_back(m); + waiting_for_degraded_object[soid].push_back(op); } bool PGLSParentFilter::filter(bufferlist& xattr_data, bufferlist& outdata) @@ -256,9 +257,11 @@ bool ReplicatedPG::pg_op_must_wait(MOSDOp *op) return false; } -void ReplicatedPG::do_pg_op(MOSDOp *op) +void ReplicatedPG::do_pg_op(OpRequest *op) { - dout(10) << "do_pg_op " << *op << dendl; + MOSDOp *m = (MOSDOp *)op->request; + assert(m->get_header().type == CEPH_MSG_OSD_OP); + dout(10) << "do_pg_op " << *m << dendl; bufferlist outdata; int result = 0; @@ -266,9 +269,9 @@ void ReplicatedPG::do_pg_op(MOSDOp *op) PGLSFilter *filter = NULL; bufferlist filter_out; - snapid_t snapid = op->get_snapid(); + snapid_t snapid = m->get_snapid(); - for (vector::iterator p = op->ops.begin(); p != op->ops.end(); p++) { + for (vector::iterator p = m->ops.begin(); p != m->ops.end(); p++) { bufferlist::iterator bp = p->indata.begin(); switch (p->op.op) { case CEPH_OSD_OP_PGLS_FILTER: @@ -277,7 +280,7 @@ void ReplicatedPG::do_pg_op(MOSDOp *op) ::decode(mname, bp); } catch (const buffer::error& e) { - dout(0) << "unable to decode PGLS_FILTER description in " << *op << dendl; + dout(0) << "unable to decode PGLS_FILTER description in " << *m << dendl; result = -EINVAL; break; } @@ -290,11 +293,11 @@ void ReplicatedPG::do_pg_op(MOSDOp *op) // fall through case CEPH_OSD_OP_PGLS: - if (op->get_pg() != info.pgid) { - dout(10) << " pgls pg=" << op->get_pg() << " != " << info.pgid << dendl; + if (m->get_pg() != info.pgid) { + dout(10) << " pgls pg=" << m->get_pg() << " != " << info.pgid << dendl; result = 0; // hmm? } else { - dout(10) << " pgls pg=" << op->get_pg() << " count " << p->op.pgls.count << dendl; + dout(10) << " pgls pg=" << m->get_pg() << " count " << p->op.pgls.count << dendl; // read into a buffer vector sentries; PGLSResponse response; @@ -302,7 +305,7 @@ void ReplicatedPG::do_pg_op(MOSDOp *op) ::decode(response.handle, bp); } catch (const buffer::error& e) { - dout(0) << "unable to decode PGLS handle in " << *op << dendl; + dout(0) << "unable to decode PGLS handle in " << *m << dendl; result = -EINVAL; break; } @@ -390,11 +393,11 @@ void ReplicatedPG::do_pg_op(MOSDOp *op) } // reply - MOSDOpReply *reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(), + MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); reply->set_data(outdata); reply->set_result(result); - osd->client_messenger->send_message(reply, op->get_connection()); + osd->client_messenger->send_message(reply, m->get_connection()); op->put(); delete filter; } @@ -445,72 +448,74 @@ void ReplicatedPG::get_src_oloc(const object_t& oid, const object_locator_t& olo * pg lock will be held (if multithreaded) * osd_lock NOT held. */ -void ReplicatedPG::do_op(MOSDOp *op) +void ReplicatedPG::do_op(OpRequest *op) { - if ((op->get_rmw_flags() & CEPH_OSD_FLAG_PGOP)) { - if (pg_op_must_wait(op)) { + MOSDOp *m = (MOSDOp*)op->request; + assert(m->get_header().type == CEPH_MSG_OSD_OP); + if ((m->get_rmw_flags() & CEPH_OSD_FLAG_PGOP)) { + if (pg_op_must_wait(m)) { wait_for_all_missing(op); return; } return do_pg_op(op); } - dout(10) << "do_op " << *op << (op->may_write() ? " may_write" : "") << dendl; + dout(10) << "do_op " << *m << (m->may_write() ? " may_write" : "") << dendl; - if (finalizing_scrub && op->may_write()) { + if (finalizing_scrub && m->may_write()) { dout(20) << __func__ << ": waiting for scrub" << dendl; waiting_for_active.push_back(op); return; } // missing object? - hobject_t head(op->get_oid(), op->get_object_locator().key, - CEPH_NOSNAP, op->get_pg().ps()); + hobject_t head(m->get_oid(), m->get_object_locator().key, + CEPH_NOSNAP, m->get_pg().ps()); if (is_missing_object(head)) { wait_for_missing_object(head, op); return; } // degraded object? - if (op->may_write() && is_degraded_object(head)) { + if (m->may_write() && is_degraded_object(head)) { wait_for_degraded_object(head, op); return; } // missing snapdir? - hobject_t snapdir(op->get_oid(), op->get_object_locator().key, - CEPH_SNAPDIR, op->get_pg().ps()); + hobject_t snapdir(m->get_oid(), m->get_object_locator().key, + CEPH_SNAPDIR, m->get_pg().ps()); if (is_missing_object(snapdir)) { wait_for_missing_object(snapdir, op); return; } // degraded object? - if (op->may_write() && is_degraded_object(snapdir)) { + if (m->may_write() && is_degraded_object(snapdir)) { wait_for_degraded_object(snapdir, op); return; } - entity_inst_t client = op->get_source_inst(); + entity_inst_t client = m->get_source_inst(); ObjectContext *obc; - bool can_create = op->may_write(); + bool can_create = m->may_write(); snapid_t snapid; - int r = find_object_context(hobject_t(op->get_oid(), - op->get_object_locator().key, - op->get_snapid(), op->get_pg().ps()), - op->get_object_locator(), + int r = find_object_context(hobject_t(m->get_oid(), + m->get_object_locator().key, + m->get_snapid(), m->get_pg().ps()), + m->get_object_locator(), &obc, can_create, &snapid); if (r) { if (r == -EAGAIN) { // If we're not the primary of this OSD, and we have // CEPH_OSD_FLAG_LOCALIZE_READS set, we just return -EAGAIN. Otherwise, // we have to wait for the object. - if (is_primary() || (!(op->get_rmw_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) { + if (is_primary() || (!(m->get_rmw_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) { // missing the specific snap we need; requeue and wait. assert(!can_create); // only happens on a read - hobject_t soid(op->get_oid(), op->get_object_locator().key, - snapid, op->get_pg().ps()); + hobject_t soid(m->get_oid(), m->get_object_locator().key, + snapid, m->get_pg().ps()); wait_for_missing_object(soid, op); return; } @@ -520,17 +525,17 @@ void ReplicatedPG::do_op(MOSDOp *op) } // make sure locator is consistent - if (op->get_object_locator() != obc->obs.oi.oloc) { - dout(10) << " provided locator " << op->get_object_locator() + if (m->get_object_locator() != obc->obs.oi.oloc) { + dout(10) << " provided locator " << m->get_object_locator() << " != object's " << obc->obs.oi.oloc << " on " << obc->obs.oi.soid << dendl; - osd->clog.warn() << "bad locator " << op->get_object_locator() + osd->clog.warn() << "bad locator " << m->get_object_locator() << " on object " << obc->obs.oi.oloc - << " loc " << op->get_object_locator() - << " op " << *op << "\n"; + << " loc " << m->get_object_locator() + << " op " << *m << "\n"; } - if ((op->may_read()) && (obc->obs.oi.lost)) { + if ((m->may_read()) && (obc->obs.oi.lost)) { // This object is lost. Reading from it returns an error. dout(20) << __func__ << ": object " << obc->obs.oi.soid << " is lost" << dendl; @@ -543,12 +548,12 @@ void ReplicatedPG::do_op(MOSDOp *op) bool ok; dout(10) << "do_op mode is " << mode << dendl; assert(!mode.wake); // we should never have woken waiters here. - if ((op->may_read() && op->may_write()) || - (op->get_flags() & CEPH_OSD_FLAG_RWORDERED)) + if ((m->may_read() && m->may_write()) || + (m->get_flags() & CEPH_OSD_FLAG_RWORDERED)) ok = mode.try_rmw(client); - else if (op->may_write()) + else if (m->may_write()) ok = mode.try_write(client); - else if (op->may_read()) + else if (m->may_read()) ok = mode.try_read(client); else assert(0); @@ -558,7 +563,7 @@ void ReplicatedPG::do_op(MOSDOp *op) return; } - if (!op->may_write() && !obc->obs.exists) { + if (!m->may_write() && !obc->obs.exists) { osd->reply_op_error(op, -ENOENT); put_object_context(obc); return; @@ -589,14 +594,14 @@ void ReplicatedPG::do_op(MOSDOp *op) // src_oids map src_obc; - for (vector::iterator p = op->ops.begin(); p != op->ops.end(); p++) { + for (vector::iterator p = m->ops.begin(); p != m->ops.end(); p++) { OSDOp& osd_op = *p; if (!ceph_osd_op_type_multi(osd_op.op.op)) continue; if (osd_op.soid.oid.name.length()) { object_locator_t src_oloc; - get_src_oloc(op->get_oid(), op->get_object_locator(), src_oloc); - hobject_t src_oid(osd_op.soid, src_oloc.key, op->get_pg().ps()); + get_src_oloc(m->get_oid(), m->get_object_locator(), src_oloc); + hobject_t src_oid(osd_op.soid, src_oloc.key, m->get_pg().ps()); if (!src_obc.count(src_oid)) { ObjectContext *sobc; snapid_t ssnapid; @@ -604,7 +609,7 @@ void ReplicatedPG::do_op(MOSDOp *op) int r = find_object_context(src_oid, src_oloc, &sobc, false, &ssnapid); if (r == -EAGAIN) { // missing the specific snap we need; requeue and wait. - hobject_t wait_oid(osd_op.soid.oid, src_oloc.key, ssnapid, op->get_pg().ps()); + hobject_t wait_oid(osd_op.soid.oid, src_oloc.key, ssnapid, m->get_pg().ps()); wait_for_missing_object(wait_oid, op); } else if (r) { osd->reply_op_error(op, r); @@ -612,7 +617,7 @@ void ReplicatedPG::do_op(MOSDOp *op) sobc->obs.oi.oloc.key != obc->obs.oi.soid.oid.name && sobc->obs.oi.soid.oid.name != obc->obs.oi.oloc.key) { dout(1) << " src_oid " << osd_op.soid << " oloc " << sobc->obs.oi.oloc << " != " - << op->get_oid() << " oloc " << obc->obs.oi.oloc << dendl; + << m->get_oid() << " oloc " << obc->obs.oi.oloc << dendl; osd->reply_op_error(op, -EINVAL); } else if (is_degraded_object(sobc->obs.oi.soid) || (before_backfill && sobc->obs.oi.soid > backfill_target_info->last_backfill)) { @@ -644,23 +649,23 @@ void ReplicatedPG::do_op(MOSDOp *op) } const hobject_t& soid = obc->obs.oi.soid; - OpContext *ctx = new OpContext(op, op->get_reqid(), op->ops, + OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, &obc->obs, obc->ssc, this); ctx->obc = obc; ctx->src_obc = src_obc; - if (op->may_write()) { + if (m->may_write()) { // snap if (pool->info.is_pool_snaps_mode()) { // use pool's snapc ctx->snapc = pool->snapc; } else { // client specified snapc - ctx->snapc.seq = op->get_snap_seq(); - ctx->snapc.snaps = op->get_snaps(); + ctx->snapc.seq = m->get_snap_seq(); + ctx->snapc.snaps = m->get_snaps(); } - if ((op->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) && + if ((m->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) && ctx->snapc.seq < obc->ssc->snapset.seq) { dout(10) << " ORDERSNAP flag set and snapc seq " << ctx->snapc.seq << " < snapset seq " << obc->ssc->snapset.seq @@ -695,7 +700,7 @@ void ReplicatedPG::do_op(MOSDOp *op) assert(ctx->at_version > info.last_update); assert(ctx->at_version > log.head); - ctx->mtime = op->get_mtime(); + ctx->mtime = m->get_mtime(); dout(10) << "do_op " << soid << " " << ctx->ops << " ov " << obc->obs.oi.version << " av " << ctx->at_version @@ -717,7 +722,7 @@ void ReplicatedPG::do_op(MOSDOp *op) uint64_t old_size = obc->obs.oi.size; eversion_t old_version = obc->obs.oi.version; - if (op->may_read()) { + if (m->may_read()) { dout(10) << " taking ondisk_read_lock" << dendl; obc->ondisk_read_lock(); } @@ -728,7 +733,7 @@ void ReplicatedPG::do_op(MOSDOp *op) int result = prepare_transaction(ctx); - if (op->may_read()) { + if (m->may_read()) { dout(10) << " dropping ondisk_read_lock" << dendl; obc->ondisk_read_unlock(); } @@ -742,11 +747,12 @@ void ReplicatedPG::do_op(MOSDOp *op) delete ctx; put_object_context(obc); put_object_contexts(src_obc); + op->put(); return; } // prepare the reply - ctx->reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(), 0); + ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); ctx->reply->claim_op_out_data(ctx->ops); ctx->reply->get_header().data_off = ctx->data_off; ctx->reply->set_result(result); @@ -762,7 +768,7 @@ void ReplicatedPG::do_op(MOSDOp *op) reply->set_version(info.last_update); ctx->reply = NULL; reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); - osd->client_messenger->send_message(reply, op->get_connection()); + osd->client_messenger->send_message(reply, m->get_connection()); op->put(); delete ctx; put_object_context(obc); @@ -770,7 +776,7 @@ void ReplicatedPG::do_op(MOSDOp *op) return; } - assert(op->may_write()); + assert(m->may_write()); // trim log? calc_trim_to(); @@ -796,16 +802,16 @@ void ReplicatedPG::do_op(MOSDOp *op) void ReplicatedPG::log_op_stats(OpContext *ctx) { - MOSDOp *op = (MOSDOp*)ctx->op; + MOSDOp *m = (MOSDOp*)ctx->op->request; utime_t now = ceph_clock_now(g_ceph_context); utime_t latency = now; - latency -= ctx->op->get_recv_stamp(); + latency -= ctx->op->request->get_recv_stamp(); utime_t rlatency; if (ctx->readable_stamp != utime_t()) { rlatency = ctx->readable_stamp; - rlatency -= ctx->op->get_recv_stamp(); + rlatency -= ctx->op->request->get_recv_stamp(); } uint64_t inb = ctx->bytes_written; @@ -817,17 +823,17 @@ void ReplicatedPG::log_op_stats(OpContext *ctx) osd->logger->inc(l_osd_op_inb, inb); osd->logger->finc(l_osd_op_lat, latency); - if (op->may_read() && op->may_write()) { + if (m->may_read() && m->may_write()) { osd->logger->inc(l_osd_op_rw); osd->logger->inc(l_osd_op_rw_inb, inb); osd->logger->inc(l_osd_op_rw_outb, outb); osd->logger->finc(l_osd_op_rw_rlat, rlatency); osd->logger->finc(l_osd_op_rw_lat, latency); - } else if (op->may_read()) { + } else if (m->may_read()) { osd->logger->inc(l_osd_op_r); osd->logger->inc(l_osd_op_r_outb, outb); osd->logger->finc(l_osd_op_r_lat, latency); - } else if (op->may_write()) { + } else if (m->may_write()) { osd->logger->inc(l_osd_op_w); osd->logger->inc(l_osd_op_w_inb, inb); osd->logger->finc(l_osd_op_w_rlat, rlatency); @@ -835,20 +841,20 @@ void ReplicatedPG::log_op_stats(OpContext *ctx) } else assert(0); - dout(15) << "log_op_stats " << *op + dout(15) << "log_op_stats " << *m << " inb " << inb << " outb " << outb << " rlat " << rlatency << " lat " << latency << dendl; } -void ReplicatedPG::log_subop_stats(MOSDSubOp *op, int tag_inb, int tag_lat) +void ReplicatedPG::log_subop_stats(OpRequest *op, int tag_inb, int tag_lat) { utime_t now = ceph_clock_now(g_ceph_context); utime_t latency = now; - latency -= op->get_recv_stamp(); + latency -= op->request->get_recv_stamp(); - uint64_t inb = op->get_data().length(); + uint64_t inb = op->request->get_data().length(); osd->logger->inc(l_osd_sop); @@ -859,17 +865,19 @@ void ReplicatedPG::log_subop_stats(MOSDSubOp *op, int tag_inb, int tag_lat) osd->logger->inc(tag_inb, inb); osd->logger->finc(tag_lat, latency); - dout(15) << "log_subop_stats " << *op << " inb " << inb << " latency " << latency << dendl; + dout(15) << "log_subop_stats " << *op->request << " inb " << inb << " latency " << latency << dendl; } -void ReplicatedPG::do_sub_op(MOSDSubOp *op) +void ReplicatedPG::do_sub_op(OpRequest *op) { - dout(15) << "do_sub_op " << *op << dendl; + MOSDSubOp *m = (MOSDSubOp*)op->request; + assert(m->get_header().type == MSG_OSD_SUBOP); + dout(15) << "do_sub_op " << *op->request << dendl; - if (op->ops.size() >= 1) { - OSDOp& first = op->ops[0]; + if (m->ops.size() >= 1) { + OSDOp& first = m->ops[0]; switch (first.op.op) { case CEPH_OSD_OP_PULL: sub_op_pull(op); @@ -898,27 +906,31 @@ void ReplicatedPG::do_sub_op(MOSDSubOp *op) sub_op_modify(op); } -void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r) +void ReplicatedPG::do_sub_op_reply(OpRequest *op) { + MOSDSubOpReply *r = (MOSDSubOpReply *)op->request; + assert(r->get_header().type == MSG_OSD_SUBOPREPLY); if (r->ops.size() >= 1) { OSDOp& first = r->ops[0]; switch (first.op.op) { case CEPH_OSD_OP_PUSH: // continue peer recovery - sub_op_push_reply(r); + sub_op_push_reply(op); return; case CEPH_OSD_OP_SCRUB_RESERVE: - sub_op_scrub_reserve_reply(r); + sub_op_scrub_reserve_reply(op); return; } } - sub_op_modify_reply(r); + sub_op_modify_reply(op); } -void ReplicatedPG::do_scan(MOSDPGScan *m) +void ReplicatedPG::do_scan(OpRequest *op) { + MOSDPGScan *m = (MOSDPGScan*)op->request; + assert(m->get_header().type == MSG_OSD_PG_SCAN); dout(10) << "do_scan " << *m << dendl; switch (m->op) { @@ -956,11 +968,13 @@ void ReplicatedPG::do_scan(MOSDPGScan *m) break; } - m->put(); + op->put(); } -void ReplicatedPG::do_backfill(MOSDPGBackfill *m) +void ReplicatedPG::do_backfill(OpRequest *op) { + MOSDPGBackfill *m = (MOSDPGBackfill*)op->request; + assert(m->get_header().type == MSG_OSD_PG_BACKFILL); dout(10) << "do_backfill " << *m << dendl; switch (m->op) { @@ -1007,7 +1021,7 @@ void ReplicatedPG::do_backfill(MOSDPGBackfill *m) break; } - m->put(); + op->put(); } /* Returns head of snap_trimq as snap_to_trim and the relevant objects as @@ -1412,7 +1426,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) ObjectContext *src_obc = 0; if (ceph_osd_op_type_multi(op.op)) { object_locator_t src_oloc; - get_src_oloc(soid.oid, ((MOSDOp *)ctx->op)->get_object_locator(), src_oloc); + get_src_oloc(soid.oid, ((MOSDOp *)ctx->op->request)->get_object_locator(), src_oloc); hobject_t src_oid(osd_op.soid, src_oloc.key, soid.hash); src_obc = ctx->src_obc[src_oid]; dout(10) << " src_oid " << src_oid << " obc " << src_obc << dendl; @@ -1737,7 +1751,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) case CEPH_OSD_OP_NOTIFY_ACK: { osd->watch_lock.Lock(); - entity_name_t source = ctx->op->get_source(); + entity_name_t source = ctx->op->request->get_source(); map::iterator oi_iter = oi.watchers.find(source); Watch::Notification *notif = osd->watch->get_notif(op.watch.cookie); if (oi_iter != oi.watchers.end() && notif) { @@ -2535,7 +2549,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) { if (ctx->watch_connect || ctx->watch_disconnect || !ctx->notifies.empty() || !ctx->notify_acks.empty()) { - OSD::Session *session = (OSD::Session *)ctx->op->get_connection()->get_priv(); + OSD::Session *session = (OSD::Session *)ctx->op->request->get_connection()->get_priv(); ObjectContext *obc = ctx->obc; object_info_t& oi = ctx->new_obs.oi; hobject_t& soid = oi.soid; @@ -2869,7 +2883,7 @@ void ReplicatedPG::op_applied(RepGather *repop) // discard my reference to the buffer if (repop->ctx->op) - repop->ctx->op->clear_data(); + repop->ctx->op->request->clear_data(); repop->applying = false; repop->applied = true; @@ -2946,11 +2960,11 @@ void ReplicatedPG::op_commit(RepGather *repop) void ReplicatedPG::eval_repop(RepGather *repop) { - MOSDOp *op = (MOSDOp *)repop->ctx->op; + MOSDOp *m = (MOSDOp *)repop->ctx->op->request; - if (op) + if (m) dout(10) << "eval_repop " << *repop - << " wants=" << (op->wants_ack() ? "a":"") << (op->wants_ondisk() ? "d":"") + << " wants=" << (m->wants_ack() ? "a":"") << (m->wants_ondisk() ? "d":"") << dendl; else dout(10) << "eval_repop " << *repop << " (no op)" << dendl; @@ -2962,7 +2976,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) mode.is_rmw_mode())) apply_repop(repop); - if (op) { + if (m) { // an 'ondisk' reply implies 'ack'. so, prefer to send just one // ondisk instead of ack followed by ondisk. @@ -2972,34 +2986,34 @@ void ReplicatedPG::eval_repop(RepGather *repop) log_op_stats(repop->ctx); - if (op->wants_ondisk() && !repop->sent_disk) { + if (m->wants_ondisk() && !repop->sent_disk) { // send commit. MOSDOpReply *reply = repop->ctx->reply; if (reply) repop->ctx->reply = NULL; else - reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(), 0); + reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); dout(10) << " sending commit on " << *repop << " " << reply << dendl; - assert(entity_name_t::TYPE_OSD != op->get_connection()->peer_type); - osd->client_messenger->send_message(reply, op->get_connection()); + assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type); + osd->client_messenger->send_message(reply, m->get_connection()); repop->sent_disk = true; } } // applied? if (repop->waitfor_ack.empty()) { - if (op->wants_ack() && !repop->sent_ack && !repop->sent_disk) { + if (m->wants_ack() && !repop->sent_ack && !repop->sent_disk) { // send ack MOSDOpReply *reply = repop->ctx->reply; if (reply) repop->ctx->reply = NULL; else - reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(), 0); + reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); reply->add_flags(CEPH_OSD_FLAG_ACK); dout(10) << " sending ack on " << *repop << " " << reply << dendl; - assert(entity_name_t::TYPE_OSD != op->get_connection()->peer_type); - osd->client_messenger->send_message(reply, op->get_connection()); + assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type); + osd->client_messenger->send_message(reply, m->get_connection()); repop->sent_ack = true; } @@ -3039,7 +3053,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now, { OpContext *ctx = repop->ctx; const hobject_t& soid = ctx->obs->oi.soid; - MOSDOp *op = (MOSDOp *)ctx->op; + MOSDOp *m = (MOSDOp *)ctx->op->request; dout(7) << "issue_repop rep_tid " << repop->rep_tid << " o " << soid @@ -3066,7 +3080,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now, get_osdmap()->get_epoch(), repop->rep_tid, repop->ctx->at_version); - if (op && op->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC) { + if (m && m->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC) { // replicate original op for parallel execution on replica assert(0 == "broken implementation, do not use"); wr->oloc = repop->ctx->obs->oi.oloc; @@ -3077,7 +3091,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now, wr->old_version = old_version; wr->snapset = repop->obc->ssc->snapset; wr->snapc = repop->ctx->snapc; - wr->set_data(repop->ctx->op->get_data()); // _copy_ bufferlist + wr->set_data(repop->ctx->op->request->get_data()); // _copy_ bufferlist } else { // ship resulting transaction, log entries, and pg_stats if (peer == backfill_target && soid >= backfill_pos) { @@ -3110,7 +3124,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContext * tid_t rep_tid) { if (ctx->op) - dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op << dendl; + dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->request << dendl; else dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl; @@ -3144,10 +3158,10 @@ void ReplicatedPG::remove_repop(RepGather *repop) void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type, int fromosd, eversion_t peer_lcod) { - MOSDOp *op = (MOSDOp *)repop->ctx->op; + MOSDOp *m = (MOSDOp *)repop->ctx->op->request; - if (op) - dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *op + if (m) + dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *m << " result " << result << " ack_type " << ack_type << " from osd." << fromosd @@ -3583,28 +3597,31 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc) // sub op modify -void ReplicatedPG::sub_op_modify(MOSDSubOp *op) +void ReplicatedPG::sub_op_modify(OpRequest *op) { - const hobject_t& soid = op->poid; + MOSDSubOp *m = (MOSDSubOp*)op->request; + assert(m->get_header().type == MSG_OSD_SUBOP); + + const hobject_t& soid = m->poid; const char *opname; - if (op->noop) + if (m->noop) opname = "no-op"; - else if (op->ops.size()) - opname = ceph_osd_op_name(op->ops[0].op.op); + else if (m->ops.size()) + opname = ceph_osd_op_name(m->ops[0].op.op); else opname = "trans"; dout(10) << "sub_op_modify " << opname << " " << soid - << " v " << op->version - << (op->noop ? " NOOP" : "") - << (op->logbl.length() ? " (transaction)" : " (parallel exec") - << " " << op->logbl.length() + << " v " << m->version + << (m->noop ? " NOOP" : "") + << (m->logbl.length() ? " (transaction)" : " (parallel exec") + << " " << m->logbl.length() << dendl; // sanity checks - assert(op->map_epoch >= info.history.same_interval_since); + assert(m->map_epoch >= info.history.same_interval_since); assert(is_active()); assert(is_replica()); @@ -3621,19 +3638,19 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op) rm->ackerosd = ackerosd; rm->last_complete = info.last_complete; - if (!op->noop) { - if (op->logbl.length()) { + if (!m->noop) { + if (m->logbl.length()) { // shipped transaction and log entries vector log; - bufferlist::iterator p = op->get_data().begin(); + bufferlist::iterator p = m->get_data().begin(); ::decode(rm->opt, p); - p = op->logbl.begin(); + p = m->logbl.begin(); ::decode(log, p); - info.stats = op->pg_stats; + info.stats = m->pg_stats; update_snap_collections(log, rm->localt); - append_log(log, op->pg_trim_to, rm->localt); + append_log(log, m->pg_trim_to, rm->localt); rm->tls.push_back(&rm->localt); rm->tls.push_back(&rm->opt); @@ -3645,24 +3662,24 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op) // TODO: this is severely broken because we don't know whether this object is really lost or // not. We just always assume that it's not right now. // Also, we're taking the address of a variable on the stack. - object_info_t oi(soid, op->oloc); + object_info_t oi(soid, m->oloc); oi.lost = false; // I guess? - oi.version = op->old_version; - oi.size = op->old_size; - ObjectState obs(oi, op->old_exists); - SnapSetContext ssc(op->poid.oid); + oi.version = m->old_version; + oi.size = m->old_size; + ObjectState obs(oi, m->old_exists); + SnapSetContext ssc(m->poid.oid); - rm->ctx = new OpContext(op, op->reqid, op->ops, &obs, &ssc, this); + rm->ctx = new OpContext(op, m->reqid, m->ops, &obs, &ssc, this); - rm->ctx->mtime = op->mtime; - rm->ctx->at_version = op->version; - rm->ctx->snapc = op->snapc; + rm->ctx->mtime = m->mtime; + rm->ctx->at_version = m->version; + rm->ctx->snapc = m->snapc; - ssc.snapset = op->snapset; + ssc.snapset = m->snapset; rm->ctx->obc->ssc = &ssc; prepare_transaction(rm->ctx); - append_log(rm->ctx->log, op->pg_trim_to, rm->ctx->local_t); + append_log(rm->ctx->log, m->pg_trim_to, rm->ctx->local_t); rm->tls.push_back(&rm->ctx->op_t); rm->tls.push_back(&rm->ctx->local_t); @@ -3672,8 +3689,8 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op) } else { // just trim the log - if (op->pg_trim_to != eversion_t()) { - trim(rm->localt, op->pg_trim_to); + if (m->pg_trim_to != eversion_t()) { + trim(rm->localt, m->pg_trim_to); rm->tls.push_back(&rm->localt); } } @@ -3691,11 +3708,13 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op) void ReplicatedPG::sub_op_modify_applied(RepModify *rm) { lock(); - dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op << dendl; + dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl; + MOSDSubOp *m = (MOSDSubOp*)rm->op->request; + assert(m->get_header().type == MSG_OSD_SUBOP); if (!rm->committed) { // send ack to acker only if we haven't sent a commit already - MOSDSubOpReply *ack = new MOSDSubOpReply(rm->op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd)); } @@ -3703,9 +3722,9 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm) rm->applied = true; bool done = rm->applied && rm->committed; - assert(info.last_update >= rm->op->version); - assert(last_update_applied < rm->op->version); - last_update_applied = rm->op->version; + assert(info.last_update >= m->version); + assert(last_update_applied < m->version); + last_update_applied = m->version; if (finalizing_scrub) { assert(active_rep_scrub); assert(info.last_update <= active_rep_scrub->scrub_to); @@ -3729,7 +3748,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm) lock(); // send commit. - dout(10) << "sub_op_modify_commit on op " << *rm->op + dout(10) << "sub_op_modify_commit on op " << *rm->op->request << ", sending commit to osd." << rm->ackerosd << dendl; @@ -3737,7 +3756,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm) if (get_osdmap()->is_up(rm->ackerosd)) { last_complete_ondisk = rm->last_complete; - MOSDSubOpReply *commit = new MOSDSubOpReply(rm->op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); + MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); commit->set_last_complete_ondisk(rm->last_complete); commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd)); @@ -3755,8 +3774,10 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm) } } -void ReplicatedPG::sub_op_modify_reply(MOSDSubOpReply *r) +void ReplicatedPG::sub_op_modify_reply(OpRequest *op) { + MOSDSubOpReply *r = (MOSDSubOpReply*)op->request; + assert(r->get_header().type == MSG_OSD_SUBOPREPLY); // must be replication. tid_t rep_tid = r->get_tid(); int fromosd = r->get_source().num(); @@ -3769,7 +3790,7 @@ void ReplicatedPG::sub_op_modify_reply(MOSDSubOpReply *r) r->get_last_complete_ondisk()); } - r->put(); + op->put(); } @@ -4231,8 +4252,10 @@ void ReplicatedPG::send_push_op_blank(const hobject_t& soid, int peer) osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer)); } -void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply) +void ReplicatedPG::sub_op_push_reply(OpRequest *op) { + MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request; + assert(reply->get_header().type == MSG_OSD_SUBOPREPLY); dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl; int peer = reply->get_source().num(); @@ -4288,7 +4311,7 @@ void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply) } } } - reply->put(); + op->put(); } void ReplicatedPG::finish_degraded_object(const hobject_t& oid) @@ -4313,12 +4336,15 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid) * process request to pull an entire object. * NOTE: called from opqueue. */ -void ReplicatedPG::sub_op_pull(MOSDSubOp *op) +void ReplicatedPG::sub_op_pull(OpRequest *op) { - const hobject_t soid = op->poid; + MOSDSubOp *m = (MOSDSubOp*)op->request; + assert(m->get_header().type == MSG_OSD_SUBOP); - dout(7) << "op_pull " << soid << " v " << op->version - << " from " << op->get_source() + const hobject_t soid = m->poid; + + dout(7) << "op_pull " << soid << " v " << m->version + << " from " << m->get_source() << dendl; assert(!is_primary()); // we should be a replica or stray. @@ -4326,24 +4352,24 @@ void ReplicatedPG::sub_op_pull(MOSDSubOp *op) struct stat st; int r = osd->store->stat(coll, soid, &st); if (r != 0) { - osd->clog.error() << info.pgid << " " << op->get_source() << " tried to pull " << soid + osd->clog.error() << info.pgid << " " << m->get_source() << " tried to pull " << soid << " but got " << cpp_strerror(-r) << "\n"; - send_push_op_blank(soid, op->get_source().num()); + send_push_op_blank(soid, m->get_source().num()); } else { uint64_t size = st.st_size; bool complete = false; - if (!op->data_subset.empty() && op->data_subset.range_end() >= size) + if (!m->data_subset.empty() && m->data_subset.range_end() >= size) complete = true; // complete==true implies we are definitely complete. // complete==false means nothing. we don't know because the primary may // not be pulling the entire object. - r = send_push_op(soid, op->version, op->get_source().num(), size, op->first, complete, - op->data_subset, op->clone_subsets); + r = send_push_op(soid, m->version, m->get_source().num(), size, m->first, complete, + m->data_subset, m->clone_subsets); if (r < 0) - send_push_op_blank(soid, op->get_source().num()); + send_push_op_blank(soid, m->get_source().num()); } log_subop_stats(op, 0, l_osd_sop_pull_lat); @@ -4352,7 +4378,7 @@ void ReplicatedPG::sub_op_pull(MOSDSubOp *op) } -void ReplicatedPG::_committed_pushed_object(MOSDSubOp *op, epoch_t same_since, eversion_t last_complete) +void ReplicatedPG::_committed_pushed_object(OpRequest *op, epoch_t same_since, eversion_t last_complete) { lock(); if (same_since == info.history.same_interval_since) { @@ -4430,20 +4456,23 @@ void ReplicatedPG::recover_got(hobject_t oid, eversion_t v) /** op_push * NOTE: called from opqueue. */ -void ReplicatedPG::sub_op_push(MOSDSubOp *op) +void ReplicatedPG::sub_op_push(OpRequest *op) { - const hobject_t& soid = op->poid; - eversion_t v = op->version; - OSDOp& push = op->ops[0]; + MOSDSubOp *m = (MOSDSubOp*)op->request; + assert(m->get_header().type == MSG_OSD_SUBOP); + + const hobject_t& soid = m->poid; + eversion_t v = m->version; + OSDOp& push = m->ops[0]; dout(7) << "op_push " << soid << " v " << v - << " " << op->oloc + << " " << m->oloc << " len " << push.op.extent.length - << " data_subset " << op->data_subset - << " clone_subsets " << op->clone_subsets - << " data len " << op->get_data().length() + << " data_subset " << m->data_subset + << " clone_subsets " << m->clone_subsets + << " data len " << m->get_data().length() << dendl; if (v == eversion_t()) { @@ -4456,25 +4485,25 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) map > clone_subsets; bufferlist data; - op->claim_data(data); + m->claim_data(data); // we need these later, and they get clobbered by t.setattrs() bufferlist oibl; - if (op->attrset.count(OI_ATTR)) - oibl.push_back(op->attrset[OI_ATTR]); + if (m->attrset.count(OI_ATTR)) + oibl.push_back(m->attrset[OI_ATTR]); bufferlist ssbl; - if (op->attrset.count(SS_ATTR)) - ssbl.push_back(op->attrset[SS_ATTR]); + if (m->attrset.count(SS_ATTR)) + ssbl.push_back(m->attrset[SS_ATTR]); // determine data/clone subsets - data_subset = op->data_subset; + data_subset = m->data_subset; if (data_subset.empty() && push.op.extent.length && push.op.extent.length == data.length()) data_subset.insert(0, push.op.extent.length); - clone_subsets = op->clone_subsets; + clone_subsets = m->clone_subsets; pull_info_t *pi = 0; - bool first = op->first; - bool complete = op->complete; + bool first = m->first; + bool complete = m->complete; // op->complete == true means we reached the end of the object (file size) // op->complete == false means nothing; we may not have asked for the whole thing. @@ -4489,8 +4518,8 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) // did we learn object size? if (pi->need_size) { - dout(10) << " learned object size is " << op->old_size << dendl; - pi->data_subset.erase(op->old_size, (uint64_t)-1 - op->old_size); + dout(10) << " learned object size is " << m->old_size << dendl; + pi->data_subset.erase(m->old_size, (uint64_t)-1 - m->old_size); pi->need_size = false; } @@ -4555,7 +4584,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) complete = pi->data_subset.range_end() == data_subset.range_end(); } - if (op->complete && !complete) { + if (m->complete && !complete) { dout(0) << " uh oh, we reached EOF on peer before we got everything we wanted" << dendl; _failed_push(op); return; @@ -4563,7 +4592,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) } else { // head|unversioned. for now, primary will _only_ pull data copies of the head (no cloning) - assert(op->clone_subsets.empty()); + assert(m->clone_subsets.empty()); } } dout(15) << " data_subset " << data_subset @@ -4630,11 +4659,11 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) if (data_subset.empty()) t->touch(coll, soid); - t->setattrs(coll, soid, op->attrset); + t->setattrs(coll, soid, m->attrset); if (soid.snap && soid.snap < CEPH_NOSNAP && - op->attrset.count(OI_ATTR)) { + m->attrset.count(OI_ATTR)) { bufferlist bl; - bl.push_back(op->attrset[OI_ATTR]); + bl.push_back(m->attrset[OI_ATTR]); object_info_t oi(bl); if (oi.snaps.size()) { coll_t lc = make_snap_collection(*t, oi.snaps[0]); @@ -4665,7 +4694,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) // track ObjectContext if (is_primary()) { dout(10) << " setting up obc for " << soid << dendl; - ObjectContext *obc = get_object_context(soid, op->oloc, true); + ObjectContext *obc = get_object_context(soid, m->oloc, true); assert(obc->registered); obc->ondisk_write_lock(); @@ -4742,9 +4771,9 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) } else { // ack if i'm a replica and being pushed to. - MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); - assert(entity_name_t::TYPE_OSD == op->get_connection()->peer_type); - osd->cluster_messenger->send_message(reply, op->get_connection()); + MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); + osd->cluster_messenger->send_message(reply, m->get_connection()); } if (complete) { @@ -4770,10 +4799,12 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) op->put(); // at the end... soid is a ref to op->soid! } -void ReplicatedPG::_failed_push(MOSDSubOp *op) +void ReplicatedPG::_failed_push(OpRequest *op) { - const hobject_t& soid = op->poid; - int from = op->get_source().num(); + MOSDSubOp *m = (MOSDSubOp*)op->request; + assert(m->get_header().type == MSG_OSD_SUBOP); + const hobject_t& soid = m->poid; + int from = m->get_source().num(); map >::iterator p = missing_loc.find(soid); if (p != missing_loc.end()) { dout(0) << "_failed_push " << soid << " from osd." << from @@ -4794,12 +4825,14 @@ void ReplicatedPG::_failed_push(MOSDSubOp *op) op->put(); } -void ReplicatedPG::sub_op_remove(MOSDSubOp *op) +void ReplicatedPG::sub_op_remove(OpRequest *op) { - dout(7) << "sub_op_remove " << op->poid << dendl; + MOSDSubOp *m = (MOSDSubOp*)op->request; + assert(m->get_header().type == MSG_OSD_SUBOP); + dout(7) << "sub_op_remove " << m->poid << dendl; ObjectStore::Transaction *t = new ObjectStore::Transaction; - remove_object_with_snap_hardlinks(*t, op->poid); + remove_object_with_snap_hardlinks(*t, m->poid); int r = osd->store->queue_transaction(&osr, t); assert(r == 0); @@ -4840,7 +4873,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transac { // Wake anyone waiting for this object. Now that it's been marked as lost, // we will just return an error code. - map >::iterator wmo = + map >::iterator wmo = waiting_for_missing_object.find(oid); if (wmo != waiting_for_missing_object.end()) { osd->requeue_ops(this, wmo->second); @@ -5001,7 +5034,7 @@ void ReplicatedPG::_finish_mark_all_unfound_lost(list& obcs) void ReplicatedPG::apply_and_flush_repops(bool requeue) { - list rq; + list rq; // apply all repops while (!repop_queue.empty()) { @@ -5013,7 +5046,7 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue) repop->aborted = true; if (requeue && repop->ctx->op) { - dout(10) << " requeuing " << *repop->ctx->op << dendl; + dout(10) << " requeuing " << *repop->ctx->op->request << dendl; rq.push_back(repop->ctx->op); repop->ctx->op = 0; } @@ -5071,7 +5104,7 @@ void ReplicatedPG::on_change() // take object waiters requeue_object_waiters(waiting_for_missing_object); - for (map >::iterator p = waiting_for_degraded_object.begin(); + for (map >::iterator p = waiting_for_degraded_object.begin(); p != waiting_for_degraded_object.end(); waiting_for_degraded_object.erase(p++)) { osd->requeue_ops(this, p->second); @@ -5094,7 +5127,7 @@ void ReplicatedPG::on_role_change() dout(10) << "on_role_change" << dendl; // take commit waiters - for (map >::iterator p = waiting_for_ondisk.begin(); + for (map >::iterator p = waiting_for_ondisk.begin(); p != waiting_for_ondisk.end(); p++) osd->requeue_ops(this, p->second); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 6b43aa23d676..c8a63db7b09b 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -18,6 +18,7 @@ #include "PG.h" #include "OSD.h" #include "Watch.h" +#include "OpRequest.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" @@ -132,7 +133,7 @@ public: } state_t state; int num_wr; - list waiting; + list waiting; list waiting_cond; bool wake; @@ -330,7 +331,7 @@ public: * Capture all object state associated with an in-progress read or write. */ struct OpContext { - Message *op; + OpRequest *op; osd_reqid_t reqid; vector& ops; @@ -377,7 +378,7 @@ public: OpContext(const OpContext& other); const OpContext& operator=(const OpContext& other); - OpContext(Message *_op, osd_reqid_t _reqid, vector& _ops, + OpContext(OpRequest *_op, osd_reqid_t _reqid, vector& _ops, ObjectState *_obs, SnapSetContext *_ssc, ReplicatedPG *_pg) : op(_op), reqid(_reqid), ops(_ops), obs(_obs), @@ -664,7 +665,7 @@ protected: struct RepModify { ReplicatedPG *pg; - MOSDSubOp *op; + OpRequest *op; OpContext *ctx; bool applied, committed; int ackerosd; @@ -722,10 +723,10 @@ protected: }; struct C_OSD_CommittedPushedObject : public Context { ReplicatedPG *pg; - MOSDSubOp *op; + OpRequest *op; epoch_t same_since; eversion_t last_complete; - C_OSD_CommittedPushedObject(ReplicatedPG *p, MOSDSubOp *o, epoch_t ss, eversion_t lc) : pg(p), op(o), same_since(ss), last_complete(lc) { + C_OSD_CommittedPushedObject(ReplicatedPG *p, OpRequest *o, epoch_t ss, eversion_t lc) : pg(p), op(o), same_since(ss), last_complete(lc) { if (op) op->get(); pg->get(); @@ -737,22 +738,22 @@ protected: } }; - void sub_op_remove(MOSDSubOp *op); + void sub_op_remove(OpRequest *op); - void sub_op_modify(MOSDSubOp *op); + void sub_op_modify(OpRequest *op); void sub_op_modify_applied(RepModify *rm); void sub_op_modify_commit(RepModify *rm); - void sub_op_modify_reply(MOSDSubOpReply *reply); + void sub_op_modify_reply(OpRequest *op); void _applied_pushed_object(ObjectStore::Transaction *t, ObjectContext *obc); - void _committed_pushed_object(MOSDSubOp *op, epoch_t same_since, eversion_t lc); + void _committed_pushed_object(OpRequest *op, epoch_t same_since, eversion_t lc); void recover_got(hobject_t oid, eversion_t v); - void sub_op_push(MOSDSubOp *op); - void _failed_push(MOSDSubOp *op); - void sub_op_push_reply(MOSDSubOpReply *reply); - void sub_op_pull(MOSDSubOp *op); + void sub_op_push(OpRequest *op); + void _failed_push(OpRequest *op); + void sub_op_push_reply(OpRequest *op); + void sub_op_pull(OpRequest *op); - void log_subop_stats(MOSDSubOp *ctx, int tag_inb, int tag_lat); + void log_subop_stats(OpRequest *op, int tag_inb, int tag_lat); // -- scrub -- @@ -772,13 +773,13 @@ public: ~ReplicatedPG() {} - void do_op(MOSDOp *op); + void do_op(OpRequest *op); bool pg_op_must_wait(MOSDOp *op); - void do_pg_op(MOSDOp *op); - void do_sub_op(MOSDSubOp *op); - void do_sub_op_reply(MOSDSubOpReply *op); - void do_scan(MOSDPGScan *op); - void do_backfill(MOSDPGBackfill *op); + void do_pg_op(OpRequest *op); + void do_sub_op(OpRequest *op); + void do_sub_op_reply(OpRequest *op); + void do_scan(OpRequest *op); + void do_backfill(OpRequest *op); bool get_obs_to_trim(snapid_t &snap_to_trim, coll_t &col_to_trim, vector &obs_to_trim); @@ -858,11 +859,11 @@ public: bool same_for_rep_modify_since(epoch_t e); bool is_missing_object(const hobject_t& oid); - void wait_for_missing_object(const hobject_t& oid, Message *op); - void wait_for_all_missing(Message *op); + void wait_for_missing_object(const hobject_t& oid, OpRequest *op); + void wait_for_all_missing(OpRequest *op); bool is_degraded_object(const hobject_t& oid); - void wait_for_degraded_object(const hobject_t& oid, Message *op); + void wait_for_degraded_object(const hobject_t& oid, OpRequest *op); void mark_all_unfound_lost(int what); eversion_t pick_newest_available(const hobject_t& oid); @@ -902,7 +903,7 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop) //<< " wfnvram=" << repop.waitfor_nvram << " wfdisk=" << repop.waitfor_disk; if (repop.ctx->op) - out << " op=" << *(repop.ctx->op); + out << " op=" << *(repop.ctx->op->request); out << ")"; return out; }