#include <boost/function.hpp>
#define dout_subsys ceph_subsys_rados
+#undef dout_prefix
+#define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": "
namespace librados {
+std::ostream& operator<<(std::ostream& out,
+ const TestWatchNotify::WatcherID &watcher_id) {
+ out << "(" << watcher_id.first << "," << watcher_id.second << ")";
+ return out;
+}
+
TestWatchNotify::TestWatchNotify(CephContext *cct, Finisher *finisher)
: m_cct(cct), m_finisher(finisher), m_handle(), m_notify_id(),
m_lock("librados::TestWatchNotify::m_lock"),
}
void TestWatchNotify::flush() {
+ ldout(m_cct, 20) << "enter" << dendl;
// block until we know no additional async notify callbacks will occur
Mutex::Locker locker(m_lock);
while (m_pending_notifies > 0) {
++m_pending_notifies;
uint64_t notify_id = ++m_notify_id;
+ ldout(m_cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
+
SharedWatcher watcher = get_watcher(oid);
SharedNotifyHandle notify_handle(new NotifyHandle());
void TestWatchNotify::notify_ack(const std::string& o, uint64_t notify_id,
uint64_t handle, uint64_t gid,
bufferlist& bl) {
- ldout(m_cct, 20) << __func__ << ": notify_id=" << notify_id << ", "
- << "handle=" << handle << ", "
- << "gid=" << gid << dendl;
+ ldout(m_cct, 20) << "notify_id=" << notify_id << ", handle=" << handle
+ << ", gid=" << gid << dendl;
Mutex::Locker lock(m_lock);
WatcherID watcher_id = std::make_pair(gid, handle);
ack_notify(o, notify_id, watcher_id, bl);
watcher->watch_handles[watch_handle.handle] = watch_handle;
*handle = watch_handle.handle;
+
+ ldout(m_cct, 20) << "oid=" << o << ", gid=" << gid << ": handle=" << *handle
+ << dendl;
return 0;
}
int TestWatchNotify::unwatch(uint64_t handle) {
+ ldout(m_cct, 20) << "handle=" << handle << dendl;
Mutex::Locker locker(m_lock);
for (FileWatchers::iterator it = m_file_watchers.begin();
it != m_file_watchers.end(); ++it) {
void TestWatchNotify::execute_notify(const std::string &oid,
bufferlist &bl, uint64_t notify_id) {
+ ldout(m_cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
+
Mutex::Locker lock(m_lock);
SharedWatcher watcher = get_watcher(oid);
WatchHandles &watch_handles = watcher->watch_handles;
NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
if (n_it == watcher->notify_handles.end()) {
+ ldout(m_cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
+ << ": not found" << dendl;
return;
}
uint64_t notify_id,
const WatcherID &watcher_id,
const bufferlist &bl) {
+ ldout(m_cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
+ << ", WatcherID=" << watcher_id << dendl;
+
assert(m_lock.is_locked());
SharedWatcher watcher = get_watcher(oid);
NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
if (it == watcher->notify_handles.end()) {
+ ldout(m_cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
+ << ", WatcherID=" << watcher_id << ": not found" << dendl;
return;
}
void TestWatchNotify::finish_notify(const std::string &oid,
uint64_t notify_id) {
+ ldout(m_cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
+
assert(m_lock.is_locked());
SharedWatcher watcher = get_watcher(oid);
NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
if (it == watcher->notify_handles.end()) {
+ ldout(m_cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
+ << ": not found" << dendl;
return;
}
SharedNotifyHandle notify_handle = it->second;
if (!notify_handle->pending_watcher_ids.empty()) {
+ ldout(m_cct, 10) << "oid=" << oid << ", notify_id=" << notify_id
+ << ": pending watchers, returning" << dendl;
return;
}
+ ldout(m_cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
+ << ": completing" << dendl;
+
if (notify_handle->pbl != NULL) {
::encode(notify_handle->notify_responses, *notify_handle->pbl);
::encode(notify_handle->pending_watcher_ids, *notify_handle->pbl);