+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
/*
- * workload_generator.cc
+ * Ceph - scalable distributed file system
*
- * Created on: Mar 12, 2012
- * Author: jecluis
+ * Copyright (C) 2012 New Dream Network
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
*/
#include <stdio.h>
#include <string.h>
const coll_t WorkloadGenerator::META_COLL("meta");
const coll_t WorkloadGenerator::TEMP_COLL("temp");
-void sig_handler(int sig) {
- if (sig == SIGINT) {
- wrkldgen->stop();
- }
-}
-
WorkloadGenerator::WorkloadGenerator(vector<const char*> args) :
- NUM_COLLS(DEF_NUM_COLLS),
- NUM_OBJ_PER_COLL(DEF_NUM_OBJ_PER_COLL),
- store(0), rng(time(NULL)), in_flight(0),
- lock("State Lock"), stop_running(false)
-{
- init_args(args);
-
- ::mkdir("workload_gen_dir", 0777);
- ObjectStore *store_ptr = new FileStore(string("workload_gen_dir"),
- string("workload_gen_journal"));
- store.reset(store_ptr);
- store->mkfs();
- store->mount();
-
- osr = new ObjectStore::Sequencer[NUM_COLLS];
-
- init();
+ m_allow_coll_destruction(false),
+ m_prob_destroy_coll(def_prob_destroy_coll),
+ m_prob_create_coll(def_prob_create_coll),
+ m_num_colls(def_num_colls), m_num_obj_per_coll(def_num_obj_per_coll),
+ m_store(0), m_in_flight(0), m_lock("State Lock") {
+
+ init_args(args);
+ dout(0) << "data = " << g_conf->osd_data << dendl;
+ dout(0) << "journal = " << g_conf->osd_journal << dendl;
+ dout(0) << "journal size = " << g_conf->osd_journal_size << dendl;
+
+ ::mkdir(g_conf->osd_data.c_str(), 0755);
+ ObjectStore *store_ptr = new FileStore(g_conf->osd_data, g_conf->osd_journal);
+ m_store.reset(store_ptr);
+ m_store->mkfs();
+ m_store->mount();
+
+ m_osr.resize(m_num_colls);
+
+ init();
}
void WorkloadGenerator::init_args(vector<const char*> args) {
- for (std::vector<const char*>::iterator i = args.begin();
- i != args.end(); ) {
- string val;
-
- if (ceph_argparse_double_dash(args, i)) {
- break;
- } else if (ceph_argparse_witharg(args, i, &val,
- "-C", "--num-collections", (char*)NULL)) {
- NUM_COLLS = strtoll(val.c_str(), NULL, 10);
- } else if (ceph_argparse_witharg(args, i, &val,
- "-O", "--num-objects", (char*)NULL)) {
- NUM_OBJ_PER_COLL = strtoll(val.c_str(), NULL, 10);
- }
-// else if (ceph_argparse_witharg(args, i, &val,
-// "-o", "--outfn", (char*)NULL)) {
-// outfn = val;
-// }
- }
+ for (std::vector<const char*>::iterator i = args.begin(); i != args.end();) {
+ string val;
+ int allow_coll_dest;
+
+ if (ceph_argparse_double_dash(args, i)) {
+ break;
+ } else if (ceph_argparse_witharg(args, i, &val, "-C", "--num-collections",
+ (char*) NULL)) {
+ m_num_colls = strtoll(val.c_str(), NULL, 10);
+ } else if (ceph_argparse_witharg(args, i, &val, "-O", "--num-objects",
+ (char*) NULL)) {
+ m_num_obj_per_coll = strtoll(val.c_str(), NULL, 10);
+ } else if (ceph_argparse_binary_flag(args, i, &allow_coll_dest, NULL,
+ "--allow-coll-destruction", (char*) NULL)) {
+ m_allow_coll_destruction = (allow_coll_dest ? true : false);
+ }
+ }
}
void WorkloadGenerator::init() {
- dout(0) << "Initializing..." << dendl;
+ dout(0) << "Initializing..." << dendl;
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
- t->create_collection(META_COLL);
- t->create_collection(TEMP_COLL);
-// store->queue_transaction(&osr, t);
- store->apply_transaction(*t);
+ t->create_collection(META_COLL);
+ t->create_collection(TEMP_COLL);
+ m_store->apply_transaction(*t);
- wait_for_ready();
+ wait_for_ready();
- char buf[100];
- for (int i = 0; i < NUM_COLLS; i ++) {
- memset(buf, 0, 100);
- snprintf(buf, 100, "0.%d_head", i);
- coll_t coll(buf);
+ char buf[100];
+ for (int i = 0; i < m_num_colls; i++) {
+ memset(buf, 0, 100);
+ snprintf(buf, 100, "0.%d_head", i);
+ coll_t coll(buf);
- dout(0) << "Creating collection " << coll.to_str() << dendl;
+ dout(0) << "Creating collection " << coll.to_str() << dendl;
- t = new ObjectStore::Transaction;
+ t = new ObjectStore::Transaction;
- t->create_collection(coll);
+ t->create_collection(coll);
- memset(buf, 0, 100);
- snprintf(buf, 100, "pglog_0.%d_head", i);
- hobject_t coll_meta_obj(sobject_t(object_t(buf), CEPH_NOSNAP));
- t->touch(META_COLL, coll_meta_obj);
+ memset(buf, 0, 100);
+ snprintf(buf, 100, "pglog_0.%d_head", i);
+ hobject_t coll_meta_obj(sobject_t(object_t(buf), CEPH_NOSNAP));
+ t->touch(META_COLL, coll_meta_obj);
- store->queue_transaction(&osr[i], t,
- new C_WorkloadGeneratorOnReadable(this, t));
- in_flight++;
- }
+ m_store->queue_transaction(&m_osr[i], t,
+ new C_WorkloadGeneratorOnReadable(this, t));
+ m_in_flight++;
+ }
- wait_for_done();
- dout(0) << "Done initializing!" << dendl;
+ wait_for_done();
+ dout(0) << "Done initializing!" << dendl;
}
int WorkloadGenerator::get_random_collection_nr() {
- return (rand() % NUM_COLLS);
+ return (rand() % m_num_colls);
}
int WorkloadGenerator::get_random_object_nr(int coll_nr) {
- return ((rand() % NUM_OBJ_PER_COLL) + (coll_nr*NUM_OBJ_PER_COLL));
+ return ((rand() % m_num_obj_per_coll) + (coll_nr * m_num_obj_per_coll));
}
coll_t WorkloadGenerator::get_collection_by_nr(int nr) {
+ char buf[100];
+ memset(buf, 0, 100);
- char buf[100];
- memset(buf, 0, 100);
-
- snprintf(buf, 100, "0.%d_head", nr);
- return coll_t(buf);
+ snprintf(buf, 100, "0.%d_head", nr);
+ return coll_t(buf);
}
hobject_t WorkloadGenerator::get_object_by_nr(int nr) {
+ char buf[100];
+ memset(buf, 0, 100);
+ snprintf(buf, 100, "%d", nr);
- char buf[100];
- memset(buf, 0, 100);
- snprintf(buf, 100, "%d", nr);
-
- return hobject_t(sobject_t(object_t(buf), CEPH_NOSNAP));
+ return hobject_t(sobject_t(object_t(buf), CEPH_NOSNAP));
}
hobject_t WorkloadGenerator::get_coll_meta_object(coll_t coll) {
- char buf[100];
- memset(buf, 0, 100);
- snprintf(buf, 100, "pglog_%s", coll.c_str());
+ char buf[100];
+ memset(buf, 0, 100);
+ snprintf(buf, 100, "pglog_%s", coll.c_str());
- return hobject_t(sobject_t(object_t(buf), CEPH_NOSNAP));
+ return hobject_t(sobject_t(object_t(buf), CEPH_NOSNAP));
}
/**
* a couple of MB.
*/
size_t WorkloadGenerator::get_random_byte_amount(size_t min, size_t max) {
- size_t diff = max - min;
-
- return (size_t) (min + (rand() % diff));
+ size_t diff = max - min;
+ return (size_t) (min + (rand() % diff));
}
void WorkloadGenerator::get_filled_byte_array(bufferlist& bl, size_t size) {
+ static const char alphanum[] = "0123456789"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "abcdefghijklmnopqrstuvwxyz";
+
+ bufferptr bp(size);
+ for (unsigned int i = 0; i < size - 1; i++) {
+ bp[i] = alphanum[rand() % sizeof(alphanum)];
+ }
+ bp[size - 1] = '\0';
+ bl.append(bp);
+}
- static const char alphanum[] =
- "0123456789"
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "abcdefghijklmnopqrstuvwxyz";
- bufferptr bp(size);
- for (unsigned int i = 0; i < size-1; i ++) {
- bp[i] = alphanum[rand() % sizeof(alphanum)];
- }
- bp[size-1] = '\0';
- bl.append(bp);
+void WorkloadGenerator::do_write_object(ObjectStore::Transaction *t,
+ coll_t coll, hobject_t obj) {
+
+ size_t bytes = get_random_byte_amount(min_write_bytes, max_write_bytes);
+ bufferlist bl;
+ get_filled_byte_array(bl, bytes);
+ t->write(coll, obj, 0, bl.length(), bl);
}
-int WorkloadGenerator::do_write_object(ObjectStore::Transaction *t,
- coll_t coll, hobject_t obj) {
+void WorkloadGenerator::do_setattr_object(ObjectStore::Transaction *t,
+ coll_t coll, hobject_t obj) {
- size_t bytes = get_random_byte_amount(MIN_WRITE_BYTES, MAX_WRITE_BYTES);
- bufferlist bl;
- get_filled_byte_array(bl, bytes);
- t->write(coll, obj, 0, bl.length(), bl);
+ size_t size;
+ size = get_random_byte_amount(min_xattr_obj_bytes, max_xattr_obj_bytes);
- return 0;
+ bufferlist bl;
+ get_filled_byte_array(bl, size);
+ t->setattr(coll, obj, "objxattr", bl);
}
-int WorkloadGenerator::do_setattr_object(ObjectStore::Transaction *t,
- coll_t coll, hobject_t obj) {
- size_t size;
- size = get_random_byte_amount(MIN_XATTR_OBJ_BYTES, MAX_XATTR_OBJ_BYTES);
+void WorkloadGenerator::do_setattr_collection(ObjectStore::Transaction *t,
+ coll_t coll) {
- bufferlist bl;
- get_filled_byte_array(bl, size);
- t->setattr(coll, obj, "objxattr", bl);
+ size_t size;
+ size = get_random_byte_amount(min_xattr_coll_bytes, max_xattr_coll_bytes);
- return 0;
+ bufferlist bl;
+ get_filled_byte_array(bl, size);
+ t->collection_setattr(coll, "collxattr", bl);
}
-int WorkloadGenerator::do_setattr_collection(ObjectStore::Transaction *t,
- coll_t coll) {
-
- size_t size;
- size = get_random_byte_amount(MIN_XATTR_COLL_BYTES, MAX_XATTR_COLL_BYTES);
+void WorkloadGenerator::do_append_log(ObjectStore::Transaction *t,
+ coll_t coll) {
- bufferlist bl;
- get_filled_byte_array(bl, size);
- t->collection_setattr(coll, "collxattr", bl);
+ bufferlist bl;
+ get_filled_byte_array(bl, log_append_bytes);
+ hobject_t log_obj = get_coll_meta_object(coll);
- return 0;
+ struct stat st;
+ int err = m_store->stat(META_COLL, log_obj, &st);
+ assert(err >= 0);
+ t->write(META_COLL, log_obj, st.st_size, bl.length(), bl);
}
-int WorkloadGenerator::do_append_log(ObjectStore::Transaction *t,
- coll_t coll) {
+void WorkloadGenerator::do_destroy_collection(ObjectStore::Transaction *t) {
- bufferlist bl;
- get_filled_byte_array(bl, LOG_APPEND_BYTES);
- hobject_t log_obj = get_coll_meta_object(coll);
+}
+
+void WorkloadGenerator::do_create_collection(ObjectStore::Transaction *t) {
- struct stat st;
- int err = store->stat(META_COLL, log_obj, &st);
- assert(err >= 0);
- t->write(META_COLL, log_obj, st.st_size, bl.length(), bl);
+}
- return 0;
+bool WorkloadGenerator::allow_collection_destruction() {
+ return (this->m_allow_coll_destruction);
}
void WorkloadGenerator::run() {
- do {
- lock.Lock();
- wait_for_ready();
+ do {
+ m_lock.Lock();
+ wait_for_ready();
- int coll_nr = get_random_collection_nr();
- int obj_nr = get_random_object_nr(coll_nr);
- coll_t coll = get_collection_by_nr(coll_nr);
- hobject_t obj = get_object_by_nr(obj_nr);
+ int coll_nr = get_random_collection_nr();
+ int obj_nr = get_random_object_nr(coll_nr);
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- int err;
+ bool do_destroy = false;
+ if (allow_collection_destruction()) {
+ int rnd_probability = (1 + (rand() % 100));
- err = do_write_object(t, coll, obj);
- assert(err == 0);
+ if (rnd_probability <= m_prob_destroy_coll)
+ do_destroy = true;
+ }
- err = do_setattr_object(t, coll, obj);
- assert(err == 0);
+ coll_t coll = get_collection_by_nr(coll_nr);
+ hobject_t obj = get_object_by_nr(obj_nr);
- err = do_setattr_collection(t, coll);
- assert(err == 0);
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
- err = do_append_log(t, coll);
- assert(err == 0);
+ do_write_object(t, coll, obj);
+ do_setattr_object(t, coll, obj);
+ do_setattr_collection(t, coll);
+ do_append_log(t, coll);
- store->queue_transaction(&osr[coll_nr], t,
- new C_WorkloadGeneratorOnReadable(this, t));
+ m_store->queue_transaction(&m_osr[coll_nr], t,
+ new C_WorkloadGeneratorOnReadable(this, t));
- in_flight++;
+ m_in_flight++;
- lock.Unlock();
- } while (!stop_running);
+ m_lock.Unlock();
+ } while (true);
}
void WorkloadGenerator::print_results() {
}
-
-int main(int argc, char *argv[]) {
- vector<const char*> args;
- argv_to_vec(argc, (const char **)argv, args);
-
- global_init(args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
- common_init_finish(g_ceph_context);
- g_ceph_context->_conf->set_val("osd_journal_size", "400");
- g_ceph_context->_conf->apply_changes(NULL);
-
-
- WorkloadGenerator *wrkldgen_ptr = new WorkloadGenerator(args);
- wrkldgen.reset(wrkldgen_ptr);
- wrkldgen->run();
- wrkldgen->print_results();
-
- return 0;
+int main(int argc, const char *argv[]) {
+ vector<const char*> args;
+ args.push_back("--osd-journal-size");
+ args.push_back("400");
+ args.push_back("--osd-data");
+ args.push_back("workload_gen_dir");
+ args.push_back("--osd-journal");
+ args.push_back("workload_gen_journal");
+ argv_to_vec(argc, argv, args);
+
+ global_init(args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+ common_init_finish(g_ceph_context);
+ g_ceph_context->_conf->apply_changes(NULL);
+
+ WorkloadGenerator *wrkldgen_ptr = new WorkloadGenerator(args);
+ wrkldgen.reset(wrkldgen_ptr);
+ wrkldgen->run();
+ wrkldgen->print_results();
+
+ return 0;
}
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
/*
- * workload_generator.h
+ * Ceph - scalable distributed file system
*
- * Created on: Mar 12, 2012
- * Author: jecluis
+ * Copyright (C) 2012 New Dream Network
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
*/
-
#ifndef WORKLOAD_GENERATOR_H_
#define WORKLOAD_GENERATOR_H_
#include "os/FileStore.h"
#include <boost/scoped_ptr.hpp>
-#include <boost/random/mersenne_twister.hpp>
-#include <boost/random/uniform_int.hpp>
-#include <pthread.h>
-
-typedef boost::mt11213b gen_type;
+#include <queue>
+#include <set>
-class WorkloadGenerator
-{
+class WorkloadGenerator {
private:
- static const int NUM_THREADS = 2;
- static const int MAX_IN_FLIGHT = 50;
+ /* kept in upper case for consistency with coll_t's */
+ static const coll_t META_COLL;
+ static const coll_t TEMP_COLL;
+
+ static const int max_in_flight = 50;
+
+ static const int def_num_obj_per_coll = 6000;
+ static const int def_num_colls = 30;
+ static const int def_prob_destroy_coll = 30;
+ static const int def_prob_create_coll = 10;
+
+ static const size_t min_write_bytes = 1;
+ static const size_t max_write_mb = 5;
+ static const size_t max_write_bytes = (max_write_mb * 1024 * 1024);
- static const int DEF_NUM_OBJ_PER_COLL = 6000;
- static const int DEF_NUM_COLLS = 30;
+ static const size_t min_xattr_obj_bytes = 2;
+ static const size_t max_xattr_obj_bytes = 300;
+ static const size_t min_xattr_coll_bytes = 4;
+ static const size_t max_xattr_coll_bytes = 600;
- static const coll_t META_COLL;
- static const coll_t TEMP_COLL;
+ static const size_t log_append_bytes = 1024;
- static const size_t MIN_WRITE_BYTES = 1;
- static const size_t MAX_WRITE_MB = 5;
- static const size_t MAX_WRITE_BYTES = (MAX_WRITE_MB * 1024 * 1024);
+ /* probabilities for creating or destroying a collection */
+ bool m_allow_coll_destruction;
+ int m_prob_destroy_coll;
+ int m_prob_create_coll;
- static const size_t MIN_XATTR_OBJ_BYTES = 2;
- static const size_t MAX_XATTR_OBJ_BYTES = 300;
- static const size_t MIN_XATTR_COLL_BYTES = 4;
- static const size_t MAX_XATTR_COLL_BYTES = 600;
-// static const size_t XATTR_NAME_BYTES = 30;
+ int m_num_colls;
+ int m_num_obj_per_coll;
- static const size_t LOG_APPEND_BYTES = 1024;
+ boost::scoped_ptr<ObjectStore> m_store;
- int NUM_COLLS;
- int NUM_OBJ_PER_COLL;
+ int m_in_flight;
+ vector<ObjectStore::Sequencer> m_osr;
- boost::scoped_ptr<ObjectStore> store;
+ Mutex m_lock;
+ Cond m_cond;
- gen_type rng;
- int in_flight;
- ObjectStore::Sequencer *osr;
+ set<coll_t> m_available_collections;
+ set<coll_t> m_removed_collections;
- Mutex lock;
- Cond cond;
- bool stop_running;
+ void wait_for_ready() {
+ while (m_in_flight >= max_in_flight)
+ m_cond.Wait(m_lock);
+ }
- void wait_for_ready() {
- while (in_flight >= MAX_IN_FLIGHT)
- cond.Wait(lock);
- }
+ void wait_for_done() {
+ Mutex::Locker locker(m_lock);
+ while (m_in_flight)
+ m_cond.Wait(m_lock);
+ }
- void wait_for_done() {
- Mutex::Locker locker(lock);
- while (in_flight)
- cond.Wait(lock);
- }
+ void init_args(vector<const char*> args);
+ void init();
- void init_args(vector<const char*> args);
- void init();
+ int get_random_collection_nr();
+ int get_random_object_nr(int coll_nr);
- int get_random_collection_nr();
- int get_random_object_nr(int coll_nr);
+ coll_t get_collection_by_nr(int nr);
+ hobject_t get_object_by_nr(int nr);
+ hobject_t get_coll_meta_object(coll_t coll);
- coll_t get_collection_by_nr(int nr);
- hobject_t get_object_by_nr(int nr);
- hobject_t get_coll_meta_object(coll_t coll);
+ size_t get_random_byte_amount(size_t min, size_t max);
+ void get_filled_byte_array(bufferlist& bl, size_t size);
- size_t get_random_byte_amount(size_t min, size_t max);
- void get_filled_byte_array(bufferlist& bl, size_t size);
+ void do_write_object(ObjectStore::Transaction *t,
+ coll_t coll, hobject_t obj);
+ void do_setattr_object(ObjectStore::Transaction *t,
+ coll_t coll, hobject_t obj);
+ void do_setattr_collection(ObjectStore::Transaction *t, coll_t coll);
+ void do_append_log(ObjectStore::Transaction *t, coll_t coll);
- int do_write_object(ObjectStore::Transaction *t,
- coll_t coll, hobject_t obj);
- int do_setattr_object(ObjectStore::Transaction *t,
- coll_t coll, hobject_t obj);
- int do_setattr_collection(ObjectStore::Transaction *t, coll_t coll);
- int do_append_log(ObjectStore::Transaction *t, coll_t coll);
+ bool allow_collection_destruction();
+ void do_destroy_collection(ObjectStore::Transaction *t);
+ void do_create_collection(ObjectStore::Transaction *t);
public:
- WorkloadGenerator(vector<const char*> args);
- ~WorkloadGenerator() {
- store->umount();
- }
-
- class C_WorkloadGeneratorOnReadable : public Context {
- WorkloadGenerator *state;
- ObjectStore::Transaction *t;
-
- public:
- C_WorkloadGeneratorOnReadable(WorkloadGenerator *state,
- ObjectStore::Transaction *t) : state(state), t(t) {}
-
- void finish(int r) {
- dout(0) << "Got one back!" << dendl;
- Mutex::Locker locker(state->lock);
- state->in_flight--;
- state->cond.Signal();
- }
- };
-
-
- void run(void);
- void print_results(void);
- void stop() {
- stop_running = true;
- }
+ WorkloadGenerator(vector<const char*> args);
+ ~WorkloadGenerator() {
+ m_store->umount();
+ }
+
+ class C_WorkloadGeneratorOnReadable: public Context {
+ WorkloadGenerator *m_state;
+ ObjectStore::Transaction *m_tx;
+
+ public:
+ C_WorkloadGeneratorOnReadable(WorkloadGenerator *state,
+ ObjectStore::Transaction *t) :
+ m_state(state), m_tx(t) {
+ }
+
+ void finish(int r) {
+ dout(0) << "Got one back!" << dendl;
+ Mutex::Locker locker(m_state->m_lock);
+ m_state->m_in_flight--;
+ m_state->m_cond.Signal();
+ }
+ };
+
+ void run(void);
+ void print_results(void);
};
#endif /* WORKLOAD_GENERATOR_H_ */