vector<OSDOp> opv = info->ops; // need to pass a copy to ops
Context *onack = (!info->registered && info->on_reg_ack) ? new C_Linger_Ack(this, info) : NULL;
Context *oncommit = new C_Linger_Commit(this, info);
- Op *o = new Op(info->oid, info->oloc, opv, info->flags | CEPH_OSD_FLAG_READ,
+ Op *o = new Op(info->target.base_oid, info->target.base_oloc,
+ opv, info->target.flags | CEPH_OSD_FLAG_READ,
onack, oncommit,
info->pobjver);
o->snapid = info->snap;
version_t *objver)
{
LingerOp *info = new LingerOp;
- info->oid = oid;
- info->oloc = oloc;
- if (info->oloc.key == oid)
- info->oloc.key.clear();
+ info->target.base_oid = oid;
+ info->target.base_oloc = oloc;
+ if (info->target.base_oloc.key == oid)
+ info->target.base_oloc.key.clear();
info->snapc = snapc;
info->mtime = mtime;
- info->flags = flags | CEPH_OSD_FLAG_WRITE;
+ info->target.flags = flags | CEPH_OSD_FLAG_WRITE;
info->ops = op.ops;
info->inbl = inbl;
info->poutbl = NULL;
version_t *objver)
{
LingerOp *info = new LingerOp;
- info->oid = oid;
- info->oloc = oloc;
- if (info->oloc.key == oid)
- info->oloc.key.clear();
+ info->target.base_oid = oid;
+ info->target.base_oloc = oloc;
+ if (info->target.base_oloc.key == oid)
+ info->target.base_oloc.key.clear();
info->snap = snap;
- info->flags = flags;
+ info->target.flags = flags;
info->ops = op.ops;
info->inbl = inbl;
info->poutbl = poutbl;
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
if (!force_resend &&
- (!force_resend_writes || !(op->flags & CEPH_OSD_FLAG_WRITE)))
+ (!force_resend_writes || !(op->target.flags & CEPH_OSD_FLAG_WRITE)))
break;
// -- fall-thru --
case RECALC_OP_TARGET_NEED_RESEND:
for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin(); p != need_resend.end(); ++p) {
Op *op = p->second;
if (op->should_resend) {
- if (op->session && !op->paused) {
+ if (op->session && !op->target.paused) {
logger->inc(l_osdc_op_resend);
send_op(op);
}
if (osdmap->get_epoch() >= op->map_dne_bound) {
// we had a new enough map
ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
- << " concluding pool " << op->pgid.pool() << " dne"
+ << " concluding pool " << op->target.base_pgid.pool() << " dne"
<< dendl;
if (op->onack) {
op->onack->complete(-ENOENT);
++p;
logger->inc(l_osdc_op_resend);
if (op->should_resend) {
- if (!op->paused)
+ if (!op->target.paused)
resend[op->tid] = op;
} else {
cancel_linger_op(op);
logger->set(l_osdc_op_active, ops.size());
logger->inc(l_osdc_op);
- if ((op->flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
+ if ((op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
logger->inc(l_osdc_op_rmw);
- else if (op->flags & CEPH_OSD_FLAG_WRITE)
+ else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
logger->inc(l_osdc_op_w);
- else if (op->flags & CEPH_OSD_FLAG_READ)
+ else if (op->target.flags & CEPH_OSD_FLAG_READ)
logger->inc(l_osdc_op_r);
- if (op->flags & CEPH_OSD_FLAG_PGOP)
+ if (op->target.flags & CEPH_OSD_FLAG_PGOP)
logger->inc(l_osdc_op_pg);
for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
}
// send?
- ldout(cct, 10) << "op_submit oid " << op->base_oid
- << " " << op->base_oloc << " " << op->target_oloc
+ ldout(cct, 10) << "op_submit oid " << op->target.base_oid
+ << " " << op->target.base_oloc << " " << op->target.target_oloc
<< " " << op->ops << " tid " << op->tid
<< " osd." << (op->session ? op->session->osd : -1)
<< dendl;
- assert(op->flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
+ assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
- if ((op->flags & CEPH_OSD_FLAG_WRITE) &&
+ if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
ldout(cct, 10) << " paused modify " << op << " tid " << last_tid << dendl;
- op->paused = true;
+ op->target.paused = true;
maybe_request_map();
- } else if ((op->flags & CEPH_OSD_FLAG_READ) &&
+ } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
ldout(cct, 10) << " paused read " << op << " tid " << last_tid << dendl;
- op->paused = true;
+ op->target.paused = true;
maybe_request_map();
- } else if ((op->flags & CEPH_OSD_FLAG_WRITE) && osdmap_full_flag()) {
+ } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) && osdmap_full_flag()) {
ldout(cct, 0) << " FULL, paused modify " << op << " tid " << last_tid << dendl;
- op->paused = true;
+ op->target.paused = true;
maybe_request_map();
} else if (op->session) {
send_op(op);
return false; // same primary (tho replicas may have changed)
}
-bool Objecter::op_should_be_paused(Op *op)
+bool Objecter::target_should_be_paused(op_target_t *t)
{
bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || osdmap_full_flag();
- return (op->flags & CEPH_OSD_FLAG_READ && pauserd) ||
- (op->flags & CEPH_OSD_FLAG_WRITE && pausewr);
+ return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
+ (t->flags & CEPH_OSD_FLAG_WRITE && pausewr);
}
return p->raw_hash_to_pg(p->hash_key(key, ns));
}
-int Objecter::recalc_op_target(Op *op)
+int Objecter::calc_target(op_target_t *t)
{
- bool is_read = op->flags & CEPH_OSD_FLAG_READ;
- bool is_write = op->flags & CEPH_OSD_FLAG_WRITE;
+ bool is_read = t->flags & CEPH_OSD_FLAG_READ;
+ bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
bool need_check_tiering = false;
- if (op->target_oid.name.empty()) {
- op->target_oid = op->base_oid;
+ if (t->target_oid.name.empty()) {
+ t->target_oid = t->base_oid;
need_check_tiering = true;
}
- if (op->target_oloc.empty()) {
- op->target_oloc = op->base_oloc;
+ if (t->target_oloc.empty()) {
+ t->target_oloc = t->base_oloc;
need_check_tiering = true;
}
if (need_check_tiering &&
- (op->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
- const pg_pool_t *pi = osdmap->get_pg_pool(op->base_oloc.pool);
+ (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
+ const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
if (pi) {
if (is_read && pi->has_read_tier())
- op->target_oloc.pool = pi->read_tier;
+ t->target_oloc.pool = pi->read_tier;
if (is_write && pi->has_write_tier())
- op->target_oloc.pool = pi->write_tier;
+ t->target_oloc.pool = pi->write_tier;
}
}
pg_t pgid;
- if (op->precalc_pgid) {
- assert(op->base_oid.name.empty()); // make sure this is a listing op
- ldout(cct, 10) << "recalc_op_target have " << op->base_pgid << " pool "
- << osdmap->have_pg_pool(op->base_pgid.pool()) << dendl;
- if (!osdmap->have_pg_pool(op->base_pgid.pool()))
+ if (t->precalc_pgid) {
+ assert(t->base_oid.name.empty()); // make sure this is a listing op
+ ldout(cct, 10) << "recalc_op_target have " << t->base_pgid << " pool "
+ << osdmap->have_pg_pool(t->base_pgid.pool()) << dendl;
+ if (!osdmap->have_pg_pool(t->base_pgid.pool()))
return RECALC_OP_TARGET_POOL_DNE;
- pgid = osdmap->raw_pg_to_pg(op->base_pgid);
+ pgid = osdmap->raw_pg_to_pg(t->base_pgid);
} else {
- int ret = osdmap->object_locator_to_pg(op->target_oid, op->target_oloc,
+ int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
pgid);
if (ret == -ENOENT)
return RECALC_OP_TARGET_POOL_DNE;
bool need_resend = false;
- bool paused = op_should_be_paused(op);
- if (!paused && paused != op->paused) {
- op->paused = false;
+ bool paused = target_should_be_paused(t);
+ if (!paused && paused != t->paused) {
+ t->paused = false;
need_resend = true;
}
- if (op->pgid != pgid ||
- is_pg_changed(
- op->primary, op->acting, primary, acting, op->used_replica)) {
- op->pgid = pgid;
- op->acting = acting;
- op->primary = primary;
- ldout(cct, 10) << "recalc_op_target tid " << op->tid
- << " pgid " << pgid << " acting " << acting << dendl;
-
- OSDSession *s = NULL;
- op->used_replica = false;
+ if (t->pgid != pgid ||
+ is_pg_changed(t->primary, t->acting, primary, acting, t->used_replica)) {
+ t->pgid = pgid;
+ t->acting = acting;
+ t->primary = primary;
+ ldout(cct, 10) << "calc_op_target "
+ << " pgid " << pgid << " acting " << acting << dendl;
+ t->used_replica = false;
if (primary != -1) {
int osd;
bool read = is_read && !is_write;
- if (read && (op->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
+ if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
int p = rand() % acting.size();
if (p)
- op->used_replica = true;
+ t->used_replica = true;
osd = acting[p];
ldout(cct, 10) << " chose random osd." << osd << " of " << acting << dendl;
- } else if (read && (op->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
+ } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
acting.size() > 1) {
// look for a local replica. prefer the primary if the
// distance is the same.
best = i;
best_locality = locality;
if (i)
- op->used_replica = true;
+ t->used_replica = true;
}
}
assert(best >= 0);
} else {
osd = primary;
}
- s = get_session(osd);
+ t->osd = osd;
}
+ need_resend = true;
+ }
+ if (need_resend) {
+ return RECALC_OP_TARGET_NEED_RESEND;
+ }
+ return RECALC_OP_TARGET_NO_ACTION;
+}
+int Objecter::recalc_op_target(Op *op)
+{
+ int r = calc_target(&op->target);
+ if (r == RECALC_OP_TARGET_NEED_RESEND) {
+ OSDSession *s = NULL;
+ if (op->target.osd >= 0)
+ s = get_session(op->target.osd);
if (op->session != s) {
if (!op->session)
num_homeless_ops--;
else
num_homeless_ops++;
}
- need_resend = true;
}
- if (need_resend) {
- return RECALC_OP_TARGET_NEED_RESEND;
- }
- return RECALC_OP_TARGET_NO_ACTION;
+ return r;
}
bool Objecter::recalc_linger_op_target(LingerOp *linger_op)
{
- int primary;
- vector<int> acting;
- pg_t pgid;
- int ret = osdmap->object_locator_to_pg(linger_op->oid, linger_op->oloc, pgid);
- if (ret == -ENOENT) {
- return RECALC_OP_TARGET_POOL_DNE;
- }
- osdmap->pg_to_acting_osds(pgid, &acting, &primary);
-
- if (pgid != linger_op->pgid ||
- is_pg_changed(
- linger_op->primary, linger_op->acting, primary, acting, true)) {
- linger_op->pgid = pgid;
- linger_op->acting = acting;
- linger_op->primary = primary;
+ int r = calc_target(&linger_op->target);
+ if (r == RECALC_OP_TARGET_NEED_RESEND) {
ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
- << " pgid " << pgid << " acting " << acting << dendl;
+ << " pgid " << linger_op->target.pgid
+ << " acting " << linger_op->target.acting << dendl;
- OSDSession *s = primary != -1 ? get_session(primary) : NULL;
+ OSDSession *s = linger_op->target.osd != -1 ?
+ get_session(linger_op->target.osd) : NULL;
if (linger_op->session != s) {
linger_op->session_item.remove_myself();
linger_op->session = s;
if (s)
s->linger_ops.push_back(&linger_op->session_item);
}
- return RECALC_OP_TARGET_NEED_RESEND;
}
- return RECALC_OP_TARGET_NO_ACTION;
+ return r;
}
void Objecter::cancel_linger_op(Op *op)
{
ldout(cct, 15) << "send_op " << op->tid << " to osd." << op->session->osd << dendl;
- int flags = op->flags;
+ int flags = op->target.flags;
if (op->oncommit)
flags |= CEPH_OSD_FLAG_ONDISK;
if (op->onack)
op->con->post_rx_buffer(op->tid, *op->outbl);
}
- op->paused = false;
+ op->target.paused = false;
op->incarnation = op->session->incarnation;
op->stamp = ceph_clock_now(cct);
MOSDOp *m = new MOSDOp(client_inc, op->tid,
- op->target_oid, op->target_oloc, op->pgid,
+ op->target.target_oid, op->target.target_oloc,
+ op->target.pgid,
osdmap->get_epoch(),
flags);
if (m->is_redirect_reply()) {
ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
unregister_op(op);
- m->get_redirect().combine_with_locator(op->target_oloc, op->target_oid.name);
+ m->get_redirect().combine_with_locator(op->target.target_oloc,
+ op->target.target_oid.name);
_op_submit(op);
m->put();
return;
}
+void Objecter::op_target_t::dump(Formatter *f) const
+{
+ f->dump_stream("pg") << pgid;
+ f->dump_int("osd", osd);
+ f->dump_stream("object_id") << base_oid;
+ f->dump_stream("object_locator") << base_oloc;
+ f->dump_stream("target_object_id") << target_oid;
+ f->dump_stream("target_object_locator") << target_oloc;
+ f->dump_int("paused", (int)paused);
+ f->dump_int("used_replica", (int)used_replica);
+ f->dump_int("precalc_pgid", (int)precalc_pgid);
+}
+
void Objecter::dump_active()
{
ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless" << dendl;
for (map<ceph_tid_t,Op*>::iterator p = ops.begin(); p != ops.end(); ++p) {
Op *op = p->second;
- ldout(cct, 20) << op->tid << "\t" << op->pgid << "\tosd." << (op->session ? op->session->osd : -1)
- << "\t" << op->base_oid << "\t" << op->ops << dendl;
+ ldout(cct, 20) << op->tid << "\t" << op->target.pgid
+ << "\tosd." << (op->session ? op->session->osd : -1)
+ << "\t" << op->target.base_oid
+ << "\t" << op->ops << dendl;
}
}
Op *op = p->second;
fmt->open_object_section("op");
fmt->dump_unsigned("tid", op->tid);
- fmt->dump_stream("pg") << op->pgid;
- fmt->dump_int("osd", op->session ? op->session->osd : -1);
+ op->target.dump(fmt);
fmt->dump_stream("last_sent") << op->stamp;
fmt->dump_int("attempts", op->attempts);
- fmt->dump_stream("object_id") << op->base_oid;
- fmt->dump_stream("object_locator") << op->base_oloc;
- fmt->dump_stream("target_object_locator") << op->target_oloc;
fmt->dump_stream("snapid") << op->snapid;
fmt->dump_stream("snap_context") << op->snapc;
fmt->dump_stream("mtime") << op->mtime;
LingerOp *op = p->second;
fmt->open_object_section("linger_op");
fmt->dump_unsigned("linger_id", op->linger_id);
- fmt->dump_stream("pg") << op->pgid;
- fmt->dump_int("osd", op->session ? op->session->osd : -1);
- fmt->dump_stream("object_id") << op->oid;
- fmt->dump_stream("object_locator") << op->oloc;
+ op->target.dump(fmt);
fmt->dump_stream("snapid") << op->snap;
fmt->dump_stream("registered") << op->registered;
fmt->close_section(); // linger_op object
struct OSDSession;
- struct Op {
- OSDSession *session;
- xlist<Op*>::item session_item;
- int incarnation;
-
+ struct op_target_t {
+ int flags;
object_t base_oid;
object_locator_t base_oloc;
object_t target_oid;
pg_t pgid; ///< last pg we mapped to
vector<int> acting; ///< acting for last pg we mapped to
int primary; ///< primary for last pg we mapped to
+
+ bool used_replica;
+ bool paused;
+
+ int osd; ///< the final target osd, or -1
+
+ op_target_t(object_t oid, object_locator_t oloc, int flags)
+ : flags(flags),
+ base_oid(oid),
+ base_oloc(oloc),
+ precalc_pgid(false),
+ primary(-1),
+ used_replica(false),
+ paused(false),
+ osd(-1)
+ {}
+
+ void dump(Formatter *f) const;
+ };
+
+ struct Op {
+ OSDSession *session;
+ xlist<Op*>::item session_item;
+ int incarnation;
+
+ op_target_t target;
bool used_replica;
ConnectionRef con; // for rx buffer only
vector<Context*> out_handler;
vector<int*> out_rval;
- int flags, priority;
+ int priority;
Context *onack, *oncommit, *ontimeout;
ceph_tid_t tid;
eversion_t replay_version; // for op replay
int attempts;
- bool paused;
-
version_t *objver;
epoch_t *reply_epoch;
Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
int f, Context *ac, Context *co, version_t *ov) :
session(NULL), session_item(this), incarnation(0),
- base_oid(o), base_oloc(ol),
- precalc_pgid(false),
- primary(-1),
- used_replica(false), con(NULL),
+ target(o, ol, f),
+ con(NULL),
snapid(CEPH_NOSNAP),
outbl(NULL),
- flags(f), priority(0), onack(ac), oncommit(co),
+ priority(0), onack(ac), oncommit(co),
ontimeout(NULL),
tid(0), attempts(0),
- paused(false), objver(ov), reply_epoch(NULL),
+ objver(ov), reply_epoch(NULL),
map_dne_bound(0),
budgeted(false),
should_resend(true) {
out_rval[i] = NULL;
}
- if (base_oloc.key == o)
- base_oloc.key.clear();
+ if (target.base_oloc.key == o)
+ target.base_oloc.key.clear();
}
~Op() {
while (!out_handler.empty()) {
struct LingerOp : public RefCountedObject {
uint64_t linger_id;
- object_t oid;
- object_locator_t oloc;
- pg_t pgid;
- vector<int> acting;
- int primary;
+ op_target_t target;
snapid_t snap;
SnapContext snapc;
utime_t mtime;
- int flags;
vector<OSDOp> ops;
bufferlist inbl;
bufferlist *poutbl;
ceph_tid_t register_tid;
epoch_t map_dne_bound;
- LingerOp() : linger_id(0), primary(-1),
- snap(CEPH_NOSNAP), flags(0),
+ LingerOp() : linger_id(0),
+ target(object_t(), object_locator_t(), 0),
+ snap(CEPH_NOSNAP),
poutbl(NULL), pobjver(NULL),
registered(false),
on_reg_ack(NULL), on_reg_commit(NULL),
RECALC_OP_TARGET_OSD_DOWN,
};
bool osdmap_full_flag() const;
- bool op_should_be_paused(Op *op);
+ bool target_should_be_paused(op_target_t *op);
+
+ int calc_target(op_target_t *t);
int recalc_op_target(Op *op);
bool recalc_linger_op_target(LingerOp *op);
Op *o = new Op(object_t(), oloc,
op.ops, flags | global_op_flags | CEPH_OSD_FLAG_READ,
onack, NULL, NULL);
- o->precalc_pgid = true;
- o->base_pgid = pg_t(hash, oloc.pool);
+ o->target.precalc_pgid = true;
+ o->target.base_pgid = pg_t(hash, oloc.pool);
o->priority = op.priority;
o->snapid = CEPH_NOSNAP;
o->outbl = pbl;