#include "common/Cond.h"
#include "common/debug.h"
#include "mds/inode_backtrace.h"
+#include "auth/Crypto.h"
#include <iostream>
#include <fstream>
cout << name << " got notification opcode=" << (int)opcode << " ver=" << ver << " msg='" << s << "'" << std::endl;
}
};
+
+static const char alphanum_table[]="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
+
+int gen_rand_alphanumeric(char *dest, int size) /* size should be the required string size + 1 */
+{
+ int ret = get_random_bytes(dest, size);
+ if (ret < 0) {
+ cerr << "cannot get random bytes: " << strerror(-ret) << std::endl;
+ return -1;
+ }
+
+ int i;
+ for (i=0; i<size - 1; i++) {
+ int pos = (unsigned)dest[i];
+ dest[i] = alphanum_table[pos & 63];
+ }
+ dest[i] = '\0';
+
+ return 0;
+}
+
+struct obj_info {
+ string name;
+ size_t len;
+};
+
+uint64_t get_random(uint64_t min_val, uint64_t max_val)
+{
+ uint64_t r;
+ get_random_bytes((char *)&r, sizeof(r));
+ r = min_val + r % (max_val - min_val + 1);
+ return r;
+}
+
+class LoadGen {
+ int read_write_ratio;
+ size_t min_obj_len;
+ size_t max_obj_len;
+ size_t min_op_len;
+ size_t max_op_len;
+ size_t target_throughput;
+ size_t total_transfered;
+ size_t pending;
+ int num_objs;
+
+ IoCtx io_ctx;
+ Rados *rados;
+
+ map<int, obj_info> objs;
+
+ utime_t start_time;
+
+ enum {
+ OP_READ,
+ OP_WRITE,
+ };
+
+ struct LoadGenOp {
+ int id;
+ int op;
+ string oid;
+ size_t off;
+ size_t len;
+ LoadGen *lg;
+
+ LoadGenOp() {}
+ LoadGenOp(LoadGen *_lg) : lg(_lg) {}
+ };
+
+ int max_op;
+
+ map<int, LoadGenOp> pending_ops;
+
+ void gen_op(int& op_type, string& oid, size_t& off, size_t& len);
+ void gen_next_op();
+
+ uint64_t cur_rate() {
+ utime_t now = ceph_clock_now(g_ceph_context);
+ now -= start_time;
+ uint64_t ns = now.nsec();
+ float delta = ns / 1000000000;
+ delta += now.sec();
+
+ if (delta == 0)
+ return 0;
+
+ return total_transferred / delta;
+ }
+
+ Mutex lock;
+
+ void operate(LoadGenOp& op);
+
+
+public:
+ LoadGen(Rados *_rados) : rados(_rados), lock("LoadGen") {
+ read_write_ratio = 4;
+ min_obj_len = 1024;
+ max_obj_len = (uint64_t)5 * 1024 * 1024 * 1024;
+ min_op_len = 1024;
+ max_op_len = 2 * 1024 * 1024;
+ target_throughput = 5 * 1024 * 1024; // B/sec
+ total_transfered = 0;
+ pending = 0;
+ num_objs = 1000;
+ max_op = 0;
+ }
+ int bootstrap(const char *pool);
+ int run();
+ void cleanup();
+
+ void io_cb(completion_t c) {
+ Mutex::Locker l(lock);
+
+ }
+};
+
+static void _load_gen_cb(completion_t c, void *param)
+{
+ LoadGen *lg = (LoadGen *)lg;
+ lg->io_cb(c);
+}
+
+int LoadGen::bootstrap(const char *pool)
+{
+ char buf[128];
+ int i;
+
+ if (!pool) {
+ cerr << "ERROR: pool name was not specified" << std::endl;
+ return -EINVAL;
+ }
+
+ int ret = rados->ioctx_create(pool, io_ctx);
+ if (ret < 0) {
+ cerr << "error opening pool " << pool << ": " << strerror_r(-ret, buf, sizeof(buf)) << std::endl;
+ return ret;
+ }
+
+ int buf_len = 1;
+ bufferptr p = buffer::create(buf_len);
+ bufferlist bl;
+ memset(p.c_str(), 0, buf_len);
+ bl.push_back(p);
+
+ vector<librados::AioCompletion *> completions;
+ for (i = 0; i < num_objs; i++) {
+ obj_info info;
+ gen_rand_alphanumeric(buf, 16);
+ info.name = "obj-";
+ info.name.append(buf);
+ info.len = get_random(min_obj_len, max_obj_len);
+
+ librados::AioCompletion *c = rados->aio_create_completion(NULL, NULL, NULL);
+ completions.push_back(c);
+ // generate object
+ ret = io_ctx.aio_write(info.name, c, bl, buf_len, info.len - buf_len);
+ if (ret < 0) {
+ cerr << "couldn't write obj: " << info.name << " ret=" << ret << std::endl;
+ return ret;
+ }
+ objs[i] = info;
+ }
+
+ vector<librados::AioCompletion *>::iterator iter;
+ for (iter = completions.begin(); iter != completions.end(); ++iter) {
+ AioCompletion *c = *iter;
+ c->wait_for_complete();
+ ret = c->get_return_value();
+ c->release();
+ if (ret < 0) {
+ cerr << "aio_write failed" << std::endl;
+ return ret;
+ }
+ }
+ return 0;
+}
+
+void operate(LoadGenOp& op)
+{
+ librados::AioCompletion *c = rados->aio_create_completion(NULL, NULL, NULL);
+ int ret;
+
+ switch (op.type) {
+ case OP_READ:
+ ret = io_ctx.aio_read(op.oid, c, &op.bl, op.len, op.off);
+ break;
+ case OP_WRITE:
+ bufferptr p = buffer::create(op.len);
+ memset(p.c_str(), 0, op.len);
+ op.bl.push_back(p);
+
+ ret = io_ctx.aio_write(op.oid, c, op.bl, op.len, op.off);
+ break;
+ }
+}
+
+void LoadGen::gen_op(LoadGenOp& op)
+{
+ int i = get_random(0, objs.size() - 1);
+ obj_info& info = objs[i];
+ op.oid = info.name;
+
+ size_t len = get_random(min_op_len, max_op_len);
+ if (len > info.len)
+ len = info.len;
+ size_t off = get_random(0, info.len);
+
+ if (off + len > info.len)
+ off = info.len - len;
+
+ op.off = off;
+ op.len = len;
+
+ i = get_random(0, read_write_ratio + 1);
+ if (i == 0)
+ op.type = OP_WRITE;
+ else
+ op.type = OP_READ;
+}
+
+uint64_t LoadGen::gen_next_op()
+{
+ Mutex::Locker l(lock);
+
+ LoadGenOp op(this);
+ gen_op(op);
+ op.id = max_op++;
+ ops[op.id] = op;
+ cout << (op_type == OP_READ ? "READ" : "WRITE") << " : oid=" << oid << " off=" << off << " len=" << len << std::endl;
+ operate(op);
+
+ return op.len;
+}
+
+int LoadGen::run()
+{
+ start_time = ceph_clock_now(g_ceph_context);
+
+ cout << "warmup" << std::endl;
+ // warmup
+ for (int i = 0; i < 100; i++) {
+ gen_next_op();
+ }
+
+ while (1) {
+ usleep(1000);
+ }
+
+
+
+ return 0;
+}
+
+void LoadGen::cleanup()
+{
+ cout << "cleaning up objects" << std::endl;
+ map<int, obj_info>::iterator iter;
+ for (iter = objs.begin(); iter != objs.end(); ++iter) {
+ obj_info& info = iter->second;
+ int ret = io_ctx.remove(info.name);
+ if (ret < 0)
+ cerr << "couldn't remove obj: " << info.name << " ret=" << ret << std::endl;
+ }
+}
+
/**********************************************
**********************************************/
ret = io_ctx.notify(oid, 0, bl);
if (ret != 0)
cerr << "error calling notify: " << ret << std::endl;
+ } else if (strcmp(nargs[0], "load-gen") == 0) {
+ if (!pool_name)
+ usage();
+ LoadGen lg(&rados);
+ cout << "preparing objects" << std::endl;
+ ret = lg.bootstrap(pool_name);
+ if (ret < 0) {
+ cerr << "load-gen bootstrap failed" << std::endl;
+ exit(1);
+ }
+ lg.run();
+ lg.cleanup();
} else {
cerr << "unrecognized command " << nargs[0] << std::endl;
usage();