#include "include/Context.h"
#include "include/stringify.h"
#include "common/Finisher.h"
+#include "test/librados_test_stub/TestCluster.h"
#include "test/librados_test_stub/TestRadosClient.h"
#include <boost/bind.hpp>
#include <boost/function.hpp>
return out;
}
-TestWatchNotify::TestWatchNotify()
- : m_lock("librados::TestWatchNotify::m_lock") {
+struct TestWatchNotify::ObjectHandler : public TestCluster::ObjectHandler {
+ TestWatchNotify* test_watch_notify;
+ int64_t pool_id;
+ std::string oid;
+
+ ObjectHandler(TestWatchNotify* test_watch_notify, int64_t pool_id,
+ const std::string& oid)
+ : test_watch_notify(test_watch_notify), pool_id(pool_id),
+ oid(oid) {
+ }
+
+ void handle_removed(TestRadosClient* test_rados_client) override {
+ // copy member variables since this object might be deleted
+ auto _test_watch_notify = test_watch_notify;
+ auto _pool_id = pool_id;
+ auto _oid = oid;
+ auto ctx = new FunctionContext([_test_watch_notify, _pool_id, _oid](int r) {
+ _test_watch_notify->handle_object_removed(_pool_id, _oid);
+ });
+ test_rados_client->get_aio_finisher()->queue(ctx);
+ }
+};
+
+TestWatchNotify::TestWatchNotify(TestCluster* test_cluster)
+ : m_test_cluster(test_cluster), m_lock("librados::TestWatchNotify::m_lock") {
}
void TestWatchNotify::flush(TestRadosClient *rados_client) {
std::list<obj_watch_t> *out_watchers) {
Mutex::Locker lock(m_lock);
SharedWatcher watcher = get_watcher(pool_id, o);
+ if (!watcher) {
+ return -ENOENT;
+ }
out_watchers->clear();
for (TestWatchNotify::WatchHandles::iterator it =
m_lock.Lock();
SharedWatcher watcher = get_watcher(pool_id, o);
+ if (!watcher) {
+ m_lock.Unlock();
+ on_finish->complete(-ENOENT);
+ return;
+ }
WatchHandle watch_handle;
watch_handle.rados_client = rados_client;
WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
if (w_it != watcher->watch_handles.end()) {
watcher->watch_handles.erase(w_it);
- if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
- m_file_watchers.erase(it);
- }
+ maybe_remove_watcher(watcher);
break;
}
}
TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher(
int64_t pool_id, const std::string& oid) {
assert(m_lock.is_locked());
- SharedWatcher &watcher = m_file_watchers[{pool_id, oid}];
- if (!watcher) {
- watcher.reset(new Watcher());
+
+ auto it = m_file_watchers.find({pool_id, oid});
+ if (it == m_file_watchers.end()) {
+ SharedWatcher watcher(new Watcher(pool_id, oid));
+ watcher->object_handler.reset(new ObjectHandler(
+ this, pool_id, oid));
+ int r = m_test_cluster->register_object_handler(
+ pool_id, oid, watcher->object_handler.get());
+ if (r < 0) {
+ // object doesn't exist
+ return SharedWatcher();
+ }
+ m_file_watchers[{pool_id, oid}] = watcher;
+ return watcher;
+ }
+
+ return it->second;
+}
+
+void TestWatchNotify::maybe_remove_watcher(SharedWatcher watcher) {
+ assert(m_lock.is_locked());
+
+ // TODO
+ if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
+ auto pool_id = watcher->pool_id;
+ auto& oid = watcher->oid;
+ if (watcher->object_handler) {
+ m_test_cluster->unregister_object_handler(pool_id, oid,
+ watcher->object_handler.get());
+ watcher->object_handler.reset();
+ }
+
+ m_file_watchers.erase({pool_id, oid});
}
- return watcher;
}
void TestWatchNotify::execute_notify(TestRadosClient *rados_client,
m_lock.Lock();
uint64_t notify_id = ++m_notify_id;
- ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
-
SharedWatcher watcher = get_watcher(pool_id, oid);
+ if (!watcher) {
+ ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
+ m_lock.Unlock();
+ on_notify->complete(-ENOENT);
+ return;
+ }
+
+ ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
SharedNotifyHandle notify_handle(new NotifyHandle());
notify_handle->rados_client = rados_client;
const bufferlist &bl) {
CephContext *cct = rados_client->cct();
- ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
- << ", WatcherID=" << watcher_id << dendl;
-
assert(m_lock.is_locked());
SharedWatcher watcher = get_watcher(pool_id, oid);
+ if (!watcher) {
+ ldout(cct, 1) << "oid=" << oid << ": not found" << dendl;
+ return;
+ }
NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
if (it == watcher->notify_handles.end()) {
return;
}
+ ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
+ << ", WatcherID=" << watcher_id << dendl;
+
bufferlist response;
response.append(bl);
assert(m_lock.is_locked());
SharedWatcher watcher = get_watcher(pool_id, oid);
+ assert(watcher);
NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
if (it == watcher->notify_handles.end()) {
notify_handle->rados_client->get_aio_finisher()->queue(
notify_handle->on_notify, 0);
watcher->notify_handles.erase(notify_id);
- if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
- m_file_watchers.erase({pool_id, oid});
- }
+ maybe_remove_watcher(watcher);
}
void TestWatchNotify::blacklist(uint32_t nonce) {
++w_it;
}
}
- if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
- file_it = m_file_watchers.erase(file_it);
- } else {
- ++file_it;
+
+ ++file_it;
+ maybe_remove_watcher(watcher);
+ }
+}
+
+void TestWatchNotify::handle_object_removed(int64_t pool_id,
+ const std::string& oid) {
+ Mutex::Locker locker(m_lock);
+ auto it = m_file_watchers.find({pool_id, oid});
+ if (it == m_file_watchers.end()) {
+ return;
+ }
+
+ auto watcher = it->second;
+
+ // cancel all in-flight notifications
+ for (auto& notify_handle_pair : watcher->notify_handles) {
+ auto notify_handle = notify_handle_pair.second;
+ notify_handle->rados_client->get_aio_finisher()->queue(
+ notify_handle->on_notify, -ENOENT);
+ }
+
+ // alert all watchers of the loss of connection
+ for (auto& watch_handle_pair : watcher->watch_handles) {
+ auto& watch_handle = watch_handle_pair.second;
+ auto handle = watch_handle.handle;
+ auto watch_ctx2 = watch_handle.watch_ctx2;
+ if (watch_ctx2 != nullptr) {
+ auto ctx = new FunctionContext([handle, watch_ctx2](int) {
+ watch_ctx2->handle_error(handle, -ENOTCONN);
+ });
+ watch_handle.rados_client->get_aio_finisher()->queue(ctx);
}
}
+ m_file_watchers.erase(it);
}
} // namespace librados
namespace librados {
+class TestCluster;
class TestRadosClient;
class TestWatchNotify : boost::noncopyable {
typedef std::map<uint64_t, WatchHandle> WatchHandles;
+ struct ObjectHandler;
+ typedef boost::shared_ptr<ObjectHandler> SharedObjectHandler;
+
struct Watcher {
+ Watcher(int64_t pool_id, const std::string& oid)
+ : pool_id(pool_id), oid(oid) {
+ }
+
+ int64_t pool_id;
+ std::string oid;
+
+ SharedObjectHandler object_handler;
WatchHandles watch_handles;
NotifyHandles notify_handles;
};
typedef boost::shared_ptr<Watcher> SharedWatcher;
- TestWatchNotify();
+ TestWatchNotify(TestCluster* test_cluster);
int list_watchers(int64_t pool_id, const std::string& o,
std::list<obj_watch_t> *out_watchers);
typedef std::pair<int64_t, std::string> PoolFile;
typedef std::map<PoolFile, SharedWatcher> FileWatchers;
+ TestCluster *m_test_cluster;
+
uint64_t m_handle = 0;
uint64_t m_notify_id = 0;
FileWatchers m_file_watchers;
SharedWatcher get_watcher(int64_t pool_id, const std::string& oid);
+ void maybe_remove_watcher(SharedWatcher shared_watcher);
void execute_watch(TestRadosClient *rados_client, int64_t pool_id,
const std::string& o, uint64_t gid, uint64_t *handle,
const WatcherID &watcher_id, const bufferlist &bl);
void finish_notify(TestRadosClient *rados_client, int64_t pool_id,
const std::string &oid, uint64_t notify_id);
+
+ void handle_object_removed(int64_t pool_id, const std::string& oid);
};
} // namespace librados