From: Samuel Just Date: Sun, 16 Feb 2014 01:43:19 +0000 (-0800) Subject: ReplicatedBackend: excise OSDService* X-Git-Tag: v0.78~163^2~34 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fa980644bd577cefc104e2d52289fe0310eaf147;p=ceph.git ReplicatedBackend: excise OSDService* This should eventually make it easier to mock out a PGBackend::Listener. Signed-off-by: Samuel Just --- diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index 3d6ad7da3356..e13369c2ee07 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -148,6 +148,26 @@ virtual void update_stats( const pg_stat_t &stat) = 0; + virtual void schedule_work( + GenContext *c) = 0; + + virtual int whoami() const = 0; + + virtual void send_message_osd_cluster( + int peer, Message *m, epoch_t from_epoch) = 0; + virtual void send_message_osd_cluster( + Message *m, Connection *con) = 0; + virtual void send_message_osd_cluster( + Message *m, const ConnectionRef& con) = 0; + virtual ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) = 0; + virtual entity_name_t get_cluster_msgr_name() = 0; + + virtual PerfCounters *get_logger() = 0; + + virtual tid_t get_tid() = 0; + + virtual LogClientTemp clog_error() = 0; + virtual ~Listener() {} }; Listener *parent; @@ -499,4 +519,28 @@ ostream &errorstream) { assert(0); } }; +struct PG_SendMessageOnConn: public Context { + PGBackend::Listener *pg; + Message *reply; + ConnectionRef conn; + PG_SendMessageOnConn( + PGBackend::Listener *pg, + Message *reply, + ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {} + void finish(int) { + pg->send_message_osd_cluster(reply, conn.get()); + } +}; + +struct PG_QueueAsync : public Context { + PGBackend::Listener *pg; + GenContext *c; + PG_QueueAsync( + PGBackend::Listener *pg, + GenContext *c) : pg(pg), c(c) {} + void finish(int) { + pg->schedule_work(c); + } +}; + #endif diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index f624901fe1d1..bf0c8542c08e 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -29,10 +29,11 @@ static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) { } ReplicatedBackend::ReplicatedBackend( - PGBackend::Listener *pg, coll_t coll, OSDService *osd) : - PGBackend(pg), temp_created(false), - temp_coll(coll_t::make_temp_coll(pg->get_info().pgid)), - coll(coll), osd(osd), cct(osd->cct) {} + PGBackend::Listener *pg, coll_t coll, ObjectStore *store, + CephContext *cct) : + PGBackend(pg, store, + coll, coll_t::make_temp_coll(pg->get_info().pgid)), + cct(cct) {} void ReplicatedBackend::run_recovery_op( PGBackend::RecoveryHandle *_h, @@ -204,9 +205,9 @@ void ReplicatedBackend::_on_change(ObjectStore::Transaction *t) void ReplicatedBackend::on_flushed() { if (have_temp_coll() && - !osd->store->collection_empty(get_temp_coll())) { + !store->collection_empty(get_temp_coll())) { vector objects; - osd->store->collection_list(get_temp_coll(), objects); + store->collection_list(get_temp_coll(), objects); derr << __func__ << ": found objects in the temp collection: " << objects << ", crashing now" << dendl; @@ -220,7 +221,7 @@ int ReplicatedBackend::objects_read_sync( uint64_t len, bufferlist *bl) { - return osd->store->read(coll, hoid, off, len, *bl); + return store->read(coll, hoid, off, len, *bl); } struct AsyncReadCallback : public GenContext { @@ -247,17 +248,17 @@ void ReplicatedBackend::objects_read_async( to_read.begin(); i != to_read.end() && r >= 0; ++i) { - int _r = osd->store->read(coll, hoid, i->first.first, - i->first.second, *(i->second.first)); + int _r = store->read(coll, hoid, i->first.first, + i->first.second, *(i->second.first)); if (i->second.second) { - osd->gen_wq.queue( + get_parent()->schedule_work( get_parent()->bless_gencontext( new AsyncReadCallback(_r, i->second.second))); } if (_r < 0) r = _r; } - osd->gen_wq.queue( + get_parent()->schedule_work( get_parent()->bless_gencontext( new AsyncReadCallback(r, on_complete))); } @@ -552,7 +553,7 @@ void ReplicatedBackend::op_applied( if (op->op) op->op->mark_event("op_applied"); - op->waiting_for_applied.erase(osd->whoami); + op->waiting_for_applied.erase(get_parent()->whoami()); parent->op_applied(op->v); if (op->waiting_for_applied.empty()) { @@ -572,7 +573,7 @@ void ReplicatedBackend::op_commit( if (op->op) op->op->mark_event("op_commit"); - op->waiting_for_commit.erase(osd->whoami); + op->waiting_for_commit.erase(get_parent()->whoami()); if (op->waiting_for_commit.empty()) { op->on_commit->complete(0); diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index 7ec0b88d788c..05dd9c761cef 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -28,10 +28,11 @@ class ReplicatedBackend : public PGBackend { }; friend struct C_ReplicatedBackend_OnPullComplete; public: - OSDService *osd; CephContext *cct; - ReplicatedBackend(PGBackend::Listener *pg, coll_t coll, OSDService *osd); + ReplicatedBackend( + PGBackend::Listener *pg, coll_t coll, ObjectStore *store, + CephContext *cct); /// @see PGBackend::open_recovery_op RPGHandle *_open_recovery_op() { diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 5ea6fdcc4333..879a19283734 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -82,7 +82,7 @@ PGLSFilter::~PGLSFilter() } static void log_subop_stats( - OSDService *osd, + PerfCounters *logger, OpRequestRef op, int tag_inb, int tag_lat) { utime_t now = ceph_clock_now(g_ceph_context); @@ -91,14 +91,14 @@ static void log_subop_stats( uint64_t inb = op->get_req()->get_data().length(); - osd->logger->inc(l_osd_sop); + logger->inc(l_osd_sop); - osd->logger->inc(l_osd_sop_inb, inb); - osd->logger->tinc(l_osd_sop_lat, latency); + logger->inc(l_osd_sop_inb, inb); + logger->tinc(l_osd_sop_lat, latency); if (tag_inb) - osd->logger->inc(tag_inb, inb); - osd->logger->tinc(tag_lat, latency); + logger->inc(tag_inb, inb); + logger->tinc(tag_lat, latency); } struct OnReadComplete : public Context { @@ -323,6 +323,41 @@ void ReplicatedPG::begin_peer_recover( peer_missing[peer].revise_have(soid, eversion_t()); } +void ReplicatedPG::schedule_work( + GenContext *c) +{ + osd->gen_wq.queue(c); +} + +void ReplicatedPG::send_message_osd_cluster( + int peer, Message *m, epoch_t from_epoch) +{ + osd->send_message_osd_cluster(peer, m, from_epoch); +} + +void ReplicatedPG::send_message_osd_cluster( + Message *m, Connection *con) +{ + osd->send_message_osd_cluster(m, con); +} + +void ReplicatedPG::send_message_osd_cluster( + Message *m, const ConnectionRef& con) +{ + osd->send_message_osd_cluster(m, con); +} + +ConnectionRef ReplicatedPG::get_con_osd_cluster( + int peer, epoch_t from_epoch) +{ + return osd->get_con_osd_cluster(peer, from_epoch); +} + +PerfCounters *ReplicatedPG::get_logger() +{ + return osd->logger; +} + // ======================= // pg changes @@ -965,7 +1000,7 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap, const PGPool &_pool, pg_t p, const hobject_t& oid, const hobject_t& ioid) : PG(o, curmap, _pool, p, oid, ioid), - pgbackend(new ReplicatedBackend(this, coll_t(p), o)), + pgbackend(new ReplicatedBackend(this, coll_t(p), o->store, cct)), snapset_contexts_lock("ReplicatedPG::snapset_contexts"), temp_seq(0), snap_trimmer_machine(this) @@ -1953,8 +1988,8 @@ void ReplicatedBackend::_do_push(OpRequestRef op) reply->compute_cost(cct); t->register_on_complete( - new C_OSD_SendMessageOnConn( - osd, reply, m->get_connection())); + new PG_SendMessageOnConn( + get_parent(), reply, m->get_connection())); t->register_on_applied( new ObjectStore::C_DeleteTransaction(t)); @@ -2011,8 +2046,8 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op) m->get_priority()); c->to_continue.swap(to_continue); t->register_on_complete( - new C_QueueInWQ( - &osd->push_wq, + new PG_QueueAsync( + get_parent(), get_parent()->bless_gencontext(c))); } replies.erase(replies.end() - 1); @@ -2026,8 +2061,8 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op) reply->compute_cost(cct); t->register_on_complete( - new C_OSD_SendMessageOnConn( - osd, reply, m->get_connection())); + new PG_SendMessageOnConn( + get_parent(), reply, m->get_connection())); } t->register_on_applied( @@ -6349,8 +6384,8 @@ void ReplicatedBackend::issue_op( wr->new_temp_oid = new_temp_oid; wr->discard_temp_oid = discard_temp_oid; - osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch()); - + get_parent()->send_message_osd_cluster( + peer, wr, get_osdmap()->get_epoch()); } } @@ -7112,7 +7147,8 @@ void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm) // send ack to acker only if we haven't sent a commit already MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! - osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch()); + get_parent()->send_message_osd_cluster( + rm->ackerosd, ack, get_osdmap()->get_epoch()); } parent->op_applied(m->version); @@ -7133,9 +7169,11 @@ void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm) MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); commit->set_last_complete_ondisk(rm->last_complete); commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! - osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch()); + get_parent()->send_message_osd_cluster( + rm->ackerosd, commit, get_osdmap()->get_epoch()); - log_subop_stats(osd, rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat); + log_subop_stats(get_parent()->get_logger(), rm->op, + l_osd_sop_w_inb, l_osd_sop_w_lat); } @@ -7584,8 +7622,8 @@ int ReplicatedBackend::send_pull_legacy(int prio, int peer, ObjectRecoveryProgress progress) { // send op - tid_t tid = osd->get_tid(); - osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); + tid_t tid = get_parent()->get_tid(); + osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid); dout(10) << "send_pull_op " << recovery_info.soid << " " << recovery_info.version @@ -7605,9 +7643,10 @@ int ReplicatedBackend::send_pull_legacy(int prio, int peer, subop->recovery_info = recovery_info; subop->recovery_progress = progress; - osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch()); + get_parent()->send_message_osd_cluster( + peer, subop, get_osdmap()->get_epoch()); - osd->logger->inc(l_osd_pull); + get_parent()->get_logger()->inc(l_osd_pull); return 0; } @@ -7628,7 +7667,7 @@ void ReplicatedBackend::submit_push_data( } else { dout(10) << __func__ << ": Creating oid " << recovery_info.soid << " in the temp collection" << dendl; - temp_contents.insert(recovery_info.soid); + add_temp_obj(recovery_info.soid); target_coll = get_temp_coll(t); } @@ -7656,10 +7695,9 @@ void ReplicatedBackend::submit_push_data( if (complete) { if (!first) { - assert(temp_contents.count(recovery_info.soid)); dout(10) << __func__ << ": Removing oid " << recovery_info.soid << " from the temp collection" << dendl; - temp_contents.erase(recovery_info.soid); + clear_temp_obj(recovery_info.soid); t->collection_move(coll, target_coll, recovery_info.soid); } @@ -7799,7 +7837,7 @@ struct C_OnPushCommit : public Context { C_OnPushCommit(ReplicatedPG *pg, OpRequestRef op) : pg(pg), op(op) {} void finish(int) { op->mark_event("committed"); - log_subop_stats(pg->osd, op, l_osd_push_inb, l_osd_sop_push_lat); + log_subop_stats(pg->osd->logger, op, l_osd_push_inb, l_osd_sop_push_lat); } }; @@ -7842,7 +7880,7 @@ void ReplicatedBackend::send_pushes(int prio, map > &pushes) for (map >::iterator i = pushes.begin(); i != pushes.end(); ++i) { - ConnectionRef con = osd->get_con_osd_cluster( + ConnectionRef con = get_parent()->get_con_osd_cluster( i->first, get_osdmap()->get_epoch()); if (!con) @@ -7876,7 +7914,7 @@ void ReplicatedBackend::send_pushes(int prio, map > &pushes) msg->pushes.push_back(*j); } msg->compute_cost(cct); - osd->send_message_osd_cluster(msg, con); + get_parent()->send_message_osd_cluster(msg, con); } } } @@ -7887,7 +7925,7 @@ void ReplicatedBackend::send_pulls(int prio, map > &pulls) for (map >::iterator i = pulls.begin(); i != pulls.end(); ++i) { - ConnectionRef con = osd->get_con_osd_cluster( + ConnectionRef con = get_parent()->get_con_osd_cluster( i->first, get_osdmap()->get_epoch()); if (!con) @@ -7913,7 +7951,7 @@ void ReplicatedBackend::send_pulls(int prio, map > &pulls) msg->map_epoch = get_osdmap()->get_epoch(); msg->pulls.swap(i->second); msg->compute_cost(cct); - osd->send_message_osd_cluster(msg, con); + get_parent()->send_message_osd_cluster(msg, con); } } } @@ -7937,8 +7975,8 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info, << dendl; if (progress.first) { - osd->store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header); - osd->store->getattrs(coll, recovery_info.soid, out_op->attrset); + store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header); + store->getattrs(coll, recovery_info.soid, out_op->attrset); // Debug bufferlist bv; @@ -7946,11 +7984,11 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info, object_info_t oi(bv); if (oi.version != recovery_info.version) { - osd->clog.error() << get_info().pgid << " push " - << recovery_info.soid << " v " - << recovery_info.version - << " failed because local copy is " - << oi.version << "\n"; + get_parent()->clog_error() << get_info().pgid << " push " + << recovery_info.soid << " v " + << recovery_info.version + << " failed because local copy is " + << oi.version << "\n"; return -EINVAL; } @@ -7960,8 +7998,8 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info, uint64_t available = cct->_conf->osd_recovery_max_chunk; if (!progress.omap_complete) { ObjectMap::ObjectMapIterator iter = - osd->store->get_omap_iterator(coll, - recovery_info.soid); + store->get_omap_iterator(coll, + recovery_info.soid); for (iter->lower_bound(progress.omap_recovered_to); iter->valid(); iter->next()) { @@ -7993,7 +8031,7 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info, p != out_op->data_included.end(); ++p) { bufferlist bit; - osd->store->read(coll, recovery_info.soid, + store->read(coll, recovery_info.soid, p.get_start(), p.get_len(), bit); if (p.get_len() != bit.length()) { dout(10) << " extent " << p.get_start() << "~" << p.get_len() @@ -8019,8 +8057,8 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info, stat->num_bytes_recovered += out_op->data.length(); } - osd->logger->inc(l_osd_push); - osd->logger->inc(l_osd_push_outb, out_op->data.length()); + get_parent()->get_logger()->inc(l_osd_push); + get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length()); // send out_op->version = recovery_info.version; @@ -8033,8 +8071,8 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info, int ReplicatedBackend::send_push_op_legacy(int prio, int peer, PushOp &pop) { - tid_t tid = osd->get_tid(); - osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid); + tid_t tid = get_parent()->get_tid(); + osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid); MOSDSubOp *subop = new MOSDSubOp(rid, get_info().pgid, pop.soid, false, 0, get_osdmap()->get_epoch(), tid, pop.recovery_info.version); @@ -8052,7 +8090,7 @@ int ReplicatedBackend::send_push_op_legacy(int prio, int peer, PushOp &pop) subop->current_progress = pop.before_progress; subop->recovery_progress = pop.after_progress; - osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch()); + get_parent()->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch()); return 0; } @@ -8185,18 +8223,18 @@ void ReplicatedBackend::sub_op_pull(OpRequestRef op) m->get_source().num(), reply); - log_subop_stats(osd, op, 0, l_osd_sop_pull_lat); + log_subop_stats(get_parent()->get_logger(), op, 0, l_osd_sop_pull_lat); } void ReplicatedBackend::handle_pull(int peer, PullOp &op, PushOp *reply) { const hobject_t &soid = op.soid; struct stat st; - int r = osd->store->stat(coll, soid, &st); + int r = store->stat(coll, soid, &st); if (r != 0) { - osd->clog.error() << get_info().pgid << " " - << peer << " tried to pull " << soid - << " but got " << cpp_strerror(-r) << "\n"; + get_parent()->clog_error() << get_info().pgid << " " + << peer << " tried to pull " << soid + << " but got " << cpp_strerror(-r) << "\n"; prep_push_op_blank(soid, reply); } else { ObjectRecoveryInfo &recovery_info = op.recovery_info; @@ -8389,8 +8427,8 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op) op->get_req()->get_priority()); c->to_continue.swap(to_continue); t->register_on_complete( - new C_QueueInWQ( - &osd->push_wq, + new PG_QueueAsync( + get_parent(), get_parent()->bless_gencontext(c))); } run_recovery_op(h, op->get_req()->get_priority()); @@ -8401,8 +8439,8 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op) reply->set_priority(m->get_priority()); assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type); handle_push(m->get_source().num(), pop, &resp, t); - t->register_on_complete(new C_OSD_SendMessageOnConn( - osd, reply, m->get_connection())); + t->register_on_complete(new PG_SendMessageOnConn( + get_parent(), reply, m->get_connection())); } t->register_on_applied( new ObjectStore::C_DeleteTransaction(t)); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 7f53f8d54310..2a0e1080b96b 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -363,6 +363,30 @@ public: info.stats = stat; } + void schedule_work( + GenContext *c); + + int whoami() const { + return osd->whoami; + } + + void send_message_osd_cluster( + int peer, Message *m, epoch_t from_epoch); + void send_message_osd_cluster( + Message *m, Connection *con); + void send_message_osd_cluster( + Message *m, const ConnectionRef& con); + ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch); + entity_name_t get_cluster_msgr_name() { + return osd->get_cluster_msgr_name(); + } + + PerfCounters *get_logger(); + + tid_t get_tid() { return osd->get_tid(); } + + LogClientTemp clog_error() { return osd->clog.error(); } + /* * Capture all object state associated with an in-progress read or write. */