From 2954799ad6e95c91de8e787c3f9e55eb86fd3d90 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 29 Oct 2010 16:52:07 -0700 Subject: [PATCH] librados: notify waits for completion --- src/include/librados.hpp | 3 +- src/include/rados.h | 4 +- src/librados.cc | 187 +++++++++++++++++++++++++++++++++++++-- src/msg/Message.cc | 6 ++ src/osdc/Objecter.h | 4 + src/testradospp.cc | 16 ++-- 6 files changed, 204 insertions(+), 16 deletions(-) diff --git a/src/include/librados.hpp b/src/include/librados.hpp index 4660481654ccc..f8113003232cf 100644 --- a/src/include/librados.hpp +++ b/src/include/librados.hpp @@ -146,12 +146,13 @@ public: class WatchCtx { public: - virtual bool finish(int r) = 0; + virtual void notify(uint8_t opcode, uint64_t ver) = 0; }; // watch/notify int watch(pool_t pool, const string& o, uint64_t ver, uint64_t *cookie, librados::Rados::WatchCtx *ctx); int unwatch(pool_t pool, const string& o, uint64_t ver, uint64_t cookie); + int notify(pool_t pool, const string& o, uint64_t ver); }; } diff --git a/src/include/rados.h b/src/include/rados.h index 65de7d79c0eb3..a09f3f0ac824c 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -187,8 +187,8 @@ enum { CEPH_OSD_OP_MASKTRUNC = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 4, CEPH_OSD_OP_SPARSE_READ = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 5, - CEPH_OSD_OP_NOTIFY = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 5, - CEPH_OSD_OP_NOTIFY_ACK = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 6, + CEPH_OSD_OP_NOTIFY = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 6, + CEPH_OSD_OP_NOTIFY_ACK = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 7, /* write */ CEPH_OSD_OP_WRITE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1, diff --git a/src/librados.cc b/src/librados.cc index c81a2b046c9f0..1ac75beb34913 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -41,6 +41,8 @@ using namespace std; #include "include/librados.h" #include "include/librados.hpp" +#include "messages/MWatchNotify.h" + #define RADOS_LIST_MAX_ENTRIES 1024 #define DOUT_SUBSYS rados #undef dout_prefix @@ -59,7 +61,6 @@ static void rados_tls_destructor(void *val) RadosTlsInfo *info = (RadosTlsInfo *)val; pthread_key_t key = info->tls_key; delete info; - cout << "pthread_getspecific NULL " << std::endl; pthread_setspecific(key, NULL); } @@ -394,8 +395,93 @@ public: return c; } + + // watch/notify + struct WatchContext { + PoolCtx pool_ctx; + const object_t oid; + uint64_t cookie; + uint64_t ver; + librados::Rados::WatchCtx *ctx; + + WatchContext(PoolCtx& _pc, const object_t& _oc, librados::Rados::WatchCtx *_ctx) : pool_ctx(_pc), oid(_oc), ctx(_ctx) {} + ~WatchContext() { + delete ctx; + } + void notify(RadosClient *client, MWatchNotify *m) { + ctx->notify(m->opcode, m->ver); + if (m->opcode != WATCH_NOTIFY_COMPLETE) { + client->_notify_ack(pool_ctx, oid, m->notify_id, m->ver); + } + } + }; + + struct C_NotifyComplete : public librados::Rados::WatchCtx { + Mutex *lock; + Cond *cond; + bool *done; + + C_NotifyComplete(Mutex *_l, Cond *_c, bool *_d) : lock(_l), cond(_c), done(_d) {} + + void notify(uint8_t opcode, uint64_t ver) { + if (opcode != WATCH_NOTIFY_COMPLETE) + cerr << "WARNING: C_NotifyComplete got response: opcode=" << (int)opcode << " ver=" << ver << std::endl; + lock->Lock(); + *done = true; + cond->Signal(); + lock->Unlock(); + } + }; + + Mutex watch_lock; + uint64_t max_watch_cookie; + map watchers; + + pthread_key_t tls_key; + + RadosTlsInfo *tls_info() { + struct RadosTlsInfo *info = (RadosTlsInfo *)pthread_getspecific(tls_key); + if (!info) { + info = new RadosTlsInfo(tls_key); + pthread_setspecific(tls_key, info); + } + return info; + } + + void set_sync_op_version(eversion_t& ver) { + RadosTlsInfo *info = tls_info(); + if (info) + info->objver = ver; + } + + int register_watcher(PoolCtx& pool, const object_t& oid, librados::Rados::WatchCtx *ctx, uint64_t *cookie) { + WatchContext *wc = new WatchContext(pool, oid, ctx); + if (!wc) + return -ENOMEM; + watch_lock.Lock(); + *cookie = ++max_watch_cookie; + watchers[*cookie] = wc; + watch_lock.Unlock(); + return 0; + } + + void unregister_watcher(uint64_t cookie) { + watch_lock.Lock(); + map::iterator iter = watchers.find(cookie); + if (iter != watchers.end()) { + WatchContext *ctx = iter->second; + delete ctx; + watchers.erase(iter); + } + watch_lock.Unlock(); + } + + void watch_notify(MWatchNotify *m); + int watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::Rados::WatchCtx *ctx); int unwatch(PoolCtx& pool, const object_t& oid, uint64_t cookie); + int notify(PoolCtx& pool, const object_t& oid, uint64_t ver); + int _notify_ack(PoolCtx& pool, const object_t& oid, uint64_t notify_id, uint64_t ver); eversion_t last_version() { eversion_t ver; @@ -544,6 +630,10 @@ bool RadosClient::_dispatch(Message *m) case CEPH_MSG_POOLOP_REPLY: objecter->handle_pool_op_reply((MPoolOpReply*)m); break; + + case CEPH_MSG_WATCH_NOTIFY: + watch_notify((MWatchNotify *)m); + break; default: return false; } @@ -1460,22 +1550,33 @@ int RadosClient::getxattrs(PoolCtx& pool, const object_t& oid, map::iterator iter = watchers.find(m->cookie); + if (iter != watchers.end()) + wc = iter->second; + watch_lock.Unlock(); + + if (!wc) + return; + + wc->notify(this, m); +} + int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::Rados::WatchCtx *ctx) { utime_t ut = g_clock.now(); bufferlist inbl, outbl; - WatchContext *wc = new WatchContext(ctx); - - watch_lock.Lock(); - *cookie = ++max_watch_cookie; - watchers[*cookie] = wc; - watch_lock.Unlock(); + int r = register_watcher(pool, oid, ctx, cookie); + if (r < 0) + return r; Mutex mylock("RadosClient::watch::mylock"); Cond cond; bool done; - int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); eversion_t objver; @@ -1496,6 +1597,24 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_ return r; } + +/* this is called with RadosClient::lock held */ +int RadosClient::_notify_ack(PoolCtx& pool, const object_t& oid, uint64_t notify_id, uint64_t ver) +{ + utime_t ut = g_clock.now(); + + Mutex mylock("RadosClient::watch::mylock"); + Cond cond; + eversion_t objver; + + ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid); + ObjectOperation rd; + rd.notify_ack(notify_id, ver); + objecter->read(oid, layout, rd, pool.snap_seq, NULL, 0, 0, 0); + + return 0; +} + int RadosClient::unwatch(PoolCtx& pool, const object_t& oid, uint64_t cookie) { utime_t ut = g_clock.now(); @@ -1534,6 +1653,49 @@ int RadosClient::unwatch(PoolCtx& pool, const object_t& oid, uint64_t cookie) return r; }// --------------------------------------------- +int RadosClient::notify(PoolCtx& pool, const object_t& oid, uint64_t ver) +{ + utime_t ut = g_clock.now(); + bufferlist inbl, outbl; + + Mutex mylock("RadosClient::notify::mylock"); + Mutex mylock_all("RadosClient::notify::mylock_all"); + Cond cond, cond_all; + bool done, done_all; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t objver; + uint64_t cookie; + C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all); + + r = register_watcher(pool, oid, ctx, &cookie); + if (r < 0) + return r; + + ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid); + ObjectOperation rd; + rd.notify(cookie, ver); + lock.Lock(); + objecter->read(oid, layout, rd, pool.snap_seq, &outbl, 0, onack, &objver); + lock.Unlock(); + + mylock_all.Lock(); + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + while (!done_all) + cond_all.Wait(mylock_all); + mylock_all.Unlock(); + + unregister_watcher(cookie); + + set_sync_op_version(objver); + + return r; +} + namespace librados { Rados::Rados() : client(NULL) @@ -2014,6 +2176,15 @@ int Rados::unwatch(pool_t pool, const string& o, object_t oid(o); return client->unwatch(*(RadosClient::PoolCtx *)pool, oid, cookie); } + +int Rados::notify(pool_t pool, const string& o, uint64_t ver) +{ + if (!client) + return -EINVAL; + object_t oid(o); + return client->notify(*(RadosClient::PoolCtx *)pool, oid, ver); +} + } // --------------------------------------------- diff --git a/src/msg/Message.cc b/src/msg/Message.cc index a38adb8e0f7c7..c6017b5303be7 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -125,6 +125,8 @@ using namespace std; #include "messages/MClass.h" #include "messages/MClassAck.h" +#include "messages/MWatchNotify.h" + #include "config.h" #define DEBUGLVL 10 // debug level of output @@ -286,6 +288,10 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer, m = new MOSDMap; break; + case CEPH_MSG_WATCH_NOTIFY: + m = new MWatchNotify; + break; + case MSG_OSD_PG_NOTIFY: m = new MOSDPGNotify; break; diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 13eb2057454bd..e22614aa5aae9 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -193,6 +193,10 @@ struct ObjectOperation { void notify(uint64_t cookie, uint64_t ver) { add_watch(CEPH_OSD_OP_NOTIFY, cookie, ver, 1); } + + void notify_ack(uint64_t notify_id, uint64_t ver) { + add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0); + } }; diff --git a/src/testradospp.cc b/src/testradospp.cc index 59a546a02859b..6e2975e08ba71 100644 --- a/src/testradospp.cc +++ b/src/testradospp.cc @@ -33,9 +33,8 @@ void buf_to_hex(const unsigned char *buf, int len, char *str) class C_Watch : public Rados::WatchCtx { public: C_Watch() {} - bool finish(int r) { - cout << "C_Watch::finish()" << std::endl; - return true; + void notify(uint8_t opcode, uint64_t ver) { + cout << "C_Watch::notify() opcode=" << (int)opcode << " ver=" << ver << std::endl; } }; @@ -69,13 +68,20 @@ int main(int argc, const char **argv) r = rados.write(pool, oid, 0, bl, bl.length()); uint64_t objver = rados.get_last_ver(); - cout << "rados.write returned " << r << " last_ver=" << rados.get_last_ver() << std::endl; + cout << "rados.write returned " << r << " last_ver=" << objver << std::endl; uint64_t cookie; C_Watch wc; - r = rados.watch(pool, oid, 0, &cookie, &wc); + r = rados.watch(pool, oid, objver, &cookie, &wc); cout << "rados.watch returned " << r << std::endl; + cout << "*** press enter to continue ***" << std::endl; + getchar(); + r = rados.notify(pool, oid, objver); + cout << "rados.notify returned " << r << std::endl; + cout << "*** press enter to continue ***" << std::endl; + getchar(); + exit(0); -- 2.39.5