uint64_t count; /// number of readers or writers
list<OpRequestRef> waiters; /// ops waiting on state change
- ObjState() : state(NONE), count(0) {}
+ /// if set, restart backfill when we can get a read lock
+ bool backfill_read_marker;
+
+ ObjState() : state(NONE), count(0), backfill_read_marker(false) {}
bool get_read(OpRequestRef op) {
if (get_read_lock()) {
return true;
}
bool get_write_lock() {
// don't starve anybody!
- if (!waiters.empty()) {
+ if (!waiters.empty() ||
+ backfill_read_marker) {
return false;
}
switch (state) {
}
bool empty() const { return state == NONE; }
};
- map<hobject_t, ObjState > obj_state;
+ map<hobject_t, ObjState > obj_state; ///< map of rw_lock states
public:
+ RWTracker() {}
+
bool get_read(const hobject_t &hoid, OpRequestRef op) {
return obj_state[hoid].get_read(op);
}
obj_state.erase(hoid);
}
}
- void put_write(const hobject_t &hoid, list<OpRequestRef> *to_wake) {
+ void put_write(const hobject_t &hoid, list<OpRequestRef> *to_wake,
+ bool *requeue_recovery) {
obj_state[hoid].put_write(to_wake);
if (obj_state[hoid].empty()) {
+ if (obj_state[hoid].backfill_read_marker)
+ *requeue_recovery = true;
obj_state.erase(hoid);
}
}
+ bool get_backfill_read(const hobject_t &hoid) {
+ ObjState& obj_locker = obj_state[hoid];
+ obj_locker.backfill_read_marker = true;
+ if (obj_locker.get_read_lock()) {
+ return true;
+ } // else
+ return false;
+ }
+ void drop_backfill_read(const hobject_t &hoid, list<OpRequestRef> *ls) {
+ map<hobject_t, ObjState>::iterator i = obj_state.find(hoid);
+ ObjState& obj_locker = i->second;
+ assert(obj_locker.backfill_read_marker = true);
+ obj_locker.put_read(ls);
+ if (obj_locker.empty())
+ obj_state.erase(i);
+ else
+ obj_locker.backfill_read_marker = false;
+ }
} rw_manager;
/**
*/
void release_op_ctx_locks(OpContext *ctx) {
list<OpRequestRef> to_req;
+ bool requeue_recovery = false;
switch (ctx->lock_to_release) {
case OpContext::W_LOCK:
- rw_manager.put_write(ctx->obs->oi.soid, &to_req);
+ rw_manager.put_write(ctx->obs->oi.soid, &to_req, &requeue_recovery);
+ if (requeue_recovery)
+ osd->recovery_wq.queue(this);
break;
case OpContext::R_LOCK:
rw_manager.put_read(ctx->obs->oi.soid, &to_req);