TEST_OP_READ,
TEST_OP_WRITE,
TEST_OP_WRITE_EXCL,
+ TEST_OP_WRITESAME,
TEST_OP_DELETE,
TEST_OP_SNAP_CREATE,
TEST_OP_SNAP_REMOVE,
}
};
+class WriteSameOp : public TestOp {
+public:
+ string oid;
+ ContDesc cont;
+ set<librados::AioCompletion *> waiting;
+ librados::AioCompletion *rcompletion;
+ uint64_t waiting_on;
+ uint64_t last_acked_tid;
+
+ librados::ObjectReadOperation read_op;
+ librados::ObjectWriteOperation write_op;
+ bufferlist rbuffer;
+
+ WriteSameOp(int n,
+ RadosTestContext *context,
+ const string &oid,
+ TestOpStat *stat = 0)
+ : TestOp(n, context, stat),
+ oid(oid), rcompletion(NULL), waiting_on(0),
+ last_acked_tid(0)
+ {}
+
+ void _begin()
+ {
+ context->state_lock.Lock();
+ done = 0;
+ stringstream acc;
+ acc << context->prefix << "OID: " << oid << " snap " << context->current_snap << std::endl;
+ string prefix = acc.str();
+
+ cont = ContDesc(context->seq_num, context->current_snap, context->seq_num, prefix);
+
+ ContentsGenerator *cont_gen;
+ cont_gen = new VarLenGenerator(
+ context->max_size, context->min_stride_size, context->max_stride_size);
+ context->update_object(cont_gen, oid, cont);
+
+ context->oid_in_use.insert(oid);
+ context->oid_not_in_use.erase(oid);
+
+ map<uint64_t, uint64_t> ranges;
+
+ cont_gen->get_ranges_map(cont, ranges);
+ std::cout << num << ": seq_num " << context->seq_num << " ranges " << ranges << std::endl;
+ context->seq_num++;
+
+ waiting_on = ranges.size();
+ ContentsGenerator::iterator gen_pos = cont_gen->get_iterator(cont);
+ uint64_t tid = 1;
+ for (map<uint64_t, uint64_t>::iterator i = ranges.begin();
+ i != ranges.end();
+ ++i, ++tid) {
+ gen_pos.seek(i->first);
+ bufferlist to_write = gen_pos.gen_bl_advance(i->second);
+ assert(to_write.length() == i->second);
+ assert(to_write.length() > 0);
+ std::cout << num << ": writing " << context->prefix+oid
+ << " from " << i->first
+ << " to " << i->first + i->second << " tid " << tid << std::endl;
+ pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+ new pair<TestOp*, TestOp::CallbackInfo*>(this,
+ new TestOp::CallbackInfo(tid));
+ librados::AioCompletion *completion =
+ context->rados.aio_create_completion((void*) cb_arg, NULL,
+ &write_callback);
+ waiting.insert(completion);
+ librados::ObjectWriteOperation op;
+ /* no writesame multiplication factor for now */
+ op.writesame(i->first, to_write.length(), to_write);
+
+ context->io_ctx.aio_operate(
+ context->prefix+oid, completion,
+ &op);
+ }
+
+ bufferlist contbl;
+ ::encode(cont, contbl);
+ pair<TestOp*, TestOp::CallbackInfo*> *cb_arg =
+ new pair<TestOp*, TestOp::CallbackInfo*>(
+ this,
+ new TestOp::CallbackInfo(++tid));
+ librados::AioCompletion *completion = context->rados.aio_create_completion(
+ (void*) cb_arg, NULL, &write_callback);
+ waiting.insert(completion);
+ waiting_on++;
+ write_op.setxattr("_header", contbl);
+ write_op.truncate(cont_gen->get_length(cont));
+ context->io_ctx.aio_operate(
+ context->prefix+oid, completion, &write_op);
+
+ cb_arg =
+ new pair<TestOp*, TestOp::CallbackInfo*>(
+ this,
+ new TestOp::CallbackInfo(++tid));
+ rcompletion = context->rados.aio_create_completion(
+ (void*) cb_arg, NULL, &write_callback);
+ waiting_on++;
+ read_op.read(0, 1, &rbuffer, 0);
+ context->io_ctx.aio_operate(
+ context->prefix+oid, rcompletion,
+ &read_op,
+ librados::OPERATION_ORDER_READS_WRITES, // order wrt previous write/update
+ 0);
+ context->state_lock.Unlock();
+ }
+
+ void _finish(CallbackInfo *info)
+ {
+ assert(info);
+ context->state_lock.Lock();
+ uint64_t tid = info->id;
+
+ cout << num << ": finishing writesame tid " << tid << " to " << context->prefix + oid << std::endl;
+
+ if (tid <= last_acked_tid) {
+ cerr << "Error: finished tid " << tid
+ << " when last_acked_tid was " << last_acked_tid << std::endl;
+ assert(0);
+ }
+ last_acked_tid = tid;
+
+ assert(!done);
+ waiting_on--;
+ if (waiting_on == 0) {
+ uint64_t version = 0;
+ for (set<librados::AioCompletion *>::iterator i = waiting.begin();
+ i != waiting.end();
+ ) {
+ assert((*i)->is_complete());
+ if (int err = (*i)->get_return_value()) {
+ cerr << "Error: oid " << oid << " writesame returned error code "
+ << err << std::endl;
+ }
+ if ((*i)->get_version64() > version)
+ version = (*i)->get_version64();
+ (*i)->release();
+ waiting.erase(i++);
+ }
+
+ context->update_object_version(oid, version);
+ if (rcompletion->get_version64() != version) {
+ cerr << "Error: racing read on " << oid << " returned version "
+ << rcompletion->get_version64() << " rather than version "
+ << version << std::endl;
+ assert(0 == "racing read got wrong version");
+ }
+
+ {
+ ObjectDesc old_value;
+ assert(context->find_object(oid, &old_value, -1));
+ if (old_value.deleted())
+ std::cout << num << ": left oid " << oid << " deleted" << std::endl;
+ else
+ std::cout << num << ": left oid " << oid << " "
+ << old_value.most_recent() << std::endl;
+ }
+
+ rcompletion->release();
+ context->oid_in_use.erase(oid);
+ context->oid_not_in_use.insert(oid);
+ context->kick();
+ done = true;
+ }
+ context->state_lock.Unlock();
+ }
+
+ bool finished()
+ {
+ return done;
+ }
+
+ string getType()
+ {
+ return "WriteSameOp";
+ }
+};
+
class DeleteOp : public TestOp {
public:
string oid;
<< context.current_snap << std::endl;
return new WriteOp(m_op, &context, oid, false, true, m_stats);
+ case TEST_OP_WRITESAME:
+ oid = *(rand_choose(context.oid_not_in_use));
+ cout << m_op << ": " << "writesame oid "
+ << oid << " current snap is "
+ << context.current_snap << std::endl;
+ return new WriteSameOp(m_op, &context, oid, m_stats);
+
case TEST_OP_DELETE:
oid = *(rand_choose(context.oid_not_in_use));
cout << m_op << ": " << "delete oid " << oid << " current snap is "
{ TEST_OP_READ, "read", true },
{ TEST_OP_WRITE, "write", false },
{ TEST_OP_WRITE_EXCL, "write_excl", false },
+ { TEST_OP_WRITESAME, "writesame", false },
{ TEST_OP_DELETE, "delete", true },
{ TEST_OP_SNAP_CREATE, "snap_create", true },
{ TEST_OP_SNAP_REMOVE, "snap_remove", true },