for (epoch_t e = osdmap->get_epoch() + 1;
e <= m->get_last();
e++) {
+
+ bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
+ bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR);
+
if (m->incremental_maps.count(e)) {
dout(3) << "handle_osd_map decoding incremental epoch " << e << dendl;
OSDMap::Incremental inc(m->incremental_maps[e]);
// scan pgs for changes
scan_pgs(changed_pgs);
+
+ // kick paused
+ if (was_pauserd && !osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
+ for (hash_map<tid_t,ReadOp*>::iterator p = op_read.begin();
+ p != op_read.end();
+ p++) {
+ if (p->second->paused) {
+ p->second->paused = false;
+ read_submit(p->second);
+ }
+ }
+ }
+ if (was_pausewr && !osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
+ for (hash_map<tid_t,ModifyOp*>::iterator p = op_modify.begin();
+ p != op_modify.end();
+ p++) {
+ if (p->second->paused) {
+ p->second->paused = false;
+ modify_submit(p->second);
+ }
+ }
+ }
assert(e == osdmap->get_epoch());
}
<< " osd" << pg.acker()
<< dendl;
- if (pg.acker() >= 0) {
+ if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
+ dout(10) << " paused read " << rd << " tid " << last_tid << dendl;
+ rd->paused = true;
+ maybe_request_map();
+ } else if (pg.acker() >= 0) {
int flags = rd->flags;
if (rd->onfinish)
flags |= CEPH_OSD_OP_ACK;
<< " " << wr->layout
<< " osd" << pg.primary()
<< dendl;
- if (pg.primary() >= 0) {
+
+ if (osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
+ dout(10) << " paused modify " << wr << " tid " << last_tid << dendl;
+ wr->paused = true;
+ maybe_request_map();
+ } else if (pg.primary() >= 0) {
MOSDOp *m = new MOSDOp(client_inc, wr->tid,
wr->oid, wr->layout, osdmap->get_epoch(),
flags | CEPH_OSD_OP_MODIFY);
int attempts;
int inc_lock;
+ bool paused;
+
ReadOp(object_t o, ceph_object_layout& ol, vector<ceph_osd_op>& op, int f, Context *of) :
oid(o), layout(ol),
pbl(0), psize(0), flags(f), onfinish(of),
- tid(0), attempts(0), inc_lock(-1) {
+ tid(0), attempts(0), inc_lock(-1),
+ paused(false) {
ops.swap(op);
}
};
int inc_lock;
eversion_t version;
+ bool paused;
+
ModifyOp(object_t o, ceph_object_layout& l, vector<ceph_osd_op>& op,
const SnapContext& sc, int f, Context *ac, Context *co) :
oid(o), layout(l), snapc(sc), flags(f), onack(ac), oncommit(co),
- tid(0), attempts(0), inc_lock(-1) {
+ tid(0), attempts(0), inc_lock(-1),
+ paused(false) {
ops.swap(op);
}
};