From e27a89397919c8e24a8c9f9f02e56e875e2a877f Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 27 Jun 2011 17:20:34 -0700 Subject: [PATCH] rados: encode bufferlist in watch-notify --- src/include/rados/librados.h | 2 +- src/include/rados/librados.hpp | 4 ++-- src/librados.cc | 25 ++++++++++++-------- src/librbd.cc | 7 +++--- src/messages/MWatchNotify.h | 10 +++++--- src/osd/ReplicatedPG.cc | 11 +++++---- src/osd/Watch.h | 3 ++- src/osd/osd_types.h | 1 + src/rados.cc | 42 +++++++++++++++++++++++++++++++++- src/rbd.cc | 4 ++-- 10 files changed, 83 insertions(+), 26 deletions(-) diff --git a/src/include/rados/librados.h b/src/include/rados/librados.h index ae735bb2e96da..7708f5d897d52 100644 --- a/src/include/rados/librados.h +++ b/src/include/rados/librados.h @@ -199,7 +199,7 @@ typedef void (*rados_watchcb_t)(uint8_t opcode, uint64_t ver, void *arg); int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle, rados_watchcb_t watchcb, void *arg); int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle); -int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver); +int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len); #ifdef __cplusplus } diff --git a/src/include/rados/librados.hpp b/src/include/rados/librados.hpp index 59ae98fa18caf..02a51191035c4 100644 --- a/src/include/rados/librados.hpp +++ b/src/include/rados/librados.hpp @@ -74,7 +74,7 @@ namespace librados class WatchCtx { public: virtual ~WatchCtx(); - virtual void notify(uint8_t opcode, uint64_t ver) = 0; + virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) = 0; }; struct AioCompletion { @@ -245,7 +245,7 @@ namespace librados int watch(const std::string& o, uint64_t ver, uint64_t *handle, librados::WatchCtx *ctx); int unwatch(const std::string& o, uint64_t handle); - int notify(const std::string& o, uint64_t ver); + int notify(const std::string& o, uint64_t ver, bufferlist& bl); void set_notify_timeout(uint32_t timeout); // assert version for next sync operations diff --git a/src/librados.cc b/src/librados.cc index a893dd329ce8d..547ad3ab3ac4d 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -695,7 +695,7 @@ public: io_ctx_impl->put(); } void notify(RadosClient *client, MWatchNotify *m) { - ctx->notify(m->opcode, m->ver); + ctx->notify(m->opcode, m->ver, m->bl); if (m->opcode != WATCH_NOTIFY_COMPLETE) { client->_notify_ack(*io_ctx_impl, oid, m->notify_id, m->ver); } @@ -711,7 +711,7 @@ public: *done = false; } - void notify(uint8_t opcode, uint64_t ver) { + void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) { *done = true; cond->Signal(); } @@ -750,7 +750,7 @@ public: int watch(IoCtxImpl& io, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx); int unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie); - int notify(IoCtxImpl& io, const object_t& oid, uint64_t ver); + int notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl); int _notify_ack(IoCtxImpl& io, const object_t& oid, uint64_t notify_id, uint64_t ver); eversion_t last_version(IoCtxImpl& io) { @@ -2306,7 +2306,7 @@ unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie) }// --------------------------------------------- int librados::RadosClient:: -notify(IoCtxImpl& io, const object_t& oid, uint64_t ver) +notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl) { utime_t ut = ceph_clock_now(cct); bufferlist inbl, outbl; @@ -2332,6 +2332,7 @@ notify(IoCtxImpl& io, const object_t& oid, uint64_t ver) uint32_t timeout = io.notify_timeout; ::encode(prot_ver, inbl); ::encode(timeout, inbl); + ::encode(bl, inbl); rd.notify(cookie, ver, inbl); objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &objver); lock.Unlock(); @@ -2889,10 +2890,10 @@ unwatch(const string& oid, uint64_t handle) } int librados::IoCtx:: -notify(const string& oid, uint64_t ver) +notify(const string& oid, uint64_t ver, bufferlist& bl) { object_t obj(oid); - return io_ctx_impl->client->notify(*io_ctx_impl, obj, ver); + return io_ctx_impl->client->notify(*io_ctx_impl, obj, ver, bl); } void librados::IoCtx:: @@ -3844,7 +3845,7 @@ struct C_WatchCB : public librados::WatchCtx { rados_watchcb_t wcb; void *arg; C_WatchCB(rados_watchcb_t _wcb, void *_arg) : wcb(_wcb), arg(_arg) {} - void notify(uint8_t opcode, uint64_t ver) { + void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) { wcb(opcode, ver, arg); } }; @@ -3867,9 +3868,15 @@ int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle) return ctx->client->unwatch(*ctx, oid, cookie); } -int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver) +int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len) { librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; object_t oid(o); - return ctx->client->notify(*ctx, oid, ver); + bufferlist bl; + if (buf) { + bufferptr p = buffer::create(buf_len); + memcpy(p.c_str(), buf, buf_len); + bl.push_back(p); + } + return ctx->client->notify(*ctx, oid, ver, bl); } diff --git a/src/librbd.cc b/src/librbd.cc index 54daffbbe36f4..3d04d564027f3 100644 --- a/src/librbd.cc +++ b/src/librbd.cc @@ -118,7 +118,7 @@ namespace librbd { lock("librbd::WatchCtx") {} virtual ~WatchCtx() {} void invalidate(); - virtual void notify(uint8_t opcode, uint64_t ver); + virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl); }; struct AioCompletion; @@ -301,7 +301,7 @@ void WatchCtx::invalidate() valid = false; } -void WatchCtx::notify(uint8_t opcode, uint64_t ver) +void WatchCtx::notify(uint8_t opcode, uint64_t ver, bufferlist& bl) { Mutex::Locker l(lock); ldout(ictx->cct, 1) << " got notification opcode=" << (int)opcode << " ver=" << ver << " cookie=" << cookie << dendl; @@ -488,7 +488,8 @@ int notify_change(IoCtx& io_ctx, const string& oid, uint64_t *pver, ImageCtx *ic ver = *pver; else ver = io_ctx.get_last_version(); - io_ctx.notify(oid, ver); + bufferlist bl; + io_ctx.notify(oid, ver, bl); return 0; } diff --git a/src/messages/MWatchNotify.h b/src/messages/MWatchNotify.h index d0fc4858c4536..9c62ff919922d 100644 --- a/src/messages/MWatchNotify.h +++ b/src/messages/MWatchNotify.h @@ -25,10 +25,11 @@ class MWatchNotify : public Message { uint64_t ver; uint64_t notify_id; uint8_t opcode; + bufferlist bl; MWatchNotify() : Message(CEPH_MSG_WATCH_NOTIFY) { } - MWatchNotify(uint64_t c, uint64_t v, uint64_t i, uint8_t o) : Message(CEPH_MSG_WATCH_NOTIFY), - cookie(c), ver(v), notify_id(i), opcode(o) { } + MWatchNotify(uint64_t c, uint64_t v, uint64_t i, uint8_t o, bufferlist b) : Message(CEPH_MSG_WATCH_NOTIFY), + cookie(c), ver(v), notify_id(i), opcode(o), bl(b) { } private: ~MWatchNotify() {} @@ -41,14 +42,17 @@ public: ::decode(cookie, p); ::decode(ver, p); ::decode(notify_id, p); + if (msg_ver >= 1) + ::decode(bl, p); } void encode_payload(CephContext *cct) { - uint8_t msg_ver = 0; + uint8_t msg_ver = 1; ::encode(msg_ver, payload); ::encode(opcode, payload); ::encode(cookie, payload); ::encode(ver, payload); ::encode(notify_id, payload); + ::encode(bl, payload); } const char *get_type_name() { return "watch-notify"; } diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 841637d055645..ba725d9fd61ae 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1394,10 +1394,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, { uint32_t ver; uint32_t timeout; + bufferlist bl; try { ::decode(ver, bp); ::decode(timeout, bp); + ::decode(bl, bp); } catch (const buffer::error &e) { timeout = 0; } @@ -1407,6 +1409,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, notify_info_t n; n.timeout = timeout; n.cookie = op.watch.cookie; + n.bl = bl; ctx->notifies.push_back(n); } break; @@ -2280,7 +2283,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) if (iter != notif->watchers.end()) { /* there is a pending notification for this watcher, we should resend it anyway even if we already sent it as it might not have received it */ - MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY); + MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl); osd->client_messenger->send_message(notify_msg, session->con); } } @@ -2305,7 +2308,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) dout(10) << " " << *p << dendl; - Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie); + Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie, p->bl); session->get(); // notif got a reference notif->pgid = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); @@ -2322,7 +2325,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race s->add_notif(notif, name); - MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY); + MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl); osd->client_messenger->send_message(notify_msg, s->con); } @@ -2337,7 +2340,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx) notif->add_watcher(name, Watch::WATCHER_PENDING); /* FIXME: should we remove expired unconnected? probably yes */ } - notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE); + notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE, notif->bl); if (notif->watchers.empty()) { do_complete_notify(notif, obc); } else { diff --git a/src/osd/Watch.h b/src/osd/Watch.h index 7e50e4967f8c1..7a0760cec8a78 100644 --- a/src/osd/Watch.h +++ b/src/osd/Watch.h @@ -43,12 +43,13 @@ public: Context *timeout; void *obc; pg_t pgid; + bufferlist bl; void add_watcher(const entity_name_t& name, WatcherState state) { watchers[name] = state; } - Notification(entity_name_t& n, OSD::Session *s, uint64_t c) : name(n), session(s), cookie(c) { } + Notification(entity_name_t& n, OSD::Session *s, uint64_t c, bufferlist& b) : name(n), session(s), cookie(c), bl(b) { } }; class C_NotifyTimeout : public Context { diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 61183ced1d7f4..04a97dbabd878 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -1306,6 +1306,7 @@ static inline ostream& operator<<(ostream& out, const watch_info_t& w) { struct notify_info_t { uint64_t cookie; uint32_t timeout; + bufferlist bl; }; static inline ostream& operator<<(ostream& out, const notify_info_t& n) { diff --git a/src/rados.cc b/src/rados.cc index 97154f5cc314c..4b501c24cff7e 100644 --- a/src/rados.cc +++ b/src/rados.cc @@ -165,6 +165,22 @@ static int do_put(IoCtx& io_ctx, const char *objname, const char *infile, int op return 0; } +class RadosWatchCtx : public librados::WatchCtx { + string name; +public: + RadosWatchCtx(const char *imgname) : name(imgname) {} + virtual ~RadosWatchCtx() {} + virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) { + string s; + try { + bufferlist::iterator iter = bl.begin(); + ::decode(s, iter); + } catch (buffer::error *err) { + cout << "could not decode bufferlist, buffer length=" << bl.length() << std::endl; + } + cout << name << " got notification opcode=" << (int)opcode << " ver=" << ver << " msg='" << s << "'" << std::endl; + } +}; /********************************************** **********************************************/ @@ -645,7 +661,31 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts, if (ret != 0) cerr << "error during benchmark: " << ret << std::endl; } - else { + else if (strcmp(nargs[0], "watch") == 0) { + if (!pool_name || nargs.size() < 2) + usage(); + string oid(nargs[1]); + RadosWatchCtx ctx(oid.c_str()); + uint64_t cookie; + ret = io_ctx.watch(oid, 0, &cookie, &ctx); + if (ret != 0) + cerr << "error calling watch: " << ret << std::endl; + else { + cout << "press enter to exit..." << std::endl; + getchar(); + } + } + else if (strcmp(nargs[0], "notify") == 0) { + if (!pool_name || nargs.size() < 3) + usage(); + string oid(nargs[1]); + string msg(nargs[2]); + bufferlist bl; + ::encode(msg, bl); + ret = io_ctx.notify(oid, 0, bl); + if (ret != 0) + cerr << "error calling notify: " << ret << std::endl; + } else { cerr << "unrecognized command " << nargs[0] << std::endl; usage(); } diff --git a/src/rbd.cc b/src/rbd.cc index cde5b60ae7446..d25144a0f25a0 100644 --- a/src/rbd.cc +++ b/src/rbd.cc @@ -479,8 +479,8 @@ class RbdWatchCtx : public librados::WatchCtx { public: RbdWatchCtx(const char *imgname) : name(imgname) {} virtual ~RbdWatchCtx() {} - virtual void notify(uint8_t opcode, uint64_t ver) { - cout << name << " got notification opcode=" << (int)opcode << " ver=" << ver << std::endl; + virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) { + cout << name << " got notification opcode=" << (int)opcode << " ver=" << ver << " bl.length=" << bl.length() << std::endl; } }; -- 2.39.5