* @return true on success, false if we are queued
*/
bool get_rw_locks(OpContext *ctx) {
- if (ctx->op->may_write() || ctx->op->may_cache()) {
- /* If snapset_obc, !obc->obs->exists and we need to
- * get a write lock on the snapdir as well as the
- * head. Fortunately, we are guarranteed to get a
- * write lock on the head if !obc->obs->exists
- */
- if (ctx->snapset_obc) {
- assert(!ctx->obc->obs.exists);
+ /* If snapset_obc, !obc->obs->exists and we will always take the
+ * snapdir lock *before* the head lock. Since all callers will do
+ * this (read or write) if we get the first we will be guaranteed
+ * to get the second.
+ */
+ if (ctx->snapset_obc) {
+ assert(!ctx->obc->obs.exists);
+ if (ctx->op->may_write() || ctx->op->may_cache()) {
if (ctx->snapset_obc->get_write(ctx->op)) {
ctx->release_snapset_obc = true;
ctx->lock_to_release = OpContext::W_LOCK;
} else {
return false;
}
- // we are creating it and have the only ref
- bool got = ctx->obc->get_write(ctx->op);
- assert(got);
- return true;
} else {
- if (ctx->obc->get_write(ctx->op)) {
- ctx->lock_to_release = OpContext::W_LOCK;
- return true;
+ assert(ctx->op->may_read());
+ if (ctx->snapset_obc->get_read(ctx->op)) {
+ ctx->release_snapset_obc = true;
+ ctx->lock_to_release = OpContext::R_LOCK;
} else {
return false;
}
}
+ }
+ if (ctx->op->may_write() || ctx->op->may_cache()) {
+ if (ctx->obc->get_write(ctx->op)) {
+ ctx->lock_to_release = OpContext::W_LOCK;
+ return true;
+ } else {
+ assert(!ctx->snapset_obc);
+ return false;
+ }
} else {
assert(ctx->op->may_read());
if (ctx->obc->get_read(ctx->op)) {
ctx->lock_to_release = OpContext::R_LOCK;
return true;
} else {
+ assert(!ctx->snapset_obc);
return false;
}
}
bool requeue_recovery = false;
bool requeue_recovery_clone = false;
bool requeue_recovery_snapset = false;
- if (ctx->snapset_obc && ctx->release_snapset_obc) {
- ctx->snapset_obc->put_write(&to_req, &requeue_recovery_snapset);
- ctx->release_snapset_obc = false;
- }
switch (ctx->lock_to_release) {
case OpContext::W_LOCK:
+ if (ctx->snapset_obc && ctx->release_snapset_obc) {
+ ctx->snapset_obc->put_write(&to_req, &requeue_recovery_snapset);
+ ctx->release_snapset_obc = false;
+ }
ctx->obc->put_write(&to_req, &requeue_recovery);
if (ctx->clone_obc)
ctx->clone_obc->put_write(&to_req, &requeue_recovery_clone);
break;
case OpContext::R_LOCK:
+ if (ctx->snapset_obc && ctx->release_snapset_obc) {
+ ctx->snapset_obc->put_read(&to_req);
+ ctx->release_snapset_obc = false;
+ }
ctx->obc->put_read(&to_req);
break;
case OpContext::NONE:
default:
assert(0);
};
+ assert(ctx->release_snapset_obc == false);
ctx->lock_to_release = OpContext::NONE;
if (requeue_recovery || requeue_recovery_clone || requeue_recovery_snapset)
osd->recovery_wq.queue(this);