From: Sage Weil Date: Thu, 10 Apr 2014 01:02:27 +0000 (-0700) Subject: osdc/Objecter: move mapping into struct, helper X-Git-Tag: v0.80-rc1~45^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=860d72770cdf092c027d50f4ee03bed76c975599;p=ceph.git osdc/Objecter: move mapping into struct, helper Move the common bits of Op and LingerOp into op_target_t and separate the actual mapping calculation into calc_target(). This hugely simplifies recal_*op_target() by mostly just shuffling all of the same logic into that helper. There is one functional change in this patch: recalc_linger_op() now is aware of the tiering logic that was previously only handled in recalc_op_target(). Signed-off-by: Sage Weil --- diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index f876262bdff0..145628f65e57 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -305,7 +305,8 @@ void Objecter::send_linger(LingerOp *info) vector opv = info->ops; // need to pass a copy to ops Context *onack = (!info->registered && info->on_reg_ack) ? new C_Linger_Ack(this, info) : NULL; Context *oncommit = new C_Linger_Commit(this, info); - Op *o = new Op(info->oid, info->oloc, opv, info->flags | CEPH_OSD_FLAG_READ, + Op *o = new Op(info->target.base_oid, info->target.base_oloc, + opv, info->target.flags | CEPH_OSD_FLAG_READ, onack, oncommit, info->pobjver); o->snapid = info->snap; @@ -391,13 +392,13 @@ ceph_tid_t Objecter::linger_mutate(const object_t& oid, const object_locator_t& version_t *objver) { LingerOp *info = new LingerOp; - info->oid = oid; - info->oloc = oloc; - if (info->oloc.key == oid) - info->oloc.key.clear(); + info->target.base_oid = oid; + info->target.base_oloc = oloc; + if (info->target.base_oloc.key == oid) + info->target.base_oloc.key.clear(); info->snapc = snapc; info->mtime = mtime; - info->flags = flags | CEPH_OSD_FLAG_WRITE; + info->target.flags = flags | CEPH_OSD_FLAG_WRITE; info->ops = op.ops; info->inbl = inbl; info->poutbl = NULL; @@ -422,12 +423,12 @@ ceph_tid_t Objecter::linger_read(const object_t& oid, const object_locator_t& ol version_t *objver) { LingerOp *info = new LingerOp; - info->oid = oid; - info->oloc = oloc; - if (info->oloc.key == oid) - info->oloc.key.clear(); + info->target.base_oid = oid; + info->target.base_oloc = oloc; + if (info->target.base_oloc.key == oid) + info->target.base_oloc.key.clear(); info->snap = snap; - info->flags = flags; + info->target.flags = flags; info->ops = op.ops; info->inbl = inbl; info->poutbl = poutbl; @@ -515,7 +516,7 @@ void Objecter::scan_requests(bool force_resend, switch (r) { case RECALC_OP_TARGET_NO_ACTION: if (!force_resend && - (!force_resend_writes || !(op->flags & CEPH_OSD_FLAG_WRITE))) + (!force_resend_writes || !(op->target.flags & CEPH_OSD_FLAG_WRITE))) break; // -- fall-thru -- case RECALC_OP_TARGET_NEED_RESEND: @@ -665,7 +666,7 @@ void Objecter::handle_osd_map(MOSDMap *m) for (map::iterator p = need_resend.begin(); p != need_resend.end(); ++p) { Op *op = p->second; if (op->should_resend) { - if (op->session && !op->paused) { + if (op->session && !op->target.paused) { logger->inc(l_osdc_op_resend); send_op(op); } @@ -750,7 +751,7 @@ void Objecter::check_op_pool_dne(Op *op) if (osdmap->get_epoch() >= op->map_dne_bound) { // we had a new enough map ldout(cct, 10) << "check_op_pool_dne tid " << op->tid - << " concluding pool " << op->pgid.pool() << " dne" + << " concluding pool " << op->target.base_pgid.pool() << " dne" << dendl; if (op->onack) { op->onack->complete(-ENOENT); @@ -1049,7 +1050,7 @@ void Objecter::kick_requests(OSDSession *session) ++p; logger->inc(l_osdc_op_resend); if (op->should_resend) { - if (!op->paused) + if (!op->target.paused) resend[op->tid] = op; } else { cancel_linger_op(op); @@ -1267,14 +1268,14 @@ ceph_tid_t Objecter::_op_submit(Op *op) logger->set(l_osdc_op_active, ops.size()); logger->inc(l_osdc_op); - if ((op->flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) + if ((op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) logger->inc(l_osdc_op_rmw); - else if (op->flags & CEPH_OSD_FLAG_WRITE) + else if (op->target.flags & CEPH_OSD_FLAG_WRITE) logger->inc(l_osdc_op_w); - else if (op->flags & CEPH_OSD_FLAG_READ) + else if (op->target.flags & CEPH_OSD_FLAG_READ) logger->inc(l_osdc_op_r); - if (op->flags & CEPH_OSD_FLAG_PGOP) + if (op->target.flags & CEPH_OSD_FLAG_PGOP) logger->inc(l_osdc_op_pg); for (vector::iterator p = op->ops.begin(); p != op->ops.end(); ++p) { @@ -1310,27 +1311,27 @@ ceph_tid_t Objecter::_op_submit(Op *op) } // send? - ldout(cct, 10) << "op_submit oid " << op->base_oid - << " " << op->base_oloc << " " << op->target_oloc + ldout(cct, 10) << "op_submit oid " << op->target.base_oid + << " " << op->target.base_oloc << " " << op->target.target_oloc << " " << op->ops << " tid " << op->tid << " osd." << (op->session ? op->session->osd : -1) << dendl; - assert(op->flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)); + assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)); - if ((op->flags & CEPH_OSD_FLAG_WRITE) && + if ((op->target.flags & CEPH_OSD_FLAG_WRITE) && osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) { ldout(cct, 10) << " paused modify " << op << " tid " << last_tid << dendl; - op->paused = true; + op->target.paused = true; maybe_request_map(); - } else if ((op->flags & CEPH_OSD_FLAG_READ) && + } else if ((op->target.flags & CEPH_OSD_FLAG_READ) && osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) { ldout(cct, 10) << " paused read " << op << " tid " << last_tid << dendl; - op->paused = true; + op->target.paused = true; maybe_request_map(); - } else if ((op->flags & CEPH_OSD_FLAG_WRITE) && osdmap_full_flag()) { + } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) && osdmap_full_flag()) { ldout(cct, 0) << " FULL, paused modify " << op << " tid " << last_tid << dendl; - op->paused = true; + op->target.paused = true; maybe_request_map(); } else if (op->session) { send_op(op); @@ -1391,13 +1392,13 @@ bool Objecter::is_pg_changed( return false; // same primary (tho replicas may have changed) } -bool Objecter::op_should_be_paused(Op *op) +bool Objecter::target_should_be_paused(op_target_t *t) { bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD); bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || osdmap_full_flag(); - return (op->flags & CEPH_OSD_FLAG_READ && pauserd) || - (op->flags & CEPH_OSD_FLAG_WRITE && pausewr); + return (t->flags & CEPH_OSD_FLAG_READ && pauserd) || + (t->flags & CEPH_OSD_FLAG_WRITE && pausewr); } @@ -1430,42 +1431,42 @@ int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key, return p->raw_hash_to_pg(p->hash_key(key, ns)); } -int Objecter::recalc_op_target(Op *op) +int Objecter::calc_target(op_target_t *t) { - bool is_read = op->flags & CEPH_OSD_FLAG_READ; - bool is_write = op->flags & CEPH_OSD_FLAG_WRITE; + bool is_read = t->flags & CEPH_OSD_FLAG_READ; + bool is_write = t->flags & CEPH_OSD_FLAG_WRITE; bool need_check_tiering = false; - if (op->target_oid.name.empty()) { - op->target_oid = op->base_oid; + if (t->target_oid.name.empty()) { + t->target_oid = t->base_oid; need_check_tiering = true; } - if (op->target_oloc.empty()) { - op->target_oloc = op->base_oloc; + if (t->target_oloc.empty()) { + t->target_oloc = t->base_oloc; need_check_tiering = true; } if (need_check_tiering && - (op->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { - const pg_pool_t *pi = osdmap->get_pg_pool(op->base_oloc.pool); + (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { + const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool); if (pi) { if (is_read && pi->has_read_tier()) - op->target_oloc.pool = pi->read_tier; + t->target_oloc.pool = pi->read_tier; if (is_write && pi->has_write_tier()) - op->target_oloc.pool = pi->write_tier; + t->target_oloc.pool = pi->write_tier; } } pg_t pgid; - if (op->precalc_pgid) { - assert(op->base_oid.name.empty()); // make sure this is a listing op - ldout(cct, 10) << "recalc_op_target have " << op->base_pgid << " pool " - << osdmap->have_pg_pool(op->base_pgid.pool()) << dendl; - if (!osdmap->have_pg_pool(op->base_pgid.pool())) + if (t->precalc_pgid) { + assert(t->base_oid.name.empty()); // make sure this is a listing op + ldout(cct, 10) << "recalc_op_target have " << t->base_pgid << " pool " + << osdmap->have_pg_pool(t->base_pgid.pool()) << dendl; + if (!osdmap->have_pg_pool(t->base_pgid.pool())) return RECALC_OP_TARGET_POOL_DNE; - pgid = osdmap->raw_pg_to_pg(op->base_pgid); + pgid = osdmap->raw_pg_to_pg(t->base_pgid); } else { - int ret = osdmap->object_locator_to_pg(op->target_oid, op->target_oloc, + int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc, pgid); if (ret == -ENOENT) return RECALC_OP_TARGET_POOL_DNE; @@ -1476,33 +1477,30 @@ int Objecter::recalc_op_target(Op *op) bool need_resend = false; - bool paused = op_should_be_paused(op); - if (!paused && paused != op->paused) { - op->paused = false; + bool paused = target_should_be_paused(t); + if (!paused && paused != t->paused) { + t->paused = false; need_resend = true; } - if (op->pgid != pgid || - is_pg_changed( - op->primary, op->acting, primary, acting, op->used_replica)) { - op->pgid = pgid; - op->acting = acting; - op->primary = primary; - ldout(cct, 10) << "recalc_op_target tid " << op->tid - << " pgid " << pgid << " acting " << acting << dendl; - - OSDSession *s = NULL; - op->used_replica = false; + if (t->pgid != pgid || + is_pg_changed(t->primary, t->acting, primary, acting, t->used_replica)) { + t->pgid = pgid; + t->acting = acting; + t->primary = primary; + ldout(cct, 10) << "calc_op_target " + << " pgid " << pgid << " acting " << acting << dendl; + t->used_replica = false; if (primary != -1) { int osd; bool read = is_read && !is_write; - if (read && (op->flags & CEPH_OSD_FLAG_BALANCE_READS)) { + if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) { int p = rand() % acting.size(); if (p) - op->used_replica = true; + t->used_replica = true; osd = acting[p]; ldout(cct, 10) << " chose random osd." << osd << " of " << acting << dendl; - } else if (read && (op->flags & CEPH_OSD_FLAG_LOCALIZE_READS) && + } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) && acting.size() > 1) { // look for a local replica. prefer the primary if the // distance is the same. @@ -1521,7 +1519,7 @@ int Objecter::recalc_op_target(Op *op) best = i; best_locality = locality; if (i) - op->used_replica = true; + t->used_replica = true; } } assert(best >= 0); @@ -1529,9 +1527,23 @@ int Objecter::recalc_op_target(Op *op) } else { osd = primary; } - s = get_session(osd); + t->osd = osd; } + need_resend = true; + } + if (need_resend) { + return RECALC_OP_TARGET_NEED_RESEND; + } + return RECALC_OP_TARGET_NO_ACTION; +} +int Objecter::recalc_op_target(Op *op) +{ + int r = calc_target(&op->target); + if (r == RECALC_OP_TARGET_NEED_RESEND) { + OSDSession *s = NULL; + if (op->target.osd >= 0) + s = get_session(op->target.osd); if (op->session != s) { if (!op->session) num_homeless_ops--; @@ -1542,44 +1554,28 @@ int Objecter::recalc_op_target(Op *op) else num_homeless_ops++; } - need_resend = true; } - if (need_resend) { - return RECALC_OP_TARGET_NEED_RESEND; - } - return RECALC_OP_TARGET_NO_ACTION; + return r; } bool Objecter::recalc_linger_op_target(LingerOp *linger_op) { - int primary; - vector acting; - pg_t pgid; - int ret = osdmap->object_locator_to_pg(linger_op->oid, linger_op->oloc, pgid); - if (ret == -ENOENT) { - return RECALC_OP_TARGET_POOL_DNE; - } - osdmap->pg_to_acting_osds(pgid, &acting, &primary); - - if (pgid != linger_op->pgid || - is_pg_changed( - linger_op->primary, linger_op->acting, primary, acting, true)) { - linger_op->pgid = pgid; - linger_op->acting = acting; - linger_op->primary = primary; + int r = calc_target(&linger_op->target); + if (r == RECALC_OP_TARGET_NEED_RESEND) { ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id - << " pgid " << pgid << " acting " << acting << dendl; + << " pgid " << linger_op->target.pgid + << " acting " << linger_op->target.acting << dendl; - OSDSession *s = primary != -1 ? get_session(primary) : NULL; + OSDSession *s = linger_op->target.osd != -1 ? + get_session(linger_op->target.osd) : NULL; if (linger_op->session != s) { linger_op->session_item.remove_myself(); linger_op->session = s; if (s) s->linger_ops.push_back(&linger_op->session_item); } - return RECALC_OP_TARGET_NEED_RESEND; } - return RECALC_OP_TARGET_NO_ACTION; + return r; } void Objecter::cancel_linger_op(Op *op) @@ -1615,7 +1611,7 @@ void Objecter::send_op(Op *op) { ldout(cct, 15) << "send_op " << op->tid << " to osd." << op->session->osd << dendl; - int flags = op->flags; + int flags = op->target.flags; if (op->oncommit) flags |= CEPH_OSD_FLAG_ONDISK; if (op->onack) @@ -1634,12 +1630,13 @@ void Objecter::send_op(Op *op) op->con->post_rx_buffer(op->tid, *op->outbl); } - op->paused = false; + op->target.paused = false; op->incarnation = op->session->incarnation; op->stamp = ceph_clock_now(cct); MOSDOp *m = new MOSDOp(client_inc, op->tid, - op->target_oid, op->target_oloc, op->pgid, + op->target.target_oid, op->target.target_oloc, + op->target.pgid, osdmap->get_epoch(), flags); @@ -1759,7 +1756,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) if (m->is_redirect_reply()) { ldout(cct, 5) << " got redirect reply; redirecting" << dendl; unregister_op(op); - m->get_redirect().combine_with_locator(op->target_oloc, op->target_oid.name); + m->get_redirect().combine_with_locator(op->target.target_oloc, + op->target.target_oid.name); _op_submit(op); m->put(); return; @@ -2519,13 +2517,28 @@ void Objecter::ms_handle_remote_reset(Connection *con) } +void Objecter::op_target_t::dump(Formatter *f) const +{ + f->dump_stream("pg") << pgid; + f->dump_int("osd", osd); + f->dump_stream("object_id") << base_oid; + f->dump_stream("object_locator") << base_oloc; + f->dump_stream("target_object_id") << target_oid; + f->dump_stream("target_object_locator") << target_oloc; + f->dump_int("paused", (int)paused); + f->dump_int("used_replica", (int)used_replica); + f->dump_int("precalc_pgid", (int)precalc_pgid); +} + void Objecter::dump_active() { ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless" << dendl; for (map::iterator p = ops.begin(); p != ops.end(); ++p) { Op *op = p->second; - ldout(cct, 20) << op->tid << "\t" << op->pgid << "\tosd." << (op->session ? op->session->osd : -1) - << "\t" << op->base_oid << "\t" << op->ops << dendl; + ldout(cct, 20) << op->tid << "\t" << op->target.pgid + << "\tosd." << (op->session ? op->session->osd : -1) + << "\t" << op->target.base_oid + << "\t" << op->ops << dendl; } } @@ -2552,13 +2565,9 @@ void Objecter::dump_ops(Formatter *fmt) const Op *op = p->second; fmt->open_object_section("op"); fmt->dump_unsigned("tid", op->tid); - fmt->dump_stream("pg") << op->pgid; - fmt->dump_int("osd", op->session ? op->session->osd : -1); + op->target.dump(fmt); fmt->dump_stream("last_sent") << op->stamp; fmt->dump_int("attempts", op->attempts); - fmt->dump_stream("object_id") << op->base_oid; - fmt->dump_stream("object_locator") << op->base_oloc; - fmt->dump_stream("target_object_locator") << op->target_oloc; fmt->dump_stream("snapid") << op->snapid; fmt->dump_stream("snap_context") << op->snapc; fmt->dump_stream("mtime") << op->mtime; @@ -2585,10 +2594,7 @@ void Objecter::dump_linger_ops(Formatter *fmt) const LingerOp *op = p->second; fmt->open_object_section("linger_op"); fmt->dump_unsigned("linger_id", op->linger_id); - fmt->dump_stream("pg") << op->pgid; - fmt->dump_int("osd", op->session ? op->session->osd : -1); - fmt->dump_stream("object_id") << op->oid; - fmt->dump_stream("object_locator") << op->oloc; + op->target.dump(fmt); fmt->dump_stream("snapid") << op->snap; fmt->dump_stream("registered") << op->registered; fmt->close_section(); // linger_op object diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index d18e65374462..b244331f6810 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1061,11 +1061,8 @@ public: struct OSDSession; - struct Op { - OSDSession *session; - xlist::item session_item; - int incarnation; - + struct op_target_t { + int flags; object_t base_oid; object_locator_t base_oloc; object_t target_oid; @@ -1077,6 +1074,32 @@ public: pg_t pgid; ///< last pg we mapped to vector acting; ///< acting for last pg we mapped to int primary; ///< primary for last pg we mapped to + + bool used_replica; + bool paused; + + int osd; ///< the final target osd, or -1 + + op_target_t(object_t oid, object_locator_t oloc, int flags) + : flags(flags), + base_oid(oid), + base_oloc(oloc), + precalc_pgid(false), + primary(-1), + used_replica(false), + paused(false), + osd(-1) + {} + + void dump(Formatter *f) const; + }; + + struct Op { + OSDSession *session; + xlist::item session_item; + int incarnation; + + op_target_t target; bool used_replica; ConnectionRef con; // for rx buffer only @@ -1092,15 +1115,13 @@ public: vector out_handler; vector out_rval; - int flags, priority; + int priority; Context *onack, *oncommit, *ontimeout; ceph_tid_t tid; eversion_t replay_version; // for op replay int attempts; - bool paused; - version_t *objver; epoch_t *reply_epoch; @@ -1116,16 +1137,14 @@ public: Op(const object_t& o, const object_locator_t& ol, vector& op, int f, Context *ac, Context *co, version_t *ov) : session(NULL), session_item(this), incarnation(0), - base_oid(o), base_oloc(ol), - precalc_pgid(false), - primary(-1), - used_replica(false), con(NULL), + target(o, ol, f), + con(NULL), snapid(CEPH_NOSNAP), outbl(NULL), - flags(f), priority(0), onack(ac), oncommit(co), + priority(0), onack(ac), oncommit(co), ontimeout(NULL), tid(0), attempts(0), - paused(false), objver(ov), reply_epoch(NULL), + objver(ov), reply_epoch(NULL), map_dne_bound(0), budgeted(false), should_resend(true) { @@ -1141,8 +1160,8 @@ public: out_rval[i] = NULL; } - if (base_oloc.key == o) - base_oloc.key.clear(); + if (target.base_oloc.key == o) + target.base_oloc.key.clear(); } ~Op() { while (!out_handler.empty()) { @@ -1334,18 +1353,13 @@ public: struct LingerOp : public RefCountedObject { uint64_t linger_id; - object_t oid; - object_locator_t oloc; - pg_t pgid; - vector acting; - int primary; + op_target_t target; snapid_t snap; SnapContext snapc; utime_t mtime; - int flags; vector ops; bufferlist inbl; bufferlist *poutbl; @@ -1360,8 +1374,9 @@ public: ceph_tid_t register_tid; epoch_t map_dne_bound; - LingerOp() : linger_id(0), primary(-1), - snap(CEPH_NOSNAP), flags(0), + LingerOp() : linger_id(0), + target(object_t(), object_locator_t(), 0), + snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL), registered(false), on_reg_ack(NULL), on_reg_commit(NULL), @@ -1464,7 +1479,9 @@ public: RECALC_OP_TARGET_OSD_DOWN, }; bool osdmap_full_flag() const; - bool op_should_be_paused(Op *op); + bool target_should_be_paused(op_target_t *op); + + int calc_target(op_target_t *t); int recalc_op_target(Op *op); bool recalc_linger_op_target(LingerOp *op); @@ -1695,8 +1712,8 @@ public: Op *o = new Op(object_t(), oloc, op.ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, onack, NULL, NULL); - o->precalc_pgid = true; - o->base_pgid = pg_t(hash, oloc.pool); + o->target.precalc_pgid = true; + o->target.base_pgid = pg_t(hash, oloc.pool); o->priority = op.priority; o->snapid = CEPH_NOSNAP; o->outbl = pbl;