return false;
}
+ bool object_existed_at(const string &oid, int snap = -1) const
+ {
+ ObjectDesc contents;
+ bool found = find_object(oid, &contents, snap);
+ return found && contents.exists;
+ }
+
void remove_snap(int snap)
{
map<int, map<string,ObjectDesc> >::iterator next_iter = pool_obj_cont.find(snap);
string oid;
int roll_back_to;
bool done;
+ librados::ObjectWriteOperation zero_write_op1;
+ librados::ObjectWriteOperation zero_write_op2;
librados::ObjectWriteOperation op;
- librados::AioCompletion *comp;
+ vector<librados::AioCompletion *> comps;
ceph::shared_ptr<int> in_use;
+ int last_finished;
+ int outstanding;
RollbackOp(int n,
RadosTestContext *context,
TestOpStat *stat = 0)
: TestOp(n, context, stat),
oid(_oid), roll_back_to(-1),
- done(false), comp(NULL)
+ done(false),
+ comps(3, NULL),
+ last_finished(-1), outstanding(3)
{}
void _begin()
cout << "rollback oid " << oid << " to " << roll_back_to << std::endl;
+ bool existed_before = context->object_existed_at(oid);
+ bool existed_after = context->object_existed_at(oid, roll_back_to);
+
context->roll_back(oid, roll_back_to);
uint64_t snap = context->snaps[roll_back_to];
+ outstanding -= (!existed_before) + (!existed_after);
+
context->state_lock.Unlock();
+ bufferlist bl, bl2;
+ zero_write_op1.append(bl);
+ zero_write_op2.append(bl2);
+
if (context->pool_snaps) {
op.snap_rollback(snap);
} else {
op.selfmanaged_snap_rollback(snap);
}
- pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
- new pair<TestOp*, TestOp::CallbackInfo*>(this,
- new TestOp::CallbackInfo(0));
- comp = context->rados.aio_create_completion((void*) cb_arg, NULL,
- &write_callback);
- context->io_ctx.aio_operate(context->prefix+oid, comp, &op);
+ if (existed_before) {
+ pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+ new pair<TestOp*, TestOp::CallbackInfo*>(this,
+ new TestOp::CallbackInfo(0));
+ comps[0] =
+ context->rados.aio_create_completion((void*) cb_arg, NULL,
+ &write_callback);
+ context->io_ctx.aio_operate(
+ context->prefix+oid, comps[0], &zero_write_op1);
+ }
+ {
+ pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+ new pair<TestOp*, TestOp::CallbackInfo*>(this,
+ new TestOp::CallbackInfo(1));
+ comps[1] =
+ context->rados.aio_create_completion((void*) cb_arg, NULL,
+ &write_callback);
+ context->io_ctx.aio_operate(
+ context->prefix+oid, comps[1], &op);
+ }
+ if (existed_after) {
+ pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+ new pair<TestOp*, TestOp::CallbackInfo*>(this,
+ new TestOp::CallbackInfo(2));
+ comps[2] =
+ context->rados.aio_create_completion((void*) cb_arg, NULL,
+ &write_callback);
+ context->io_ctx.aio_operate(
+ context->prefix+oid, comps[2], &zero_write_op2);
+ }
}
void _finish(CallbackInfo *info)
{
Mutex::Locker l(context->state_lock);
+ uint64_t tid = info->id;
+ cout << num << ": finishing rollback tid " << tid
+ << " to " << context->prefix + oid << std::endl;
+ assert((int)(info->id) > last_finished);
+ last_finished = info->id;
+
int r;
- if ((r = comp->get_return_value())) {
+ if ((r = comps[last_finished]->get_return_value()) != 0) {
cerr << "err " << r << std::endl;
assert(0);
}
- done = true;
- context->update_object_version(oid, comp->get_version64());
- context->oid_in_use.erase(oid);
- context->oid_not_in_use.insert(oid);
- in_use = ceph::shared_ptr<int>();
- context->kick();
+ if (--outstanding == 0) {
+ done = true;
+ context->update_object_version(oid, comps[tid]->get_version64());
+ context->oid_in_use.erase(oid);
+ context->oid_not_in_use.insert(oid);
+ in_use = ceph::shared_ptr<int>();
+ context->kick();
+ }
}
bool finished()