void Objecter::_scan_requests(OSDSession *s,
bool force_resend,
- bool force_resend_writes,
+ bool cluster_full,
+ map<int64_t, bool> *pool_full_map,
map<ceph_tid_t, Op*>& need_resend,
list<LingerOp*>& need_resend_linger,
map<ceph_tid_t, CommandOp*>& need_resend_command)
assert(op->session == s);
++lp; // check_linger_pool_dne() may touch linger_ops; prevent iterator invalidation
ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
- bool unregister;
+ bool unregister, force_resend_writes = cluster_full;
int r = _recalc_linger_op_target(op, lc);
+ if (pool_full_map)
+ force_resend_writes = force_resend_writes || (*pool_full_map)[op->target.base_oloc.pool];
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
if (!force_resend && !force_resend_writes)
Op *op = p->second;
++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
ldout(cct, 10) << " checking op " << op->tid << dendl;
+ bool force_resend_writes = cluster_full;
+ if (pool_full_map)
+ force_resend_writes = force_resend_writes || (*pool_full_map)[op->target.base_oloc.pool];
int r = _calc_target(&op->target, &op->last_force_resend);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
CommandOp *c = cp->second;
++cp;
ldout(cct, 10) << " checking command " << c->tid << dendl;
+ bool force_resend_writes = cluster_full;
+ if (pool_full_map)
+ force_resend_writes = force_resend_writes || (*pool_full_map)[c->target_pg.pool()];
int r = _calc_command_target(c);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
}
bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
- bool was_full = _osdmap_full_flag();
- bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || was_full;
+ bool cluster_full = _osdmap_full_flag();
+ bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full || _osdmap_has_pool_full();
+ map<int64_t, bool> pool_full_map;
+ for (map<int64_t, pg_pool_t>::const_iterator it = osdmap->get_pools().begin();
+ it != osdmap->get_pools().end(); it++)
+ pool_full_map[it->first] = it->second.has_flag(pg_pool_t::FLAG_FULL);
+
list<LingerOp*> need_resend_linger;
map<ceph_tid_t, Op*> need_resend;
map<ceph_tid_t, CommandOp*> need_resend_command;
}
logger->set(l_osdc_map_epoch, osdmap->get_epoch());
- was_full = was_full || _osdmap_full_flag();
- _scan_requests(homeless_session, skipped_map, was_full,
- need_resend, need_resend_linger,
- need_resend_command);
+ cluster_full = cluster_full || _osdmap_full_flag();
+ update_pool_full_map(pool_full_map);
+ _scan_requests(homeless_session, skipped_map, cluster_full,
+ &pool_full_map, need_resend,
+ need_resend_linger, need_resend_command);
// osd addr changes?
for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
p != osd_sessions.end(); ) {
OSDSession *s = p->second;
- _scan_requests(s, skipped_map, was_full,
- need_resend, need_resend_linger,
- need_resend_command);
+ _scan_requests(s, skipped_map, cluster_full,
+ &pool_full_map, need_resend,
+ need_resend_linger, need_resend_command);
++p;
if (!osdmap->is_up(s->osd) ||
(s->con &&
for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
p != osd_sessions.end(); ++p) {
OSDSession *s = p->second;
- _scan_requests(s, false, false, need_resend, need_resend_linger,
- need_resend_command);
+ _scan_requests(s, false, false, NULL, need_resend,
+ need_resend_linger, need_resend_command);
}
ldout(cct, 3) << "handle_osd_map decoding full epoch "
<< m->get_last() << dendl;
osdmap->decode(m->maps[m->get_last()]);
- _scan_requests(homeless_session, false, false,
+ _scan_requests(homeless_session, false, false, NULL,
need_resend, need_resend_linger,
need_resend_command);
} else {
}
bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
- bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag();
+ bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag() || _osdmap_has_pool_full();
// was/is paused?
if (was_pauserd || was_pausewr || pauserd || pausewr || osdmap->get_epoch() < epoch_barrier) {
ldout(cct, 10) << " paused read " << op << " tid " << last_tid.read() << dendl;
op->target.paused = true;
_maybe_request_map();
- } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) && _osdmap_full_flag()) {
+ } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
+ (_osdmap_full_flag() || _osdmap_pool_full(op->target.base_oloc.pool))) {
ldout(cct, 0) << " FULL, paused modify " << op << " tid " << last_tid.read() << dendl;
op->target.paused = true;
_maybe_request_map();
bool Objecter::target_should_be_paused(op_target_t *t)
{
+ const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
- bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag();
+ bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag() || pi->has_flag(pg_pool_t::FLAG_FULL);
return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
(t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
return true;
}
+ return _osdmap_pool_full(pool_id);
+}
+
+bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
+{
const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
if (pool == NULL) {
ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
return pool->has_flag(pg_pool_t::FLAG_FULL);
}
+bool Objecter::_osdmap_has_pool_full() const
+{
+ for (map<int64_t, pg_pool_t>::const_iterator it = osdmap->get_pools().begin();
+ it != osdmap->get_pools().end(); it++) {
+ if (it->second.has_flag(pg_pool_t::FLAG_FULL))
+ return true;
+ }
+ return false;
+}
+
/**
* Wrapper around osdmap->test_flag for special handling of the FULL flag.
*/
return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
}
+void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
+{
+ for (map<int64_t, pg_pool_t>::const_iterator it = osdmap->get_pools().begin();
+ it != osdmap->get_pools().end(); it++) {
+ if (pool_full_map.find(it->first) == pool_full_map.end()) {
+ pool_full_map[it->first] = it->second.has_flag(pg_pool_t::FLAG_FULL);
+ } else {
+ pool_full_map[it->first] = it->second.has_flag(pg_pool_t::FLAG_FULL) || pool_full_map[it->first];
+ }
+ }
+}
int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
const string& ns)