return "";
}
+std::string connect_cluster_pp(Rados &cluster)
+{
+ char *id = getenv("CEPH_CLIENT_ID");
+ if (id) std::cerr << "Client id is: " << id << std::endl;
+
+ int ret;
+ ret = cluster.init(id);
+ if (ret) {
+ std::ostringstream oss;
+ oss << "cluster.init failed with error " << ret;
+ return oss.str();
+ }
+ ret = cluster.conf_read_file(NULL);
+ if (ret) {
+ cluster.shutdown();
+ std::ostringstream oss;
+ oss << "cluster.conf_read_file failed with error " << ret;
+ return oss.str();
+ }
+ cluster.conf_parse_env(NULL);
+ ret = cluster.connect();
+ if (ret) {
+ cluster.shutdown();
+ std::ostringstream oss;
+ oss << "cluster.connect failed with error " << ret;
+ return oss.str();
+ }
+ return "";
+}
+
int destroy_one_pool(const std::string &pool_name, rados_t *cluster)
{
int ret = rados_pool_delete(*cluster, pool_name.c_str());
std::string create_one_pool(const std::string &pool_name, rados_t *cluster);
std::string create_one_pool_pp(const std::string &pool_name,
librados::Rados &cluster);
+std::string connect_cluster_pp(librados::Rados &cluster);
int destroy_one_pool(const std::string &pool_name, rados_t *cluster);
int destroy_one_pool_pp(const std::string &pool_name, librados::Rados &cluster);
#include "include/rados/librados.h"
#include "include/rados/librados.hpp"
+#include "include/atomic.h"
+#include "include/utime.h"
+#include "common/Thread.h"
+#include "common/Clock.h"
#include "test/rados-api/test.h"
#include "gtest/gtest.h"
#include <iostream>
#include <string>
+
using namespace librados;
using ceph::buffer;
using std::map;
using std::string;
static sem_t sem;
+static atomic_t stop_flag;
class WatchNotifyTestCtx : public WatchCtx
{
}
};
+struct WatcherUnwatcher : public Thread {
+ string pool;
+ WatcherUnwatcher(string& _pool) : pool(_pool) {}
+
+ void *entry() {
+ while (!stop_flag.read()) {
+ Rados cluster;
+ IoCtx ioctx;
+ connect_cluster_pp(cluster);
+ cluster.ioctx_create(pool.c_str(), ioctx);
+
+ uint64_t handle;
+ WatchNotifyTestCtx watch_ctx;
+ ioctx.watch("foo", 0, &handle, &watch_ctx);
+ bufferlist bl;
+ ioctx.unwatch("foo", handle);
+ ioctx.close();
+ }
+ return NULL;
+ }
+};
TEST(WatchStress, Stress1) {
ASSERT_EQ(0, sem_init(&sem, 0, 0));
Rados cluster;
IoCtx ioctx;
cluster.ioctx_create(pool_name.c_str(), ioctx);
WatchNotifyTestCtx ctx;
-
+
+ WatcherUnwatcher *thr = new WatcherUnwatcher(pool_name);
+ thr->create();
ASSERT_EQ(0, ioctx.create("foo", false));
for (int i = 0; i < 10000; ++i) {
WatchNotifyTestCtx ctx;
ASSERT_EQ(0, ioctx.watch("foo", 0, &handle, &ctx));
bufferlist bl2;
+ utime_t timestamp = ceph_clock_now(NULL);
ASSERT_EQ(0, ioctx.notify("foo", 0, bl2));
+ timestamp = ceph_clock_now(NULL) - timestamp;
+ ASSERT_LT(timestamp.sec(), 5);
TestAlarm alarm;
sem_wait(&sem);
ioctx.unwatch("foo", handle);
}
-
+ stop_flag.set(1);
+ thr->join();
ioctx.close();
ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster));
sem_destroy(&sem);