// messages ------------------------------
-void Objecter::init_unlocked()
+void Objecter::init()
{
- assert(!initialized);
+ assert(!initialized.read());
if (!logger) {
PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
lderr(cct) << "error registering admin socket command: "
<< cpp_strerror(ret) << dendl;
}
-}
-void Objecter::init_locked()
-{
- assert(client_lock.is_locked());
- assert(!initialized);
+ rwlock.get_read();
schedule_tick();
- if (osdmap->get_epoch() == 0)
- maybe_request_map();
+ if (osdmap->get_epoch() == 0) {
+ int r = _maybe_request_map();
+ assert (r == 0 || osdmap->get_epoch() > 0);
+ }
- initialized = true;
+ rwlock.unlock();
+
+ initialized.set(1);
}
-void Objecter::shutdown_locked()
+void Objecter::shutdown()
{
- assert(client_lock.is_locked());
- assert(initialized);
- initialized = false;
+ assert(initialized.read());
+ initialized.set(0);
+
+ RWLock::WLocker wl(rwlock);
map<int,OSDSession*>::iterator p;
while (!osd_sessions.empty()) {
timer.cancel_event(tick_event);
tick_event = NULL;
}
-}
-void Objecter::shutdown_unlocked()
-{
if (m_request_state_hook) {
AdminSocket* admin_socket = cct->get_admin_socket();
admin_socket->unregister_command("objecter_requests");
}
}
-void Objecter::send_linger(LingerOp *info)
+void Objecter::_send_linger(LingerOp *info)
{
+ assert(rwlock.is_wlocked());
+
+ RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+
ldout(cct, 15) << "send_linger " << info->linger_id << dendl;
vector<OSDOp> opv = info->ops; // need to pass a copy to ops
- Context *onack = (!info->registered && info->on_reg_ack) ? new C_Linger_Ack(this, info) : NULL;
+ Context *onack = (!info->registered && info->on_reg_ack) ?
+ new C_Linger_Ack(this, info) : NULL;
Context *oncommit = new C_Linger_Commit(this, info);
Op *o = new Op(info->target.base_oid, info->target.base_oloc,
opv, info->target.flags | CEPH_OSD_FLAG_READ,
o->snapc = info->snapc;
o->mtime = info->mtime;
+ o->target = info->target;
+ o->tid = last_tid.inc();
+
// do not resend this; we will send a new op to reregister
o->should_resend = false;
- if (info->session) {
- int r = recalc_op_target(o);
- if (r == RECALC_OP_TARGET_POOL_DNE) {
- _send_linger_map_check(info);
- }
- }
-
if (info->register_tid) {
// repeat send. cancel old registeration op, if any.
- if (ops.count(info->register_tid)) {
- Op *o = ops[info->register_tid];
- op_cancel_map_check(o);
- cancel_linger_op(o);
+ info->session->lock.get_write();
+ if (info->session->ops.count(info->register_tid)) {
+ Op *o = info->session->ops[info->register_tid];
+ _op_cancel_map_check(o);
+ _cancel_linger_op(o);
}
- info->register_tid = _op_submit(o);
+ info->session->lock.unlock();
+
+ info->register_tid = _op_submit(o, lc);
} else {
// first send
- // populate info->pgid and info->acting so we
- // don't resend the linger op on the next osdmap update
- recalc_linger_op_target(info);
- info->register_tid = op_submit(o);
- }
-
- OSDSession *s = o->session;
- if (info->session != s) {
- info->session_item.remove_myself();
- info->session = s;
- if (info->session)
- s->linger_ops.push_back(&info->session_item);
+ info->register_tid = _op_submit_with_budget(o, lc);
}
logger->inc(l_osdc_linger_send);
map<uint64_t, LingerOp*>::iterator iter = linger_ops.find(linger_id);
if (iter != linger_ops.end()) {
LingerOp *info = iter->second;
- info->session_item.remove_myself();
+ info->session->lock.get_write();
+ info->session->linger_ops.erase(linger_id);
+ info->session->lock.unlock();
linger_ops.erase(iter);
info->put();
- logger->set(l_osdc_linger_active, linger_ops.size());
+
+ logger->dec(l_osdc_linger_active);
}
}
info->on_reg_ack = onack;
info->on_reg_commit = oncommit;
- info->linger_id = ++max_linger_id;
- linger_ops[info->linger_id] = info;
-
- logger->set(l_osdc_linger_active, linger_ops.size());
-
- send_linger(info);
-
+ RWLock::WLocker wl(rwlock);
+ _linger_submit(info);
+ logger->inc(l_osdc_linger_active);
return info->linger_id;
}
if (info->target.base_oloc.key == oid)
info->target.base_oloc.key.clear();
info->snap = snap;
- info->target.flags = flags;
+ info->target.flags = flags | CEPH_OSD_FLAG_READ;
info->ops = op.ops;
info->inbl = inbl;
info->poutbl = poutbl;
info->pobjver = objver;
info->on_reg_commit = onfinish;
+ RWLock::WLocker wl(rwlock);
+ _linger_submit(info);
+ logger->inc(l_osdc_linger_active);
+ return info->linger_id;
+}
+
+void Objecter::_linger_submit(LingerOp *info)
+{
+ assert(rwlock.is_wlocked());
+ RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+
info->linger_id = ++max_linger_id;
+ ldout(cct, 10) << __func__ << " info " << info
+ << " linger_id " << info->linger_id << dendl;
linger_ops[info->linger_id] = info;
- logger->set(l_osdc_linger_active, linger_ops.size());
+ OSDSession *s = NULL;
+ _calc_target(&info->target);
+ int r = _get_session(info->target.osd, &s, lc);
+ assert(r == 0);
- send_linger(info);
+ info->session = s;
- return info->linger_id;
+ s->lock.get_write();
+ s->linger_ops[info->linger_id] = info;
+ s->lock.unlock();
+ _send_linger(info);
}
+
+
void Objecter::dispatch(Message *m)
{
switch (m->get_type()) {
}
}
-void Objecter::scan_requests(bool force_resend,
+void Objecter::_scan_requests(OSDSession *s,
+ bool force_resend,
bool force_resend_writes,
map<ceph_tid_t, Op*>& need_resend,
list<LingerOp*>& need_resend_linger,
map<ceph_tid_t, CommandOp*>& need_resend_command)
{
+ assert(rwlock.is_wlocked());
+
+ RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+ RWLock::WLocker wl(s->lock);
+
// check for changed linger mappings (_before_ regular ops)
- map<ceph_tid_t,LingerOp*>::iterator lp = linger_ops.begin();
- while (lp != linger_ops.end()) {
+ map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
+ while (lp != s->linger_ops.end()) {
LingerOp *op = lp->second;
++lp; // check_linger_pool_dne() may touch linger_ops; prevent iterator invalidation
ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
- int r = recalc_linger_op_target(op);
+ int r = _calc_target(&op->target);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
if (!force_resend && !force_resend_writes)
break;
// -- fall-thru --
case RECALC_OP_TARGET_NEED_RESEND:
+ if (op->session) {
+ _session_linger_op_remove(op);
+ }
need_resend_linger.push_back(op);
- linger_cancel_map_check(op);
+ _linger_cancel_map_check(op);
break;
case RECALC_OP_TARGET_POOL_DNE:
- check_linger_pool_dne(op);
+ _check_linger_pool_dne(op);
break;
}
}
// check for changed request mappings
- map<ceph_tid_t,Op*>::iterator p = ops.begin();
- while (p != ops.end()) {
+ map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
+ while (p != s->ops.end()) {
Op *op = p->second;
++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
ldout(cct, 10) << " checking op " << op->tid << dendl;
- int r = recalc_op_target(op);
+ int r = _calc_target(&op->target);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
if (!force_resend &&
break;
// -- fall-thru --
case RECALC_OP_TARGET_NEED_RESEND:
+ if (op->session) {
+ _session_op_remove(op);
+ }
need_resend[op->tid] = op;
- op_cancel_map_check(op);
+ _op_cancel_map_check(op);
break;
case RECALC_OP_TARGET_POOL_DNE:
- check_op_pool_dne(op);
+ _check_op_pool_dne(op);
break;
}
}
// commands
- map<ceph_tid_t,CommandOp*>::iterator cp = command_ops.begin();
- while (cp != command_ops.end()) {
+ map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
+ while (cp != s->command_ops.end()) {
CommandOp *c = cp->second;
++cp;
ldout(cct, 10) << " checking command " << c->tid << dendl;
- int r = recalc_command_target(c);
+ int r = _recalc_command_target(c);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
// resend if skipped map; otherwise do nothing.
// -- fall-thru --
case RECALC_OP_TARGET_NEED_RESEND:
need_resend_command[c->tid] = c;
- command_cancel_map_check(c);
+ _command_cancel_map_check(c);
break;
case RECALC_OP_TARGET_POOL_DNE:
case RECALC_OP_TARGET_OSD_DNE:
case RECALC_OP_TARGET_OSD_DOWN:
- check_command_map_dne(c);
+ _check_command_map_dne(c);
break;
}
}
void Objecter::handle_osd_map(MOSDMap *m)
{
- assert(client_lock.is_locked());
- assert(initialized);
+ assert(initialized.read());
+
+ RWLock::WLocker wl(rwlock);
+
assert(osdmap);
if (m->fsid != monc->get_fsid()) {
- ldout(cct, 0) << "handle_osd_map fsid " << m->fsid << " != " << monc->get_fsid() << dendl;
+ ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
+ << " != " << monc->get_fsid() << dendl;
m->put();
return;
}
ldout(cct, 3) << "handle_osd_map ignoring epochs ["
<< m->get_first() << "," << m->get_last()
<< "] <= " << osdmap->get_epoch() << dendl;
- }
- else {
+ } else {
ldout(cct, 3) << "handle_osd_map got epochs ["
<< m->get_first() << "," << m->get_last()
<< "] > " << osdmap->get_epoch()
if (osdmap->get_epoch() == e-1 &&
m->incremental_maps.count(e)) {
- ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e << dendl;
+ ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
+ << dendl;
OSDMap::Incremental inc(m->incremental_maps[e]);
osdmap->apply_incremental(inc);
logger->inc(l_osdc_map_inc);
}
else {
if (e && e > m->get_oldest()) {
- ldout(cct, 3) << "handle_osd_map requesting missing epoch " << osdmap->get_epoch()+1 << dendl;
- maybe_request_map();
+ ldout(cct, 3) << "handle_osd_map requesting missing epoch "
+ << osdmap->get_epoch()+1 << dendl;
+ int r = _maybe_request_map();
+ assert(r == 0);
break;
}
- ldout(cct, 3) << "handle_osd_map missing epoch " << osdmap->get_epoch()+1
+ ldout(cct, 3) << "handle_osd_map missing epoch "
+ << osdmap->get_epoch()+1
<< ", jumping to " << m->get_oldest() << dendl;
e = m->get_oldest() - 1;
skipped_map = true;
logger->set(l_osdc_map_epoch, osdmap->get_epoch());
was_full = was_full || osdmap_full_flag();
- scan_requests(skipped_map, was_full, need_resend, need_resend_linger,
- need_resend_command);
+ _scan_requests(&homeless_session, skipped_map, was_full,
+ need_resend, need_resend_linger,
+ need_resend_command);
// osd addr changes?
for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
p != osd_sessions.end(); ) {
OSDSession *s = p->second;
+ _scan_requests(s, skipped_map, was_full,
+ need_resend, need_resend_linger,
+ need_resend_command);
++p;
- if (osdmap->is_up(s->osd)) {
- if (s->con && s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)
- close_session(s);
- } else {
+ if (!osdmap->is_up(s->osd) ||
+ (s->con &&
+ s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)) {
close_session(s);
}
}
} else {
// first map. we want the full thing.
if (m->maps.count(m->get_last())) {
- ldout(cct, 3) << "handle_osd_map decoding full epoch " << m->get_last() << dendl;
+ for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
+ p != osd_sessions.end(); ++p) {
+ OSDSession *s = p->second;
+ _scan_requests(s, false, false, need_resend, need_resend_linger,
+ need_resend_command);
+ }
+ ldout(cct, 3) << "handle_osd_map decoding full epoch "
+ << m->get_last() << dendl;
osdmap->decode(m->maps[m->get_last()]);
- scan_requests(false, false, need_resend, need_resend_linger,
- need_resend_command);
+ _scan_requests(&homeless_session, false, false,
+ need_resend, need_resend_linger,
+ need_resend_command);
} else {
- ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting" << dendl;
+ ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
+ << dendl;
monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
}
bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || osdmap_full_flag();
// was/is paused?
- if (was_pauserd || was_pausewr || pauserd || pausewr)
- maybe_request_map();
+ if (was_pauserd || was_pausewr || pauserd || pausewr) {
+ int r = _maybe_request_map();
+ assert(r == 0);
+ }
+
+ RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
// resend requests
- for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin(); p != need_resend.end(); ++p) {
+ for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
+ p != need_resend.end(); ++p) {
Op *op = p->second;
+ OSDSession *s = op->session;
+ bool mapped_session = false;
+ if (!s) {
+ int r = _map_session(&op->target, &s, lc);
+ assert(r == 0);
+ mapped_session = true;
+ }
+ s->lock.get_write();
+ if (mapped_session) {
+ _session_op_assign(op, s);
+ }
if (op->should_resend) {
- if (op->session && !op->target.paused) {
+ if (!op->session->is_homeless() && !op->target.paused) {
logger->inc(l_osdc_op_resend);
- send_op(op);
+ _send_op(op);
}
} else {
- cancel_linger_op(op);
+ _cancel_linger_op(op);
}
+ s->lock.unlock();
}
- for (list<LingerOp*>::iterator p = need_resend_linger.begin(); p != need_resend_linger.end(); ++p) {
+ for (list<LingerOp*>::iterator p = need_resend_linger.begin();
+ p != need_resend_linger.end(); ++p) {
LingerOp *op = *p;
- if (op->session) {
+ if (!op->session) {
+ _calc_target(&op->target);
+ int r = _get_session(op->target.osd, &op->session, lc);
+ assert(r == 0);
+ }
+ if (!op->session->is_homeless()) {
logger->inc(l_osdc_linger_resend);
- send_linger(op);
+ _send_linger(op);
}
}
- for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin(); p != need_resend_command.end(); ++p) {
+ for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
+ p != need_resend_command.end(); ++p) {
CommandOp *c = p->second;
if (c->session) {
_send_command(c);
}
}
- dump_active();
+ _dump_active();
// finish any Contexts that were waiting on a map update
map<epoch_t,list< pair< Context*, int > > >::iterator p =
monc->sub_got("osdmap", osdmap->get_epoch());
- if (!waiting_for_map.empty())
- maybe_request_map();
+ if (!waiting_for_map.empty()) {
+ int r = _maybe_request_map();
+ assert(r == 0);
+ }
}
// op pool check
lgeneric_subdout(objecter->cct, objecter, 10) << "op_map_latest r=" << r << " tid=" << tid
<< " latest " << latest << dendl;
- Mutex::Locker l(objecter->client_lock);
+ RWLock::WLocker wl(objecter->rwlock);
map<ceph_tid_t, Op*>::iterator iter =
objecter->check_latest_map_ops.find(tid);
if (op->map_dne_bound == 0)
op->map_dne_bound = latest;
- objecter->check_op_pool_dne(op);
+ objecter->_check_op_pool_dne(op);
}
-void Objecter::check_op_pool_dne(Op *op)
+int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name, snapid_t *snap)
{
+ RWLock::RLocker rl(rwlock);
+
+ const map<int64_t, pg_pool_t>& pools = osdmap->get_pools();
+ map<int64_t, pg_pool_t>::const_iterator iter = pools.find(poolid);
+ if (iter == pools.end()) {
+ return -ENOENT;
+ }
+ const pg_pool_t& pg_pool = iter->second;
+ map<snapid_t, pool_snap_info_t>::const_iterator p;
+ for (p = pg_pool.snaps.begin();
+ p != pg_pool.snaps.end();
+ ++p) {
+ if (p->second.name == snap_name) {
+ *snap = p->first;
+ return 0;
+ }
+ }
+ return -ENOENT;
+}
+
+int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap, pool_snap_info_t *info)
+{
+ RWLock::RLocker rl(rwlock);
+
+ const map<int64_t, pg_pool_t>& pools = osdmap->get_pools();
+ map<int64_t, pg_pool_t>::const_iterator iter = pools.find(poolid);
+ if (iter == pools.end()) {
+ return -ENOENT;
+ }
+ const pg_pool_t& pg_pool = iter->second;
+ map<snapid_t,pool_snap_info_t>::const_iterator p = pg_pool.snaps.find(snap);
+ if (p == pg_pool.snaps.end())
+ return -ENOENT;
+ *info = p->second;
+
+ return 0;
+}
+
+int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
+{
+ RWLock::RLocker rl(rwlock);
+
+ const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
+ for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
+ p != pi->snaps.end();
+ ++p) {
+ snaps->push_back(p->first);
+ }
+ return 0;
+}
+
+void Objecter::_check_op_pool_dne(Op *op)
+{
+ assert(rwlock.is_wlocked());
+
ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
<< " current " << osdmap->get_epoch()
<< " map_dne_bound " << op->map_dne_bound
if (op->oncommit) {
op->oncommit->complete(-ENOENT);
}
- op->session_item.remove_myself();
- ops.erase(op->tid);
- delete op;
+ RWLock::WLocker wl(op->session->lock);
+ _finish_op(op);
}
} else {
_send_op_map_check(op);
void Objecter::_send_op_map_check(Op *op)
{
- assert(client_lock.is_locked());
+ assert(rwlock.is_wlocked());
// ask the monitor
if (check_latest_map_ops.count(op->tid) == 0) {
check_latest_map_ops[op->tid] = op;
}
}
-void Objecter::op_cancel_map_check(Op *op)
+void Objecter::_op_cancel_map_check(Op *op)
{
- assert(client_lock.is_locked());
+ assert(rwlock.is_wlocked());
map<ceph_tid_t, Op*>::iterator iter =
check_latest_map_ops.find(op->tid);
if (iter != check_latest_map_ops.end()) {
return;
}
- Mutex::Locker l(objecter->client_lock);
+ RWLock::WLocker wl(objecter->rwlock);
map<uint64_t, LingerOp*>::iterator iter =
objecter->check_latest_map_lingers.find(linger_id);
if (op->map_dne_bound == 0)
op->map_dne_bound = latest;
- objecter->check_linger_pool_dne(op);
+ objecter->_check_linger_pool_dne(op);
}
-void Objecter::check_linger_pool_dne(LingerOp *op)
+void Objecter::_check_linger_pool_dne(LingerOp *op)
{
- ldout(cct, 10) << "check_linger_pool_dne linger_id " << op->linger_id
+ assert(rwlock.is_wlocked());
+
+ ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
<< " current " << osdmap->get_epoch()
<< " map_dne_bound " << op->map_dne_bound
<< dendl;
}
}
-void Objecter::linger_cancel_map_check(LingerOp *op)
+void Objecter::_linger_cancel_map_check(LingerOp *op)
{
+ assert(rwlock.is_wlocked());
+
map<uint64_t, LingerOp*>::iterator iter =
check_latest_map_lingers.find(op->linger_id);
if (iter != check_latest_map_lingers.end()) {
return;
}
- Mutex::Locker l(objecter->client_lock);
+ RWLock::WLocker wl(objecter->rwlock);
map<uint64_t, CommandOp*>::iterator iter =
objecter->check_latest_map_commands.find(tid);
if (c->map_dne_bound == 0)
c->map_dne_bound = latest;
- objecter->check_command_map_dne(c);
+ objecter->_check_command_map_dne(c);
}
-void Objecter::check_command_map_dne(CommandOp *c)
+void Objecter::_check_command_map_dne(CommandOp *c)
{
- ldout(cct, 10) << "check_command_map_dne tid " << c->tid
+ assert(rwlock.is_wlocked());
+
+ ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
<< " current " << osdmap->get_epoch()
<< " map_dne_bound " << c->map_dne_bound
<< dendl;
void Objecter::_send_command_map_check(CommandOp *c)
{
+ assert(rwlock.is_wlocked());
+
// ask the monitor
if (check_latest_map_commands.count(c->tid) == 0) {
c->get();
}
}
-void Objecter::command_cancel_map_check(CommandOp *c)
+void Objecter::_command_cancel_map_check(CommandOp *c)
{
+ assert(rwlock.is_wlocked());
+
map<uint64_t, CommandOp*>::iterator iter =
check_latest_map_commands.find(c->tid);
if (iter != check_latest_map_commands.end()) {
-Objecter::OSDSession *Objecter::get_session(int osd)
+int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc)
{
+ assert(rwlock.is_locked());
+
+ if (osd < 0) {
+ *session = &homeless_session;
+ return 0;
+ }
+
map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
- if (p != osd_sessions.end())
- return p->second;
+ if (p != osd_sessions.end()) {
+ OSDSession *s = p->second;
+ s->get();
+ *session = s;
+ return 0;
+ }
+ if (!lc.is_wlocked()) {
+ return -EAGAIN;
+ }
OSDSession *s = new OSDSession(osd);
osd_sessions[osd] = s;
s->con = messenger->get_connection(osdmap->get_inst(osd));
logger->inc(l_osdc_osd_session_open);
logger->inc(l_osdc_osd_sessions, osd_sessions.size());
- return s;
+ s->get();
+ *session = s;
+ return 0;
}
-void Objecter::reopen_session(OSDSession *s)
+void Objecter::put_session(Objecter::OSDSession *s)
{
+ if (s && !s->is_homeless()) {
+ s->put();
+ }
+}
+
+void Objecter::_reopen_session(OSDSession *s)
+{
+ assert(s->lock.is_locked());
+
entity_inst_t inst = osdmap->get_inst(s->osd);
ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now " << inst << dendl;
if (s->con) {
s->con->mark_down();
logger->inc(l_osdc_osd_session_close);
}
+ s->lock.get_write();
s->ops.clear();
s->linger_ops.clear();
s->command_ops.clear();
osd_sessions.erase(s->osd);
- delete s;
+ s->lock.unlock();
+ s->put();
logger->set(l_osdc_osd_sessions, osd_sessions.size());
}
C_Objecter_GetVersion(Objecter *o, Context *c)
: objecter(o), oldest(0), newest(0), fin(c) {}
void finish(int r) {
- if (r >= 0)
- objecter->_get_latest_version(oldest, newest, fin);
- else if (r == -EAGAIN) { // try again as instructed
+ if (r >= 0) {
+ objecter->get_latest_version(oldest, newest, fin);
+ } else if (r == -EAGAIN) { // try again as instructed
objecter->wait_for_latest_osdmap(fin);
} else {
// it doesn't return any other error codes!
monc->get_version("osdmap", &c->newest, &c->oldest, c);
}
+void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
+{
+ RWLock::WLocker wl(rwlock);
+ _get_latest_version(oldest, newest, fin);
+}
+
void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
{
+ assert(rwlock.is_wlocked());
if (osdmap->get_epoch() >= newest) {
ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
if (fin)
}
ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
- wait_for_new_map(fin, newest, 0);
+ _wait_for_new_map(fin, newest, 0);
}
void Objecter::maybe_request_map()
{
+ RWLock::RLocker rl(rwlock);
+ int r;
+ do {
+ r = _maybe_request_map();
+ } while (r == -EAGAIN);
+}
+
+int Objecter::_maybe_request_map()
+{
+ assert(rwlock.is_locked());
int flag = 0;
if (osdmap_full_flag()) {
- ldout(cct, 10) << "maybe_request_map subscribing (continuous) to next osd map (FULL flag is set)" << dendl;
+ ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next osd map (FULL flag is set)" << dendl;
} else {
- ldout(cct, 10) << "maybe_request_map subscribing (onetime) to next osd map" << dendl;
+ ldout(cct, 10) << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
flag = CEPH_SUBSCRIBE_ONETIME;
}
epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
- if (monc->sub_want("osdmap", epoch, flag))
+ if (monc->sub_want("osdmap", epoch, flag)) {
monc->renew_subs();
+ }
+ return 0;
}
-void Objecter::wait_for_new_map(Context *c, epoch_t epoch, int err)
+void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
{
+ assert(rwlock.is_wlocked());
waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
- maybe_request_map();
+ int r = _maybe_request_map();
+ assert(r == 0);
+}
+
+void Objecter::wait_for_new_map(Context *c, epoch_t epoch, int err)
+{
+ RWLock::WLocker wl(rwlock);
+ _wait_for_new_map(c, epoch, err);
}
void Objecter::kick_requests(OSDSession *session)
{
ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
+ RWLock::WLocker wl(rwlock);
+
+ _kick_requests(session);
+}
+
+void Objecter::_kick_requests(OSDSession *session)
+{
+ assert(rwlock.is_locked());
+
+ session->lock.get_write();
+
// resend ops
map<ceph_tid_t,Op*> resend; // resend in tid order
- for (xlist<Op*>::iterator p = session->ops.begin(); !p.end();) {
- Op *op = *p;
+ for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin(); p != session->ops.end();) {
+ Op *op = p->second;
++p;
logger->inc(l_osdc_op_resend);
if (op->should_resend) {
if (!op->target.paused)
resend[op->tid] = op;
} else {
- cancel_linger_op(op);
+ _cancel_linger_op(op);
}
}
+
while (!resend.empty()) {
- send_op(resend.begin()->second);
+ _send_op(resend.begin()->second);
resend.erase(resend.begin());
}
// resend lingers
map<uint64_t, LingerOp*> lresend; // resend in order
- for (xlist<LingerOp*>::iterator j = session->linger_ops.begin(); !j.end(); ++j) {
+ for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin(); j != session->linger_ops.end(); ++j) {
logger->inc(l_osdc_linger_resend);
- lresend[(*j)->linger_id] = *j;
- }
- while (!lresend.empty()) {
- send_linger(lresend.begin()->second);
- lresend.erase(lresend.begin());
+ lresend[j->first] = j->second;
}
// resend commands
map<uint64_t,CommandOp*> cresend; // resend in order
- for (xlist<CommandOp*>::iterator k = session->command_ops.begin(); !k.end(); ++k) {
+ for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin(); k != session->command_ops.end(); ++k) {
logger->inc(l_osdc_command_resend);
- cresend[(*k)->tid] = *k;
+ cresend[k->first] = k->second;
}
while (!cresend.empty()) {
_send_command(cresend.begin()->second);
cresend.erase(cresend.begin());
}
+ session->lock.unlock();
+
+ while (!lresend.empty()) {
+ _send_linger(lresend.begin()->second);
+ lresend.erase(lresend.begin());
+ }
}
void Objecter::schedule_tick()
void Objecter::tick()
{
+ if (!initialized.read())
+ return;
+
+ assert(rwlock.is_locked());
+
ldout(cct, 10) << "tick" << dendl;
- assert(client_lock.is_locked());
- assert(initialized);
+ assert(initialized.read());
// we are only called by C_Tick
assert(tick_event);
set<OSDSession*> toping;
+ int r = 0;
+
// look for laggy requests
utime_t cutoff = ceph_clock_now(cct);
cutoff -= cct->_conf->objecter_timeout; // timeout
- unsigned laggy_ops = 0;
- for (map<ceph_tid_t,Op*>::iterator p = ops.begin();
- p != ops.end();
- ++p) {
- Op *op = p->second;
- if (op->session && op->stamp < cutoff) {
- ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd << " is laggy" << dendl;
- toping.insert(op->session);
- ++laggy_ops;
- }
- }
- for (map<uint64_t,LingerOp*>::iterator p = linger_ops.begin();
- p != linger_ops.end();
- ++p) {
- LingerOp *op = p->second;
- if (op->session) {
- ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first << " (osd." << op->session->osd << ")" << dendl;
- toping.insert(op->session);
- } else {
- ldout(cct, 10) << " lingering tid " << p->first << " does not have session" << dendl;
+ unsigned laggy_ops;
+
+ do {
+ laggy_ops = 0;
+ for (map<int,OSDSession*>::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+ OSDSession *s = siter->second;
+ for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
+ p != s->ops.end();
+ ++p) {
+ Op *op = p->second;
+ assert(op->session);
+ if (op->stamp < cutoff) {
+ ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd << " is laggy" << dendl;
+ toping.insert(op->session);
+ ++laggy_ops;
+ }
+ }
+ for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
+ p != s->linger_ops.end();
+ ++p) {
+ LingerOp *op = p->second;
+ assert(op->session);
+ ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first << " (osd." << op->session->osd << ")" << dendl;
+ toping.insert(op->session);
+ }
+ for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
+ p != s->command_ops.end();
+ ++p) {
+ CommandOp *op = p->second;
+ assert(op->session);
+ ldout(cct, 10) << " pinging osd that serves command tid " << p->first << " (osd." << op->session->osd << ")" << dendl;
+ toping.insert(op->session);
+ }
}
- }
- for (map<uint64_t,CommandOp*>::iterator p = command_ops.begin();
- p != command_ops.end();
- ++p) {
- CommandOp *op = p->second;
- if (op->session) {
- ldout(cct, 10) << " pinging osd that serves command tid " << p->first << " (osd." << op->session->osd << ")" << dendl;
- toping.insert(op->session);
- } else {
- ldout(cct, 10) << " command tid " << p->first << " does not have session" << dendl;
+ if (num_homeless_ops.read() || !toping.empty()) {
+ r = _maybe_request_map();
+ if (r == -EAGAIN) {
+ toping.clear();
+ }
}
- }
+ } while (r == -EAGAIN);
+
logger->set(l_osdc_op_laggy, laggy_ops);
logger->set(l_osdc_osd_laggy, toping.size());
- if (num_homeless_ops || !toping.empty())
- maybe_request_map();
-
if (!toping.empty()) {
// send a ping to these osds, to ensure we detect any session resets
// (osd reply message policy is lossy)
- for (set<OSDSession*>::iterator i = toping.begin();
+ for (set<OSDSession*>::const_iterator i = toping.begin();
i != toping.end();
++i) {
(*i)->con->send_message(new MPing);
}
}
-
+
// reschedule
schedule_tick();
}
void Objecter::resend_mon_ops()
{
- assert(client_lock.is_locked());
+ RWLock::WLocker wl(rwlock);
+
ldout(cct, 10) << "resend_mon_ops" << dendl;
for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin(); p!=poolstat_ops.end(); ++p) {
- poolstat_submit(p->second);
+ _poolstat_submit(p->second);
logger->inc(l_osdc_poolstat_resend);
}
for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin(); p!=statfs_ops.end(); ++p) {
- fs_stats_submit(p->second);
+ _fs_stats_submit(p->second);
logger->inc(l_osdc_statfs_resend);
}
C_CancelOp(Objecter::Op *op, Objecter *objecter) : op(op),
objecter(objecter) {}
void finish(int r) {
- // note that objecter lock == timer lock, and is already held
- objecter->op_cancel(op->tid, -ETIMEDOUT);
+ objecter->op_cancel(op->session, op->tid, -ETIMEDOUT);
}
};
ceph_tid_t Objecter::op_submit(Op *op)
{
- assert(client_lock.is_locked());
- assert(initialized);
+ RWLock::RLocker rl(rwlock);
+ RWLock::Context lc(rwlock, RWLock::Context::TakenForRead);
+ return _op_submit_with_budget(op, lc);
+}
+
+ceph_tid_t Objecter::_op_submit_with_budget(Op *op, RWLock::Context& lc)
+{
+ assert(initialized.read());
assert(op->ops.size() == op->out_bl.size());
assert(op->ops.size() == op->out_rval.size());
// throttle. before we look at any state, because
// take_op_budget() may drop our lock while it blocks.
- take_op_budget(op);
+ _take_op_budget(op);
- return _op_submit(op);
+ return _op_submit(op, lc);
}
-ceph_tid_t Objecter::_op_submit(Op *op)
+// Submit an op: compute its target, obtain the matching OSD session, update
+// accounting, and send it unless pause/full flags force it to wait for a
+// newer osdmap.  Caller holds rwlock; lc tracks read vs write mode.
+ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc)
 {
-  // pick tid if we haven't got one yet
-  if (op->tid == ceph_tid_t(0)) {
-    ceph_tid_t mytid = ++last_tid;
-    op->tid = mytid;
-  }
-  assert(client_inc >= 0);
+  assert(rwlock.is_locked());
+
+  ldout(cct, 10) << __func__ << " op " << op << dendl;
   // pick target
-  num_homeless_ops++;  // initially; recalc_op_target() will decrement if it finds a target
-  int r = recalc_op_target(op);
-  bool check_for_latest_map = (r == RECALC_OP_TARGET_POOL_DNE);
+  int r;
+  assert(op->session == NULL);
+  OSDSession *s = NULL;
+
+  bool check_for_latest_map;
+
+  while (true) {
+    r = _calc_target(&op->target);
+    check_for_latest_map = (r == RECALC_OP_TARGET_POOL_DNE);
+    if (_get_session(op->target.osd, &s, lc) == -EAGAIN ||
+	check_for_latest_map) {
+      // Retry at most once with the write lock held: under the write lock
+      // _get_session can always create the session, and a pool that is
+      // still DNE is handled below via _send_op_map_check.  Looping again
+      // in that case would spin forever.
+      if (!lc.is_wlocked()) {
+	lc.promote();
+	continue;
+      }
+    }
+    break;
+  }
+  assert(s);  // may be homeless
+
+  inflight_ops.inc();
   // add to gather set(s)
   if (op->onack) {
-    ++num_unacked;
+    num_unacked.inc();
   } else {
     ldout(cct, 20) << " note: not requesting ack" << dendl;
   }
   if (op->oncommit) {
-    ++num_uncommitted;
+    num_uncommitted.inc();
   } else {
     ldout(cct, 20) << " note: not requesting commit" << dendl;
   }
-  ops[op->tid] = op;
-
-  logger->set(l_osdc_op_active, ops.size());
+  logger->inc(l_osdc_op_active);
   logger->inc(l_osdc_op);
+
   if ((op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
     logger->inc(l_osdc_op_rmw);
   else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
 }
   // send?
-  ldout(cct, 10) << "op_submit oid " << op->target.base_oid
+  ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
 		 << " " << op->target.base_oloc << " " << op->target.target_oloc
 		 << " " << op->ops << " tid " << op->tid
-	 << " osd." << (op->session ? op->session->osd : -1)
+	 << " osd." << (!s->is_homeless() ? s->osd : -1)
   // ^ use the local session 's': op->session is still NULL here (asserted
   //   above) and is only assigned below in _session_op_assign().
 		 << dendl;
   assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
+  bool need_send = false;
+
   if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
       osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
-    ldout(cct, 10) << " paused modify " << op << " tid " << last_tid << dendl;
+    ldout(cct, 10) << " paused modify " << op << " tid " << last_tid.read() << dendl;
     op->target.paused = true;
-    maybe_request_map();
+    _maybe_request_map();
   } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
 	     osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
-    ldout(cct, 10) << " paused read " << op << " tid " << last_tid << dendl;
+    ldout(cct, 10) << " paused read " << op << " tid " << last_tid.read() << dendl;
     op->target.paused = true;
-    maybe_request_map();
+    _maybe_request_map();
   } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) && osdmap_full_flag()) {
-    ldout(cct, 0) << " FULL, paused modify " << op << " tid " << last_tid << dendl;
+    ldout(cct, 0) << " FULL, paused modify " << op << " tid " << last_tid.read() << dendl;
     op->target.paused = true;
-    maybe_request_map();
-  } else if (op->session) {
-    send_op(op);
+    _maybe_request_map();
+  } else if (!s->is_homeless()) {
+    need_send = true;
   } else {
-    maybe_request_map();
+    _maybe_request_map();
+  }
+
+  // Only build the message when we will actually send it: preparing it
+  // unconditionally leaked the MOSDOp on the paused/homeless paths and
+  // reset target.paused right after the branches above set it.
+  MOSDOp *m = NULL;
+  if (need_send) {
+    m = _prepare_osd_op(op);
+  }
+
+  s->lock.get_write();
+  if (op->tid == 0)
+    op->tid = last_tid.inc();
+  _session_op_assign(op, s);
+
+  if (need_send) {
+    _send_op(op, m);   // set_tid() inside _send_op stamps the final tid
   }
+  s->lock.unlock();
   if (check_for_latest_map) {
     _send_op_map_check(op);
   }
-  ldout(cct, 5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
-
+  ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() << " uncommitted" << dendl;
+
   return op->tid;
 }
-int Objecter::op_cancel(ceph_tid_t tid, int r)
-int Objecter::op_cancel(ceph_tid_t tid, int r)
+// Cancel a pending op on a specific session: complete its callbacks with r
+// and tear it down.  Returns -ENOENT if tid is not in this session.
+int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
-  map<ceph_tid_t, Op*>::iterator p = ops.find(tid);
-  if (p == ops.end()) {
+  s->lock.get_write();
+
+  map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
+  if (p == s->ops.end()) {
     ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
+    s->lock.unlock();   // must drop the write lock taken above on this path
     return -ENOENT;
   }
     op->oncommit->complete(r);
     op->oncommit = NULL;
   }
-  op_cancel_map_check(op);
-  finish_op(op);
+  _op_cancel_map_check(op);
+  _finish_op(op);
+  s->lock.unlock();
+
   return 0;
 }
+// Cancel tid wherever it currently lives: scan every OSD session (then the
+// homeless session) and cancel via the per-session overload.  rwlock is
+// held for write so sessions cannot appear or vanish during the scan.
+int Objecter::op_cancel(ceph_tid_t tid, int r)
+{
+  int ret = 0;
+
+  rwlock.get_write();
+
+start:
+
+  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+    OSDSession *s = siter->second;
+    s->lock.get_read();
+    if (s->ops.find(tid) != s->ops.end()) {
+      s->lock.unlock();
+      ret = op_cancel(s, tid, r);
+      if (ret == -ENOENT) {
+        /* oh no! raced, maybe tid moved to another session, restarting */
+        goto start;
+      }
+      rwlock.unlock();
+      return ret;
+    }
+    s->lock.unlock();
+  }
+
+  // The homeless session's op map is protected by its own lock like every
+  // other session; take it for the lookup instead of reading unlocked.
+  homeless_session.lock.get_read();
+  bool in_homeless = homeless_session.ops.find(tid) != homeless_session.ops.end();
+  homeless_session.lock.unlock();
+  if (in_homeless) {
+    ret = op_cancel(&homeless_session, tid, r);
+  }
+  rwlock.unlock();
+
+  return ret;
+}
+
+
bool Objecter::is_pg_changed(
int oldprimary,
const vector<int>& oldacting,
return p->raw_hash_to_pg(p->hash_key(key, ns));
}
-int Objecter::calc_target(op_target_t *t)
+int Objecter::_calc_target(op_target_t *t)
{
+ assert(rwlock.is_locked());
+
bool is_read = t->flags & CEPH_OSD_FLAG_READ;
bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
} else {
int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
pgid);
- if (ret == -ENOENT)
+ if (ret == -ENOENT) {
+ t->osd = -1;
return RECALC_OP_TARGET_POOL_DNE;
+ }
}
int primary;
vector<int> acting;
t->pgid = pgid;
t->acting = acting;
t->primary = primary;
- ldout(cct, 10) << __func__ << " pgid " << pgid
- << " acting " << acting << dendl;
+ ldout(cct, 10) << __func__ << " "
+ << " pgid " << pgid << " acting " << acting << dendl;
t->used_replica = false;
if (primary == -1) {
t->osd = -1;
return RECALC_OP_TARGET_NO_ACTION;
}
-int Objecter::recalc_op_target(Op *op)
+// Recompute *target and resolve the session for its OSD.  Returns a
+// negative RECALC_OP_TARGET_* code when targeting fails, otherwise the
+// result of _get_session().
+int Objecter::_map_session(op_target_t *target, OSDSession **s,
+			   RWLock::Context& lc)
 {
-  int r = calc_target(&op->target);
-  if (r == RECALC_OP_TARGET_NEED_RESEND) {
-    OSDSession *s = NULL;
-    if (op->target.osd >= 0)
-      s = get_session(op->target.osd);
-    if (op->session != s) {
-      if (!op->session)
-	num_homeless_ops--;
-      op->session_item.remove_myself();
-      op->session = s;
-      if (s)
-	s->ops.push_back(&op->session_item);
-      else
-	num_homeless_ops++;
+  const int ret = _calc_target(target);
+  return (ret < 0) ? ret : _get_session(target->osd, s, lc);
+}
+
+// Attach op (tid already assigned) to session 'to' and index it in the
+// session's op map.  Caller must hold rwlock and to->lock.  Ops parked on
+// the homeless session are tracked in num_homeless_ops.
+// NOTE(review): no ref is taken on 'to' here, while _finish_op drops one
+// via session->put() -- presumably the caller transfers its _get_session
+// ref to the op; confirm.
+void Objecter::_session_op_assign(Op *op, OSDSession *to)
+{
+  assert(rwlock.is_locked());
+  assert(to->lock.is_locked());
+  assert(!op->session);
+  assert(op->tid);
+
+  op->session = to;
+  to->ops[op->tid] = op;
+
+  if (to->is_homeless()) {
+    num_homeless_ops.inc();
+  }
+}
+
+// Detach op from its current session (inverse of _session_op_assign),
+// adjusting the homeless-op counter.  Caller must hold rwlock and the
+// session's lock.
+// NOTE(review): unlike _finish_op there is no s->put() here -- confirm
+// callers that re-target the op keep/transfer the session ref correctly.
+void Objecter::_session_op_remove(Op *op)
+{
+  assert(rwlock.is_locked());
+  OSDSession *s = op->session;
+  assert(s);
+  assert(s->lock.is_locked());
+
+  if (s->is_homeless()) {
+    num_homeless_ops.dec();
+  }
+  s->ops.erase(op->tid);
+  op->session = NULL;
+}
+
+// Detach a linger op from its session.  Caller holds rwlock and the
+// session lock.
+// NOTE(review): this erases by linger_id, but _recalc_linger_op_target
+// inserts into s->linger_ops keyed by register_tid -- if those differ the
+// erase is a no-op and a stale entry remains; confirm the intended key.
+void Objecter::_session_linger_op_remove(LingerOp *info)
+{
+  assert(rwlock.is_locked());
+  OSDSession *s = info->session;
+  assert(s);
+  assert(s->lock.is_locked());
+
+  s->linger_ops.erase(info->linger_id);
+  info->session = NULL;
+}
+
+// Resolve the session for 'osd', promoting rwlock and retrying when
+// _get_session() needs write access.  Returns 0 on success, or -EAGAIN when
+// the promote raced with an osdmap change and the caller must recompute.
+int Objecter::_get_osd_session(int osd, RWLock::Context& lc, OSDSession **psession)
+{
+  for (;;) {
+    const int r = _get_session(osd, psession, lc);
+    if (r != -EAGAIN) {
+      assert(r == 0);
+      return 0;
+    }
+    assert(!lc.is_wlocked());
+
+    if (!_promote_lock_check_race(lc)) {
+      return r;
+    }
+  }
+}
+
+// Convenience wrapper: fetch the session for op's currently computed
+// target OSD.
+int Objecter::_get_op_target_session(Op *op, RWLock::Context& lc, OSDSession **psession)
+{
+  return _get_osd_session(op->target.osd, lc, psession);
+}
+
+// If op's computed target no longer matches the session it sits on, detach
+// it from that session and drop the session ref.
+// NOTE(review): 'lc' and 'session_locked' are currently unused, and
+// _session_op_remove asserts the session lock is held while this path does
+// not visibly take it -- confirm the caller-side locking contract.
+void Objecter::_session_op_validate(Op *op, RWLock::Context& lc, bool session_locked)
+{
+  assert(rwlock.is_locked());
+
+  if (op->session && op->session->osd != op->target.osd) {
+    OSDSession *orig_session = op->session;
+    _session_op_remove(op);
+    put_session(orig_session);
+  }
+}
+
+// Recompute op's target; if the op must be resent (NEED_RESEND), detach it
+// from a session that no longer matches via _session_op_validate.  Returns
+// the RECALC_OP_TARGET_* code from _calc_target().
+int Objecter::_recalc_op_target(Op *op, RWLock::Context& lc,
+				bool src_session_locked)
+{
+  assert(rwlock.is_locked());
+
+  int r = _calc_target(&op->target);
+  if (r == RECALC_OP_TARGET_NEED_RESEND) {
+    _session_op_validate(op, lc, src_session_locked);
   }
   return r;
 }
-bool Objecter::recalc_linger_op_target(LingerOp *linger_op)
+// Promote lc from read to write mode and report whether the osdmap stayed
+// put across the gap (promote is not atomic: the read lock is dropped
+// before the write lock is taken).  Returns false if a new map slipped in,
+// in which case callers must recompute any targeting decisions.
+bool Objecter::_promote_lock_check_race(RWLock::Context& lc)
+{
+  epoch_t epoch = osdmap->get_epoch();
+  lc.promote();
+  return (epoch == osdmap->get_epoch());
+}
+
+// Recompute a linger op's target; on NEED_RESEND, migrate it onto the
+// session for the new target OSD.  Returns the RECALC_OP_TARGET_* code
+// (or a negative error from the session lookup).
+int Objecter::_recalc_linger_op_target(LingerOp *linger_op, RWLock::Context& lc)
 {
-  int r = calc_target(&linger_op->target);
+  assert(rwlock.is_locked());
+
+  int r = _calc_target(&linger_op->target);
   if (r == RECALC_OP_TARGET_NEED_RESEND) {
     ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
 		   << " pgid " << linger_op->target.pgid
 		   << " acting " << linger_op->target.acting << dendl;
-    OSDSession *s = linger_op->target.osd != -1 ?
-      get_session(linger_op->target.osd) : NULL;
+    OSDSession *s;
+    r = _get_osd_session(linger_op->target.osd, lc, &s);
+    if (r < 0) {
+      return r;
+    }
+
+    s->lock.get_write();
+
     if (linger_op->session != s) {
-      linger_op->session_item.remove_myself();
+      // NOTE(review): the old code removed itself from the previous
+      // session; here the op is only inserted into the new session's map
+      // (keyed by register_tid, while removal uses linger_id) -- confirm no
+      // stale entry is left on the old session and the key choice is right.
       linger_op->session = s;
-      if (s)
-	s->linger_ops.push_back(&linger_op->session_item);
+      s->linger_ops[linger_op->register_tid] = linger_op;
     }
+
+    s->lock.unlock();
+    put_session(s);
+    return RECALC_OP_TARGET_NEED_RESEND;
   }
   return r;
 }
-void Objecter::cancel_linger_op(Op *op)
+// Tear down the op backing a linger registration: its onack/oncommit
+// contexts belong to the LingerOp machinery, so they are deleted (not
+// completed) before the op is finished.
+// NOTE(review): _finish_op asserts the session lock is held for write --
+// confirm all callers of _cancel_linger_op hold it.
+void Objecter::_cancel_linger_op(Op *op)
 {
   ldout(cct, 15) << "cancel_op " << op->tid << dendl;
   delete op->onack;
   delete op->oncommit;
-  finish_op(op);
+  _finish_op(op);
 }
-void Objecter::finish_op(Op *op)
+void Objecter::_finish_op(Op *op)
{
ldout(cct, 15) << "finish_op " << op->tid << dendl;
- op->session_item.remove_myself();
if (op->budgeted)
put_op_budget(op);
- ops.erase(op->tid);
- logger->set(l_osdc_op_active, ops.size());
+ assert(op->session->lock.is_wlocked());
+
+ op->session->ops.erase(op->tid);
+
+ logger->dec(l_osdc_op_active);
+
assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
- if (op->ontimeout)
+ if (op->ontimeout) {
timer.cancel_event(op->ontimeout);
+ }
+
+ op->session->put();
+
+ inflight_ops.dec();
delete op;
}
-void Objecter::send_op(Op *op)
+// Public teardown entry point: look up tid on the given session and finish
+// the op.  A missing tid is not an error -- the op may already have
+// completed concurrently.
+void Objecter::finish_op(OSDSession *session, ceph_tid_t tid)
 {
-  ldout(cct, 15) << "send_op " << op->tid << " to osd." << op->session->osd << dendl;
+  ldout(cct, 15) << "finish_op " << tid << dendl;
+  RWLock::RLocker rl(rwlock);
+
+  RWLock::WLocker wl(session->lock);
+
+  map<ceph_tid_t, Op *>::iterator it = session->ops.find(tid);
+  if (it == session->ops.end()) {
+    return;
+  }
+
+  _finish_op(it->second);
+}
+
+MOSDOp *Objecter::_prepare_osd_op(Op *op)
+{
+ assert(rwlock.is_locked());
int flags = op->target.flags;
flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
if (op->onack)
flags |= CEPH_OSD_FLAG_ACK;
- assert(op->session->con);
-
- // preallocated rx buffer?
- if (op->con) {
- ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl;
- op->con->revoke_rx_buffer(op->tid);
- }
- if (op->outbl && op->outbl->length()) {
- ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << op->session->con << dendl;
- op->con = op->session->con;
- op->con->post_rx_buffer(op->tid, *op->outbl);
- }
-
op->target.paused = false;
- op->incarnation = op->session->incarnation;
op->stamp = ceph_clock_now(cct);
- MOSDOp *m = new MOSDOp(client_inc, op->tid,
+ MOSDOp *m = new MOSDOp(client_inc.read(), op->tid,
op->target.target_oid, op->target.target_oloc,
op->target.pgid,
osdmap->get_epoch(),
logger->inc(l_osdc_op_send);
logger->inc(l_osdc_op_send_bytes, m->get_data().length());
+ return m;
+}
+
+// Send (or resend) op over its session's connection.  If m is NULL the
+// message is built here, in which case tid must already be assigned.
+// Caller holds rwlock and the session lock.
+void Objecter::_send_op(Op *op, MOSDOp *m)
+{
+  assert(rwlock.is_locked());
+  assert(op->session->lock.is_locked());
+
+  if (!m) {
+    assert(op->tid > 0);
+    m = _prepare_osd_op(op);
+  }
+
+  ldout(cct, 15) << "_send_op " << op->tid << " to osd." << op->session->osd << dendl;
+
+  ConnectionRef con = op->session->con;
+  assert(con);
+
+  // preallocated rx buffer?
+  if (op->con) {
+    ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl;
+    op->con->revoke_rx_buffer(op->tid);
+  }
+  if (op->outbl && op->outbl->length()) {
+    ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con << dendl;
+    op->con = con;
+    op->con->post_rx_buffer(op->tid, *op->outbl);
+  }
+
+  op->incarnation = op->session->incarnation;
+
+  m->set_tid(op->tid);
+
+  // send via the ConnectionRef captured above: it matches the assert(con)
+  // and keeps a ref for the duration of the call instead of re-reading
+  // op->session->con.
-  op->session->con->send_message(m);
+  con->send_message(m);
 }
return op_budget;
}
-void Objecter::throttle_op(Op *op, int op_budget)
+// Block until throttle budget (bytes and op count) is available for op.
+// When the throttle is exhausted, rwlock is dropped while waiting and then
+// re-taken in the same mode (read or write) the caller held.
+void Objecter::_throttle_op(Op *op, int op_budget)
 {
+  assert(rwlock.is_locked());
+
+  // remember which mode to re-acquire in after blocking
+  bool locked_for_write = rwlock.is_wlocked();
+
   if (!op_budget)
     op_budget = calc_op_budget(op);
   if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
-    client_lock.Unlock();
+    rwlock.unlock();
     op_throttle_bytes.get(op_budget);
-    client_lock.Lock();
+    rwlock.get(locked_for_write);
   }
   if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
-    client_lock.Unlock();
+    rwlock.unlock();
     op_throttle_ops.get(1);
-    client_lock.Lock();
+    rwlock.get(locked_for_write);
   }
+  // NOTE(review): any state read before this call may be stale once the
+  // lock has been dropped and re-taken -- callers must re-validate.
 }
 void Objecter::unregister_op(Op *op)
 {
-  if (op->onack)
-    num_unacked--;
-  if (op->oncommit)
-    num_uncommitted--;
-  ops.erase(op->tid);
+  // NOTE(review): the num_unacked/num_uncommitted decrements were moved to
+  // the call sites (see the redirect path in handle_osd_op_reply) --
+  // confirm every remaining caller adjusts them itself.
+  op->session->lock.get_write();
+  op->session->ops.erase(op->tid);
+  op->session->lock.unlock();
+  // drop the op's session ref and clear the pointer so it can be
+  // re-targeted
+  op->session->put();
+  op->session = NULL;
+
+  inflight_ops.dec();
 }
 /* This function DOES put the passed message before returning */
 void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
   ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
   // get pio
   ceph_tid_t tid = m->get_tid();
-  if (ops.count(tid) == 0) {
+  // replies are now looked up per-session, keyed by the sending OSD
+  int osd_num = (int)m->get_source().num();
+
+  RWLock::RLocker l(rwlock);
+  RWLock::Context lc(rwlock, RWLock::Context::TakenForRead);
+
+  map<int, OSDSession *>::iterator siter = osd_sessions.find(osd_num);
+  if (siter == osd_sessions.end()) {
+    ldout(cct, 7) << "handle_osd_op_reply " << tid
+		  << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ?
+		      " onnvram":" ack"))
+		  << " ... unknown osd" << dendl;
+    m->put();
+    return;
+  }
+
+  OSDSession *s = siter->second;
+  s->get();   // hold the session across the rwlock release below
+
+  s->lock.get_write();
+
+  map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
+  if (iter == s->ops.end()) {
     ldout(cct, 7) << "handle_osd_op_reply " << tid
 		  << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack"))
 		  << " ... stray" << dendl;
+    s->lock.unlock();
+    s->put();
     m->put();
     return;
   }
 		<< " in " << m->get_pg()
 		<< " attempt " << m->get_retry_attempt()
 		<< dendl;
-  Op *op = ops[tid];
+  Op *op = iter->second;
   if (m->get_retry_attempt() >= 0) {
     if (m->get_retry_attempt() != (op->attempts - 1)) {
 		 << "; last attempt " << (op->attempts - 1) << " sent to "
 		 << op->session->con->get_peer_addr() << dendl;
       m->put();
+      s->lock.unlock();
+      s->put();
       return;
     }
   } else {
   if (m->is_redirect_reply()) {
     ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
-    unregister_op(op);
+    // counter adjustment moved here from unregister_op
+    if (op->onack)
+      num_unacked.dec();
+    if (op->oncommit)
+      num_uncommitted.dec();
+    _session_op_remove(op);
+    s->lock.unlock();
+    s->put();
+
+    // FIXME: two redirects could race and reorder
+
+    op->tid = 0;
     m->get_redirect().combine_with_locator(op->target.target_oloc,
 					   op->target.target_oid.name);
     op->target.flags |= CEPH_OSD_FLAG_REDIRECTED;
-    _op_submit(op);
+    _op_submit(op, lc);
     m->put();
     return;
   }
   if (rc == -EAGAIN) {
     ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
-    unregister_op(op);
-    _op_submit(op);
+
+    // new tid
+    s->ops.erase(op->tid);
+    op->tid = last_tid.inc();
+
+    // NOTE(review): the op is erased under its old tid but never
+    // re-inserted under the new one, so the resend's reply will come back
+    // "stray"; this path also returns without the balancing s->put() for
+    // the s->get() above -- confirm both against the intended design.
+    _send_op(op);
+    s->lock.unlock();
     m->put();
     return;
   }
+  // from here on we only touch the op and session, not global maps
+  l.unlock();
+  lc.set_state(RWLock::Context::Untaken);
+
   if (op->objver)
     *op->objver = m->get_user_version();
   if (op->reply_epoch)
     op->replay_version = m->get_replay_version();
     onack = op->onack;
     op->onack = 0;  // only do callback once
-    num_unacked--;
+    num_unacked.dec();
     logger->inc(l_osdc_op_ack);
   }
   if (op->oncommit && (m->is_ondisk() || rc)) {
     ldout(cct, 15) << "handle_osd_op_reply safe" << dendl;
     oncommit = op->oncommit;
     op->oncommit = 0;
-    num_uncommitted--;
+    num_uncommitted.dec();
     logger->inc(l_osdc_op_commit);
   }
   // done with this tid?
   if (!op->onack && !op->oncommit) {
     ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
-    finish_op(op);
+    _finish_op(op);
   }
-
-  ldout(cct, 5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
+
+  ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() << " uncommitted" << dendl;
+
+  // serialize completions
+  s->completion_lock.Lock();
+  s->lock.unlock();
   // do callbacks
   if (onack) {
     if (oncommit) {
     oncommit->complete(rc);
   }
+  s->completion_lock.Unlock();
   m->put();
+  s->put();
 }
uint32_t Objecter::list_objects_seek(ListContext *list_context,
uint32_t pos)
{
- assert(client_lock.is_locked());
+ RWLock::RLocker rl(rwlock);
pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
ldout(cct, 10) << "list_objects_seek " << list_context
<< " pos " << pos << " -> " << actual << dendl;
void Objecter::list_objects(ListContext *list_context, Context *onfinish)
{
- assert(client_lock.is_locked());
ldout(cct, 10) << "list_objects" << dendl;
ldout(cct, 20) << " pool_id " << list_context->pool_id
<< " pool_snap_seq " << list_context->pool_snap_seq
return;
}
+ rwlock.get_read();
const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
int pg_num = pool->get_pg_num();
+ rwlock.unlock();
if (list_context->starting_pg_num == 0) { // there can't be zero pgs!
list_context->starting_pg_num = pg_num;
list_context->bl.clear();
C_List *onack = new C_List(list_context, onfinish, this);
object_locator_t oloc(list_context->pool_id, list_context->nspace);
+
pg_read(list_context->current_pg, oloc, op,
&list_context->bl, 0, onack, &onack->epoch);
}
PoolOp *op = new PoolOp;
if (!op)
return -ENOMEM;
- op->tid = ++last_tid;
+ op->tid = last_tid.inc();
op->pool = pool;
op->name = snap_name;
op->onfinish = onfinish;
ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
PoolOp *op = new PoolOp;
if (!op) return -ENOMEM;
- op->tid = ++last_tid;
+ op->tid = last_tid.inc();
op->pool = pool;
C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
op->onfinish = fin;
PoolOp *op = new PoolOp;
if (!op)
return -ENOMEM;
- op->tid = ++last_tid;
+ op->tid = last_tid.inc();
op->pool = pool;
op->name = snap_name;
op->onfinish = onfinish;
<< snap << dendl;
PoolOp *op = new PoolOp;
if (!op) return -ENOMEM;
- op->tid = ++last_tid;
+ op->tid = last_tid.inc();
op->pool = pool;
op->onfinish = onfinish;
op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
PoolOp *op = new PoolOp;
if (!op)
return -ENOMEM;
- op->tid = ++last_tid;
+ op->tid = last_tid.inc();
op->pool = 0;
op->name = name;
op->onfinish = onfinish;
PoolOp *op = new PoolOp;
if (!op) return -ENOMEM;
- op->tid = ++last_tid;
+ op->tid = last_tid.inc();
op->pool = pool;
op->name = "delete";
op->onfinish = onfinish;
ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
PoolOp *op = new PoolOp;
if (!op) return -ENOMEM;
- op->tid = ++last_tid;
+ op->tid = last_tid.inc();
op->pool = pool;
op->name = "change_pool_auid";
op->onfinish = onfinish;
C_CancelPoolOp(ceph_tid_t tid, Objecter *objecter) : tid(tid),
objecter(objecter) {}
void finish(int r) {
- // note that objecter lock == timer lock, and is already held
objecter->pool_op_cancel(tid, -ETIMEDOUT);
}
};
void Objecter::pool_op_submit(PoolOp *op)
{
+ RWLock::WLocker wl(rwlock);
+
if (mon_timeout > 0) {
op->ontimeout = new C_CancelPoolOp(op->tid, this);
timer.add_event_after(mon_timeout, op->ontimeout);
void Objecter::_pool_op_submit(PoolOp *op)
{
+ assert(rwlock.is_wlocked());
+
ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
op->name, op->pool_op,
*/
 void Objecter::handle_pool_op_reply(MPoolOpReply *m)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
+
+  rwlock.get_read();
+
   ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
   ceph_tid_t tid = m->get_tid();
-  if (pool_ops.count(tid)) {
-    PoolOp *op = pool_ops[tid];
+  map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
+  if (iter != pool_ops.end()) {
+    PoolOp *op = iter->second;
     ldout(cct, 10) << "have request " << tid << " at " << op << " Op: " << ceph_pool_op_name(op->pool_op) << dendl;
     if (op->blp)
       op->blp->claim(m->response_data);
     if (m->version > last_seen_osdmap_version)
       last_seen_osdmap_version = m->version;
     if (osdmap->get_epoch() < m->epoch) {
-      ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch << " before calling back" << dendl;
-      wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
+      // NOTE(review): this read->write upgrade is not atomic; another
+      // thread could complete or free this op in the unlocked gap, yet
+      // 'op' is still dereferenced below -- confirm against the intended
+      // locking scheme.
+      rwlock.unlock();
+      rwlock.get_write();
+      if (osdmap->get_epoch() < m->epoch) {
+	ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch << " before calling back" << dendl;
+	_wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
+      }
     }
     else {
       op->onfinish->complete(m->replyCode);
     }
     op->onfinish = NULL;
-    finish_pool_op(op);
+    // NOTE(review): is_wlocked() on most RWLock implementations reports
+    // whether *any* writer holds the lock, not this thread -- confirm this
+    // conditional re-lock does what is intended.
+    if (!rwlock.is_wlocked()) {
+      rwlock.unlock();
+      rwlock.get_write();
+    }
+    // re-find after potentially releasing the lock: the op may be gone
+    iter = pool_ops.find(tid);
+    if (iter != pool_ops.end()) {
+      _finish_pool_op(op);
+    }
   } else {
     ldout(cct, 10) << "unknown request " << tid << dendl;
   }
+  rwlock.unlock();
+
   ldout(cct, 10) << "done" << dendl;
   m->put();
 }
int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
{
- assert(client_lock.is_locked());
- assert(initialized);
+ assert(initialized.read());
+
+ RWLock::WLocker wl(rwlock);
map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
if (it == pool_ops.end()) {
PoolOp *op = it->second;
if (op->onfinish)
op->onfinish->complete(r);
- finish_pool_op(op);
+
+ _finish_pool_op(op);
return 0;
}
-void Objecter::finish_pool_op(PoolOp *op)
+// Remove op from the pool-op table, cancel its timeout, and destroy it.
+// Caller holds rwlock for write (asserted) and has already handled
+// op->onfinish.
+void Objecter::_finish_pool_op(PoolOp *op)
 {
+  assert(rwlock.is_wlocked());
   pool_ops.erase(op->tid);
   logger->set(l_osdc_poolop_active, pool_ops.size());
-  if (op->ontimeout)
+  if (op->ontimeout) {
     timer.cancel_event(op->ontimeout);
+  }
   delete op;
 }
ldout(cct, 10) << "get_pool_stats " << pools << dendl;
PoolStatOp *op = new PoolStatOp;
- op->tid = ++last_tid;
+ op->tid = last_tid.inc();
op->pools = pools;
op->pool_stats = result;
op->onfinish = onfinish;
op->ontimeout = new C_CancelPoolStatOp(op->tid, this);
timer.add_event_after(mon_timeout, op->ontimeout);
}
+
+ RWLock::WLocker wl(rwlock);
+
poolstat_ops[op->tid] = op;
logger->set(l_osdc_poolstat_active, poolstat_ops.size());
- poolstat_submit(op);
+ _poolstat_submit(op);
}
-void Objecter::poolstat_submit(PoolStatOp *op)
+void Objecter::_poolstat_submit(PoolStatOp *op)
{
- ldout(cct, 10) << "poolstat_submit " << op->tid << dendl;
+ ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid, op->pools, last_seen_pgmap_version));
op->last_submit = ceph_clock_now(cct);
void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
{
- assert(client_lock.is_locked());
- assert(initialized);
+ assert(initialized.read());
ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
ceph_tid_t tid = m->get_tid();
- if (poolstat_ops.count(tid)) {
+ RWLock::WLocker wl(rwlock);
+ map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
+ if (iter != poolstat_ops.end()) {
PoolStatOp *op = poolstat_ops[tid];
ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
*op->pool_stats = m->pool_stats;
- if (m->version > last_seen_pgmap_version)
+ if (m->version > last_seen_pgmap_version) {
last_seen_pgmap_version = m->version;
+ }
op->onfinish->complete(0);
- finish_pool_stat_op(op);
+ _finish_pool_stat_op(op);
} else {
ldout(cct, 10) << "unknown request " << tid << dendl;
}
int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
{
- assert(client_lock.is_locked());
- assert(initialized);
+ assert(initialized.read());
+
+ RWLock::WLocker wl(rwlock);
map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
if (it == poolstat_ops.end()) {
PoolStatOp *op = it->second;
if (op->onfinish)
op->onfinish->complete(r);
- finish_pool_stat_op(op);
+ _finish_pool_stat_op(op);
return 0;
}
-void Objecter::finish_pool_stat_op(PoolStatOp *op)
+void Objecter::_finish_pool_stat_op(PoolStatOp *op)
{
+ assert(rwlock.is_wlocked());
+
poolstat_ops.erase(op->tid);
logger->set(l_osdc_poolstat_active, poolstat_ops.size());
- if (op->ontimeout)
+ if (op->ontimeout) {
timer.cancel_event(op->ontimeout);
+ }
delete op;
}
C_CancelStatfsOp(ceph_tid_t tid, Objecter *objecter) : tid(tid),
objecter(objecter) {}
void finish(int r) {
- // note that objecter lock == timer lock, and is already held
objecter->statfs_op_cancel(tid, -ETIMEDOUT);
}
};
void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish)
{
ldout(cct, 10) << "get_fs_stats" << dendl;
+ RWLock::WLocker l(rwlock);
StatfsOp *op = new StatfsOp;
- op->tid = ++last_tid;
+ op->tid = last_tid.inc();
op->stats = &result;
op->onfinish = onfinish;
op->ontimeout = NULL;
logger->set(l_osdc_statfs_active, statfs_ops.size());
- fs_stats_submit(op);
+ _fs_stats_submit(op);
}
-void Objecter::fs_stats_submit(StatfsOp *op)
+void Objecter::_fs_stats_submit(StatfsOp *op)
{
+ assert(rwlock.is_wlocked());
+
ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl;
monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid, last_seen_pgmap_version));
op->last_submit = ceph_clock_now(cct);
void Objecter::handle_fs_stats_reply(MStatfsReply *m)
{
- assert(client_lock.is_locked());
- assert(initialized);
+ assert(initialized.read());
+
+ RWLock::WLocker wl(rwlock);
+
ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
ceph_tid_t tid = m->get_tid();
if (m->h.version > last_seen_pgmap_version)
last_seen_pgmap_version = m->h.version;
op->onfinish->complete(0);
- finish_statfs_op(op);
+ _finish_statfs_op(op);
} else {
ldout(cct, 10) << "unknown request " << tid << dendl;
}
int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
{
- assert(client_lock.is_locked());
- assert(initialized);
+ assert(initialized.read());
+
+ RWLock::WLocker wl(rwlock);
map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
if (it == statfs_ops.end()) {
StatfsOp *op = it->second;
if (op->onfinish)
op->onfinish->complete(r);
- finish_statfs_op(op);
+ _finish_statfs_op(op);
return 0;
}
-void Objecter::finish_statfs_op(StatfsOp *op)
+void Objecter::_finish_statfs_op(StatfsOp *op)
{
+ assert(rwlock.is_wlocked());
+
statfs_ops.erase(op->tid);
logger->set(l_osdc_statfs_active, statfs_ops.size());
- if (op->ontimeout)
+ if (op->ontimeout) {
timer.cancel_event(op->ontimeout);
+ }
delete op;
}
int osd = osdmap->identify_osd(con->get_peer_addr());
if (osd >= 0) {
ldout(cct, 1) << "ms_handle_reset on osd." << osd << dendl;
+ rwlock.get_read();
map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
if (p != osd_sessions.end()) {
OSDSession *session = p->second;
- reopen_session(session);
- kick_requests(session);
+ session->lock.get_write();
+ _reopen_session(session);
+ _kick_requests(session);
+ session->lock.unlock();
+ rwlock.unlock();
maybe_request_map();
+ } else {
+ rwlock.unlock();
}
} else {
ldout(cct, 10) << "ms_handle_reset on unknown osd addr " << con->get_peer_addr() << dendl;
f->dump_int("precalc_pgid", (int)precalc_pgid);
}
-void Objecter::dump_active()
+void Objecter::_dump_active(OSDSession *s)
{
- ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless" << dendl;
- for (map<ceph_tid_t,Op*>::iterator p = ops.begin(); p != ops.end(); ++p) {
+ for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin(); p != s->ops.end(); ++p) {
Op *op = p->second;
ldout(cct, 20) << op->tid << "\t" << op->target.pgid
<< "\tosd." << (op->session ? op->session->osd : -1)
}
}
-void Objecter::dump_requests(Formatter *fmt) const
+void Objecter::_dump_active()
+{
+ ldout(cct, 20) << "dump_active .. " << num_homeless_ops.read() << " homeless" << dendl;
+ for (map<int, OSDSession *>::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+ OSDSession *s = siter->second;
+ s->lock.get_read();
+ _dump_active(s);
+ s->lock.unlock();
+ }
+ _dump_active(&homeless_session);
+}
+
+void Objecter::dump_active()
{
- assert(client_lock.is_locked());
+ rwlock.get_read();
+ _dump_active();
+ rwlock.unlock();
+}
+void Objecter::dump_requests(Formatter *fmt)
+{
fmt->open_object_section("requests");
dump_ops(fmt);
dump_linger_ops(fmt);
fmt->close_section(); // requests object
}
-void Objecter::dump_ops(Formatter *fmt) const
+void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
{
- fmt->open_array_section("ops");
- for (map<ceph_tid_t,Op*>::const_iterator p = ops.begin();
- p != ops.end();
+ for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
+ p != s->ops.end();
++p) {
Op *op = p->second;
fmt->open_object_section("op");
fmt->close_section(); // op object
}
+}
+
+void Objecter::dump_ops(Formatter *fmt)
+{
+ fmt->open_array_section("ops");
+ rwlock.get_read();
+ for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+ OSDSession *s = siter->second;
+ s->lock.get_read();
+ _dump_ops(s, fmt);
+ s->lock.unlock();
+ }
+ rwlock.unlock();
+ _dump_ops(&homeless_session, fmt);
fmt->close_section(); // ops array
}
-void Objecter::dump_linger_ops(Formatter *fmt) const
+void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
{
- fmt->open_array_section("linger_ops");
- for (map<uint64_t, LingerOp*>::const_iterator p = linger_ops.begin();
- p != linger_ops.end();
+ for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
+ p != s->linger_ops.end();
++p) {
LingerOp *op = p->second;
fmt->open_object_section("linger_op");
fmt->dump_stream("registered") << op->registered;
fmt->close_section(); // linger_op object
}
+}
+
+void Objecter::dump_linger_ops(Formatter *fmt)
+{
+ fmt->open_array_section("linger_ops");
+ rwlock.get_read();
+ for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+ OSDSession *s = siter->second;
+ s->lock.get_read();
+ _dump_linger_ops(s, fmt);
+ s->lock.unlock();
+ }
+ rwlock.unlock();
+ _dump_linger_ops(&homeless_session, fmt);
fmt->close_section(); // linger_ops array
}
-void Objecter::dump_command_ops(Formatter *fmt) const
+void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
{
- fmt->open_array_section("command_ops");
- for (map<uint64_t, CommandOp*>::const_iterator p = command_ops.begin();
- p != command_ops.end();
+ for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
+ p != s->command_ops.end();
++p) {
CommandOp *op = p->second;
fmt->open_object_section("command_op");
fmt->dump_stream("target_pg") << op->target_pg;
fmt->close_section(); // command_op object
}
+}
+
+void Objecter::dump_command_ops(Formatter *fmt)
+{
+ fmt->open_array_section("command_ops");
+ rwlock.get_read();
+ for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+ OSDSession *s = siter->second;
+ s->lock.get_read();
+ _dump_command_ops(s, fmt);
+ s->lock.unlock();
+ }
+ rwlock.unlock();
+ _dump_command_ops(&homeless_session, fmt);
fmt->close_section(); // command_ops array
}
Formatter *f = new_formatter(format);
if (!f)
f = new_formatter("json-pretty");
- m_objecter->client_lock.Lock();
+ RWLock::RLocker rl(m_objecter->rwlock);
m_objecter->dump_requests(f);
- m_objecter->client_lock.Unlock();
f->flush(out);
delete f;
return true;
void Objecter::handle_command_reply(MCommandReply *m)
{
- map<ceph_tid_t,CommandOp*>::iterator p = command_ops.find(m->get_tid());
- if (p == command_ops.end()) {
+ int osd_num = (int)m->get_source().num();
+
+ RWLock::WLocker wl(rwlock);
+
+ map<int, OSDSession *>::iterator siter = osd_sessions.find(osd_num);
+ if (siter == osd_sessions.end()) {
+ ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() << " osd not found" << dendl;
+ m->put();
+ return;
+ }
+
+ OSDSession *s = siter->second;
+
+ s->lock.get_read();
+ map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
+ if (p == s->command_ops.end()) {
ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() << " not found" << dendl;
m->put();
+ s->lock.unlock();
return;
}
ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() << " got reply from wrong connection "
<< m->get_connection() << " " << m->get_source_inst() << dendl;
m->put();
+ s->lock.unlock();
return;
}
if (c->poutbl)
c->poutbl->claim(m->get_data());
+
+ s->lock.unlock();
+
+
_finish_command(c, m->r, m->rs);
m->put();
}
class C_CancelCommandOp : public Context
{
+ // Timer callback that cancels a pending command op with -ETIMEDOUT once
+ // osd_timeout elapses.
+ // NOTE(review): s is a raw, unref'd session pointer — presumably kept
+ // alive by the pending op it refers to; confirm against retarget paths.
+ Objecter::OSDSession *s;
ceph_tid_t tid;
Objecter *objecter;
public:
- C_CancelCommandOp(ceph_tid_t tid, Objecter *objecter) : tid(tid),
- objecter(objecter) {}
+ C_CancelCommandOp(Objecter::OSDSession *s, ceph_tid_t tid, Objecter *objecter) : s(s), tid(tid),
+ objecter(objecter) {}
void finish(int r) {
- // note that objecter lock == timer lock, and is already held
- objecter->command_op_cancel(tid, -ETIMEDOUT);
+ // command_op_cancel() now takes its own locks; nothing is held here.
+ objecter->command_op_cancel(s, tid, -ETIMEDOUT);
}
};
-int Objecter::_submit_command(CommandOp *c, ceph_tid_t *ptid)
+int Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
{
+ // Public entry point: take rwlock for write, park the op on the homeless
+ // session, then retarget it to a real OSD session and send it.
+ RWLock::WLocker wl(rwlock);
+
- ceph_tid_t tid = ++last_tid;
+ ceph_tid_t tid = last_tid.inc();
- ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
+ ldout(cct, 10) << __func__ << " " << tid << " " << c->cmd << dendl;
c->tid = tid;
+ homeless_session.command_ops[tid] = c;
+ num_homeless_ops.inc();
+ c->session = &homeless_session;
+ (void)_recalc_command_target(c);
if (osd_timeout > 0) {
+ // NOTE(review): the timeout callback captures c->session raw; if the op
+ // is later retargeted to another session this pointer goes stale — confirm.
- c->ontimeout = new C_CancelCommandOp(tid, this);
+ c->ontimeout = new C_CancelCommandOp(c->session, tid, this);
timer.add_event_after(osd_timeout, c->ontimeout);
}
- command_ops[tid] = c;
- num_homeless_ops++;
- (void)recalc_command_target(c);
- if (c->session)
+ if (!c->session->is_homeless()) {
_send_command(c);
- else
- maybe_request_map();
+ } else {
+ // No target yet; request a newer osdmap so the command can be mapped.
+ int r = _maybe_request_map();
+ assert(r != -EAGAIN); /* because rwlock is already write-locked */
+ }
if (c->map_check_error)
_send_command_map_check(c);
*ptid = tid;
- logger->set(l_osdc_command_active, command_ops.size());
+ logger->inc(l_osdc_command_active);
+
return 0;
}
-int Objecter::recalc_command_target(CommandOp *c)
+int Objecter::_recalc_command_target(CommandOp *c)
{
+ // Re-map a command op to the session of its current target (explicit OSD
+ // or the acting primary of its target PG); homeless_session if unmapped.
+ assert(rwlock.is_wlocked());
+
+ RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+
+ OSDSession *s = &homeless_session;
- OSDSession *s = NULL;
c->map_check_error = 0;
if (c->target_osd >= 0) {
if (!osdmap->exists(c->target_osd)) {
c->map_check_error_str = "osd down";
return RECALC_OP_TARGET_OSD_DOWN;
}
- s = get_session(c->target_osd);
+ int r = _get_session(c->target_osd, &s, lc);
+ assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
} else {
if (!osdmap->have_pg_pool(c->target_pg.pool())) {
c->map_check_error = -ENOENT;
int primary;
vector<int> acting;
osdmap->pg_to_acting_osds(c->target_pg, &acting, &primary);
- if (primary != -1)
- s = get_session(primary);
+ if (primary != -1) {
+ int r = _get_session(primary, &s, lc);
+ assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+ }
}
+
+ // NOTE(review): _get_session presumably returns s with a ref held; that
+ // ref is either transferred into c->session or dropped via put_session in
+ // the no-change branch below — confirm the homeless_session refcounting.
if (c->session != s) {
- ldout(cct, 10) << "recalc_command_target " << c->tid << " now " << c->session << dendl;
- if (s) {
- if (!c->session)
- num_homeless_ops--;
- c->session = s;
- s->command_ops.push_back(&c->session_item);
- } else {
- c->session = NULL;
- num_homeless_ops++;
+ ldout(cct, 10) << "_recalc_command_target " << c->tid << " now " << c->session << dendl;
+ if (c->session) {
+ if (c->session->is_homeless()) {
+ num_homeless_ops.dec();
+ }
+ if (c->tid) {
+ c->session->lock.get_write();
+ c->session->command_ops.erase(c->tid);
+ c->session->lock.unlock();
+ }
+ put_session(c->session);
+ }
+ c->session = s;
+ s->lock.get_write();
+ if (c->tid) {
+ s->command_ops[c->tid] = c;
}
+ if (s->is_homeless())
+ num_homeless_ops.inc();
+ s->lock.unlock();
return RECALC_OP_TARGET_NEED_RESEND;
+ } else {
+ put_session(s);
}
- ldout(cct, 20) << "recalc_command_target " << c->tid << " no change, " << c->session << dendl;
+
+ ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, " << c->session << dendl;
return RECALC_OP_TARGET_NO_ACTION;
}
logger->inc(l_osdc_command_send);
}
-int Objecter::command_op_cancel(ceph_tid_t tid, int r)
+int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
+ // Cancel the pending command op with the given tid on session s,
+ // completing it with return code r; -ENOENT if no such op.
- assert(client_lock.is_locked());
- assert(initialized);
+ assert(initialized.read());
+
+ RWLock::WLocker wl(rwlock);
+ // NOTE(review): s->command_ops is read without taking s->lock here (only
+ // rwlock is write-held); other paths additionally take s->lock — confirm
+ // which lock actually guards the per-session maps.
- map<ceph_tid_t, CommandOp*>::iterator it = command_ops.find(tid);
- if (it == command_ops.end()) {
+ map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
+ if (it == s->command_ops.end()) {
ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
return -ENOENT;
}
ldout(cct, 10) << __func__ << " tid " << tid << dendl;
CommandOp *op = it->second;
- command_cancel_map_check(op);
+ _command_cancel_map_check(op);
- _finish_command(op, -ETIMEDOUT, "");
+ // Honor the caller-supplied return code instead of hard-coding -ETIMEDOUT;
+ // the only current caller (C_CancelCommandOp) passes -ETIMEDOUT anyway.
+ _finish_command(op, r, "");
return 0;
}
void Objecter::_finish_command(CommandOp *c, int r, string rs)
{
+ // Complete a command op: deliver status/result, unlink it from its
+ // session, cancel any pending timeout event, and drop the op's ref.
+ assert(rwlock.is_wlocked());
+
ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " " << rs << dendl;
- c->session_item.remove_myself();
if (c->prs)
*c->prs = rs;
+ // NOTE(review): onfinish runs while rwlock is write-held and before the op
+ // is erased from c->session->command_ops — confirm callbacks tolerate both.
if (c->onfinish)
c->onfinish->complete(r);
- command_ops.erase(c->tid);
- if (c->ontimeout)
+ c->session->lock.get_write();
+ c->session->command_ops.erase(c->tid);
+ c->session->lock.unlock();
+ if (c->ontimeout) {
timer.cancel_event(c->ontimeout);
+ }
c->put();
- logger->set(l_osdc_command_active, command_ops.size());
+ logger->dec(l_osdc_command_active);
}
#include "include/types.h"
#include "include/buffer.h"
-#include "include/xlist.h"
#include "osd/OSDMap.h"
#include "messages/MOSDOp.h"
#include "common/admin_socket.h"
#include "common/Timer.h"
+#include "common/RWLock.h"
#include "include/rados/rados_types.h"
#include "include/rados/rados_types.hpp"
CephContext *cct;
std::multimap<string,string> crush_location;
- bool initialized;
+ atomic_t initialized;
private:
- ceph_tid_t last_tid;
- int client_inc;
+ atomic64_t last_tid;
+ atomic_t inflight_ops;
+ atomic_t client_inc;
uint64_t max_linger_id;
- int num_unacked;
- int num_uncommitted;
- int global_op_flags; // flags which are applied to each IO op
+ atomic_t num_unacked;
+ atomic_t num_uncommitted;
+ atomic_t global_op_flags; // flags which are applied to each IO op
bool keep_balanced_budget;
bool honor_osdmap_full;
void maybe_request_map();
private:
+ int _maybe_request_map();
+
version_t last_seen_osdmap_version;
version_t last_seen_pgmap_version;
- Mutex &client_lock;
- SafeTimer &timer;
+ RWLock rwlock;
+ RWTimer timer;
PerfCounters *logger;
struct Op {
OSDSession *session;
- xlist<Op*>::item session_item;
int incarnation;
op_target_t target;
Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
int f, Context *ac, Context *co, version_t *ov) :
- session(NULL), session_item(this), incarnation(0),
+ session(NULL), incarnation(0),
target(o, ol, f),
con(NULL),
snapid(CEPH_NOSNAP),
// -- osd commands --
struct CommandOp : public RefCountedObject {
- xlist<CommandOp*>::item session_item;
OSDSession *session;
ceph_tid_t tid;
vector<string> cmd;
utime_t last_submit;
CommandOp()
- : session_item(this), session(NULL),
+ : session(NULL),
tid(0), poutbl(NULL), prs(NULL), target_osd(-1),
map_dne_bound(0),
map_check_error(0),
onfinish(NULL), ontimeout(NULL) {}
};
- int _submit_command(CommandOp *c, ceph_tid_t *ptid);
- int recalc_command_target(CommandOp *c);
+ int submit_command(CommandOp *c, ceph_tid_t *ptid);
+ int _recalc_command_target(CommandOp *c);
void _send_command(CommandOp *c);
- int command_op_cancel(ceph_tid_t tid, int r);
+ int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r);
void _finish_command(CommandOp *c, int r, string rs);
void handle_command_reply(MCommandReply *m);
Context *on_reg_ack, *on_reg_commit;
OSDSession *session;
- xlist<LingerOp*>::item session_item;
ceph_tid_t register_tid;
epoch_t map_dne_bound;
poutbl(NULL), pobjver(NULL),
registered(false),
on_reg_ack(NULL), on_reg_commit(NULL),
- session(NULL), session_item(this),
+ session(NULL),
register_tid(0),
map_dne_bound(0) {}
};
// -- osd sessions --
- struct OSDSession {
- xlist<Op*> ops;
- xlist<LingerOp*> linger_ops;
- xlist<CommandOp*> command_ops;
+ // Per-OSD connection state. Now refcounted (RefCountedObject) so callers
+ // can hold a session across rwlock release, with its own RWLock guarding
+ // the per-session op maps (which replace the old intrusive xlists).
+ struct OSDSession : public RefCountedObject {
+ RWLock lock;
+ Mutex completion_lock;
+
+ // pending ops
+ map<ceph_tid_t,Op*> ops;
+ map<uint64_t, LingerOp*> linger_ops;
+ map<ceph_tid_t,CommandOp*> command_ops;
+
int osd;
int incarnation;
ConnectionRef con;
- OSDSession(int o) : osd(o), incarnation(0), con(NULL) {}
+ OSDSession(int o) : lock("OSDSession"), completion_lock("OSDSession::completion_lock"), osd(o), incarnation(0), con(NULL) {}
+
+ // The "homeless" session (osd == -1) parks ops that have no target OSD.
+ bool is_homeless() { return (osd == -1); }
};
map<int,OSDSession*> osd_sessions;
private:
- // pending ops
- map<ceph_tid_t,Op*> ops;
- int num_homeless_ops;
map<uint64_t, LingerOp*> linger_ops;
+
map<ceph_tid_t,PoolStatOp*> poolstat_ops;
map<ceph_tid_t,StatfsOp*> statfs_ops;
map<ceph_tid_t,PoolOp*> pool_ops;
- map<ceph_tid_t,CommandOp*> command_ops;
+ atomic_t num_homeless_ops;
+
+ OSDSession homeless_session;
// ops waiting for an osdmap with a new pool or confirmation that
// the pool does not exist (may be expanded to other uses later)
double mon_timeout, osd_timeout;
- void send_op(Op *op);
- void cancel_linger_op(Op *op);
- void finish_op(Op *op);
+ MOSDOp *_prepare_osd_op(Op *op);
+ void _send_op(Op *op, MOSDOp *m = NULL);
+ void _cancel_linger_op(Op *op);
+ void finish_op(OSDSession *session, ceph_tid_t tid);
+ void _finish_op(Op *op);
static bool is_pg_changed(
int oldprimary,
const vector<int>& oldacting,
RECALC_OP_TARGET_OSD_DOWN,
};
bool osdmap_full_flag() const;
- bool target_should_be_paused(op_target_t *op);
-
- int calc_target(op_target_t *t);
- int recalc_op_target(Op *op);
- bool recalc_linger_op_target(LingerOp *op);
- void send_linger(LingerOp *info);
+ bool target_should_be_paused(op_target_t *op);
+ int _calc_target(op_target_t *t);
+ int _map_session(op_target_t *op, OSDSession **s,
+ RWLock::Context& lc);
+ void _session_op_remove(Op *op);
+ void _session_linger_op_remove(LingerOp *info);
+ void _session_op_assign(Op *op, OSDSession *s);
+ int _get_osd_session(int osd, RWLock::Context& lc, OSDSession **psession);
+ int _assign_op_target_session(Op *op, RWLock::Context& lc, bool src_session_locked, bool dst_session_locked);
+ int _get_op_target_session(Op *op, RWLock::Context& lc, OSDSession **psession);
+ void _session_op_validate(Op *op, RWLock::Context& lc, bool session_locked);
+ int _recalc_op_target(Op *op, RWLock::Context& lc, bool session_locked = false);
+ int _recalc_linger_op_target(LingerOp *op, RWLock::Context& lc);
+
+ void _linger_submit(LingerOp *info);
+ void _send_linger(LingerOp *info);
void _linger_ack(LingerOp *info, int r);
void _linger_commit(LingerOp *info, int r);
- void check_op_pool_dne(Op *op);
+ void _check_op_pool_dne(Op *op);
void _send_op_map_check(Op *op);
- void op_cancel_map_check(Op *op);
- void check_linger_pool_dne(LingerOp *op);
+ void _op_cancel_map_check(Op *op);
+ void _check_linger_pool_dne(LingerOp *op);
void _send_linger_map_check(LingerOp *op);
- void linger_cancel_map_check(LingerOp *op);
- void check_command_map_dne(CommandOp *op);
+ void _linger_cancel_map_check(LingerOp *op);
+ void _check_command_map_dne(CommandOp *op);
void _send_command_map_check(CommandOp *op);
- void command_cancel_map_check(CommandOp *op);
+ void _command_cancel_map_check(CommandOp *op);
void kick_requests(OSDSession *session);
+ void _kick_requests(OSDSession *session);
- OSDSession *get_session(int osd);
- void reopen_session(OSDSession *session);
+ int _get_session(int osd, OSDSession **session, RWLock::Context& lc);
+ void put_session(OSDSession *s);
+ void _reopen_session(OSDSession *session);
void close_session(OSDSession *session);
void _list_reply(ListContext *list_context, int r, Context *final_finish,
- * If throttle_op needs to throttle it will unlock client_lock.
+ * If _throttle_op needs to throttle it will unlock rwlock.
*/
int calc_op_budget(Op *op);
- void throttle_op(Op *op, int op_size=0);
- void take_op_budget(Op *op) {
+ void _throttle_op(Op *op, int op_size=0);
+ void _take_op_budget(Op *op) {
+ assert(rwlock.is_locked());
int op_budget = calc_op_budget(op);
if (keep_balanced_budget) {
- throttle_op(op, op_budget);
+ _throttle_op(op, op_budget);
} else {
op_throttle_bytes.take(op_budget);
op_throttle_ops.take(1);
public:
Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
- OSDMap *om, Mutex& l, SafeTimer& t, double mon_timeout,
+ OSDMap *om, double mon_timeout,
double osd_timeout) :
messenger(m), monc(mc), osdmap(om), cct(cct_),
initialized(false),
keep_balanced_budget(false), honor_osdmap_full(true),
last_seen_osdmap_version(0),
last_seen_pgmap_version(0),
- client_lock(l), timer(t),
+ rwlock("Objecter::rwlock"),
+ timer(cct, rwlock),
logger(NULL), tick_event(NULL),
m_request_state_hook(NULL),
num_homeless_ops(0),
+ homeless_session(-1),
mon_timeout(mon_timeout),
osd_timeout(osd_timeout),
op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes),
assert(!logger);
}
- void init_unlocked();
- void init_locked();
- void shutdown_locked();
- void shutdown_unlocked();
+ void init();
+ void shutdown();
/**
* Tell the objecter to throttle outgoing ops according to its
void set_honor_osdmap_full() { honor_osdmap_full = true; }
void unset_honor_osdmap_full() { honor_osdmap_full = false; }
- void scan_requests(bool force_resend,
+ void _scan_requests(OSDSession *s,
+ bool force_resend,
bool force_resend_writes,
map<ceph_tid_t, Op*>& need_resend,
list<LingerOp*>& need_resend_linger,
void handle_osd_map(class MOSDMap *m);
void wait_for_osd_map();
+ int pool_snap_by_name(int64_t poolid, const char *snap_name, snapid_t *snap);
+ int pool_snap_get_info(int64_t poolid, snapid_t snap, pool_snap_info_t *info);
+ int pool_snap_list(int64_t poolid, vector<uint64_t> *snaps);
private:
+ bool _promote_lock_check_race(RWLock::Context& lc);
+
// low-level
- ceph_tid_t _op_submit(Op *op);
+ ceph_tid_t _op_submit(Op *op, RWLock::Context& lc);
+ ceph_tid_t _op_submit_with_budget(Op *op, RWLock::Context& lc);
inline void unregister_op(Op *op);
// public interface
public:
ceph_tid_t op_submit(Op *op);
bool is_active() {
- return !(ops.empty() && linger_ops.empty() && poolstat_ops.empty() && statfs_ops.empty());
+ // Active iff anything is still outstanding (De Morgan form of the old
+ // "nothing pending" negation, with inflight_ops replacing the ops map).
+ return inflight_ops.read() || !linger_ops.empty() ||
+ !poolstat_ops.empty() || !statfs_ops.empty();
}
/**
* Output in-flight requests
*/
+ void _dump_active(OSDSession *s);
+ void _dump_active();
void dump_active();
- void dump_requests(Formatter *fmt) const;
- void dump_ops(Formatter *fmt) const;
- void dump_linger_ops(Formatter *fmt) const;
- void dump_command_ops(Formatter *fmt) const;
+ void dump_requests(Formatter *fmt);
+ void _dump_ops(const OSDSession *s, Formatter *fmt);
+ void dump_ops(Formatter *fmt);
+ void _dump_linger_ops(const OSDSession *s, Formatter *fmt);
+ void dump_linger_ops(Formatter *fmt);
+ void _dump_command_ops(const OSDSession *s, Formatter *fmt);
+ void dump_command_ops(Formatter *fmt);
void dump_pool_ops(Formatter *fmt) const;
void dump_pool_stat_ops(Formatter *fmt) const;
void dump_statfs_ops(Formatter *fmt) const;
- int get_client_incarnation() const { return client_inc; }
- void set_client_incarnation(int inc) { client_inc = inc; }
+ int get_client_incarnation() const { return client_inc.read(); }
+ void set_client_incarnation(int inc) { client_inc.set(inc); }
void wait_for_new_map(Context *c, epoch_t epoch, int err=0);
+ void _wait_for_new_map(Context *c, epoch_t epoch, int err=0);
void wait_for_latest_osdmap(Context *fin);
+ void get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
void _get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
/** Get the current set of global op flags */
- int get_global_op_flags() { return global_op_flags; }
- /** Add a flag to the global op flags */
- void add_global_op_flags(int flag) { global_op_flags |= flag; }
- /** Clear the passed flags from the global op flag set */
- void clear_global_op_flag(int flags) { global_op_flags &= ~flags; }
+ int get_global_op_flags() { return global_op_flags.read(); }
+ /** Add a flag to the global op flags, not really atomic operation */
+ void add_global_op_flags(int flag) { global_op_flags.set(global_op_flags.read() | flag); }
+ /** Clear the passed flags from the global op flag set, not really atomic operation */
+ void clear_global_op_flag(int flags) { global_op_flags.set(global_op_flags.read() & ~flags); }
/// cancel an in-progress request with the given return code
+ int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
int op_cancel(ceph_tid_t tid, int r);
// commands
c->prs = prs;
c->onfinish = onfinish;
c->target_osd = osd;
- return _submit_command(c, ptid);
+ return submit_command(c, ptid);
}
int pg_command(pg_t pgid, vector<string>& cmd,
const bufferlist& inbl, ceph_tid_t *ptid,
c->prs = prs;
c->onfinish = onfinish;
c->target_pg = pgid;
- return _submit_command(c, ptid);
+ return submit_command(c, ptid);
}
// mid-level helpers
ObjectOperation& op,
const SnapContext& snapc, utime_t mtime, int flags,
Context *onack, Context *oncommit, version_t *objver = NULL) {
- Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+ Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->priority = op.priority;
o->mtime = mtime;
o->snapc = snapc;
ObjectOperation& op,
snapid_t snapid, bufferlist *pbl, int flags,
Context *onack, version_t *objver = NULL) {
- Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, onack, NULL, objver);
+ Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onack, NULL, objver);
o->priority = op.priority;
o->snapid = snapid;
o->outbl = pbl;
Context *onack,
epoch_t *reply_epoch) {
Op *o = new Op(object_t(), oloc,
- op.ops, flags | global_op_flags | CEPH_OSD_FLAG_READ,
+ op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ,
onack, NULL, NULL);
o->target.precalc_pgid = true;
o->target.base_pgid = pg_t(hash, oloc.pool);
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_STAT;
C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, fin, 0, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, fin, 0, objver);
o->snapid = snap;
o->outbl = &fin->bl;
return op_submit(o);
ops[i].op.extent.length = len;
ops[i].op.extent.truncate_size = 0;
ops[i].op.extent.truncate_seq = 0;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
o->snapid = snap;
o->outbl = pbl;
return op_submit(o);
ops[i].op.extent.length = len;
ops[i].op.extent.truncate_size = trunc_size;
ops[i].op.extent.truncate_seq = trunc_seq;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
o->snapid = snap;
o->outbl = pbl;
return op_submit(o);
ops[i].op.extent.length = len;
ops[i].op.extent.truncate_size = 0;
ops[i].op.extent.truncate_seq = 0;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
o->snapid = snap;
o->outbl = pbl;
return op_submit(o);
ops[i].op.xattr.value_len = 0;
if (name)
ops[i].indata.append(name);
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
o->snapid = snap;
o->outbl = pbl;
return op_submit(o);
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, fin, 0, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, fin, 0, objver);
o->snapid = snap;
o->outbl = &fin->bl;
return op_submit(o);
snapid_t snap, bufferlist *pbl, int flags,
Context *onfinish,
version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
- return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, objver);
+ return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, objver);
}
const SnapContext& snapc, int flags,
Context *onack, Context *oncommit,
version_t *objver = NULL) {
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return op_submit(o);
ops[i].op.extent.truncate_size = 0;
ops[i].op.extent.truncate_seq = 0;
ops[i].indata = bl;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return op_submit(o);
ops[i].op.extent.truncate_size = 0;
ops[i].op.extent.truncate_seq = 0;
ops[i].indata = bl;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return op_submit(o);
ops[i].op.extent.truncate_size = trunc_size;
ops[i].op.extent.truncate_seq = trunc_seq;
ops[i].indata = bl;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return op_submit(o);
ops[i].op.extent.offset = 0;
ops[i].op.extent.length = bl.length();
ops[i].indata = bl;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return op_submit(o);
ops[i].op.extent.offset = trunc_size;
ops[i].op.extent.truncate_size = trunc_size;
ops[i].op.extent.truncate_seq = trunc_seq;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return op_submit(o);
ops[i].op.op = CEPH_OSD_OP_ZERO;
ops[i].op.extent.offset = off;
ops[i].op.extent.length = len;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return op_submit(o);
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_CREATE;
ops[i].op.flags = create_flags;
- Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+ Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return op_submit(o);
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_DELETE;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return op_submit(o);
vector<OSDOp> ops;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = op;
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->snapc = snapc;
return op_submit(o);
}
if (name)
ops[i].indata.append(name);
ops[i].indata.append(bl);
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return op_submit(o);
ops[i].op.xattr.value_len = 0;
if (name)
ops[i].indata.append(name);
- Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+ Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return op_submit(o);
void handle_pool_op_reply(MPoolOpReply *m);
int pool_op_cancel(ceph_tid_t tid, int r);
- void finish_pool_op(PoolOp *op);
+ void _finish_pool_op(PoolOp *op);
// --------------------------
// pool stats
private:
- void poolstat_submit(PoolStatOp *op);
+ void _poolstat_submit(PoolStatOp *op);
public:
void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
void get_pool_stats(list<string>& pools, map<string,pool_stat_t> *result,
Context *onfinish);
int pool_stat_op_cancel(ceph_tid_t tid, int r);
- void finish_pool_stat_op(PoolStatOp *op);
+ void _finish_pool_stat_op(PoolStatOp *op);
// ---------------------------
// df stats
private:
- void fs_stats_submit(StatfsOp *op);
+ void _fs_stats_submit(StatfsOp *op);
public:
void handle_fs_stats_reply(MStatfsReply *m);
void get_fs_stats(struct ceph_statfs& result, Context *onfinish);
int statfs_op_cancel(ceph_tid_t tid, int r);
- void finish_statfs_op(StatfsOp *op);
+ void _finish_statfs_op(StatfsOp *op);
// ---------------------------
// some scatter/gather hackery