]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rados: encode bufferlist in watch-notify
authorYehuda Sadeh <yehuda@hq.newdream.net>
Tue, 28 Jun 2011 00:20:34 +0000 (17:20 -0700)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Tue, 28 Jun 2011 00:21:05 +0000 (17:21 -0700)
src/include/rados/librados.h
src/include/rados/librados.hpp
src/librados.cc
src/librbd.cc
src/messages/MWatchNotify.h
src/osd/ReplicatedPG.cc
src/osd/Watch.h
src/osd/osd_types.h
src/rados.cc
src/rbd.cc

index ae735bb2e96da075ba979a24ce90801d04354e5f..7708f5d897d52b68a018a253f536875737c8df69 100644 (file)
@@ -199,7 +199,7 @@ typedef void (*rados_watchcb_t)(uint8_t opcode, uint64_t ver, void *arg);
 int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle,
                 rados_watchcb_t watchcb, void *arg);
 int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle);
-int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver);
+int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len);
 
 #ifdef __cplusplus
 }
index f0ee082d18e0d6f77d78cf0d780396eadd570c24..f6319b5d910f41b86871b5a5831da5524a762c2e 100644 (file)
@@ -74,7 +74,7 @@ namespace librados
   class WatchCtx {
   public:
     virtual ~WatchCtx();
-    virtual void notify(uint8_t opcode, uint64_t ver) = 0;
+    virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) = 0;
   };
 
   struct AioCompletion {
@@ -248,7 +248,7 @@ namespace librados
     int watch(const std::string& o, uint64_t ver, uint64_t *handle,
              librados::WatchCtx *ctx);
     int unwatch(const std::string& o, uint64_t handle);
-    int notify(const std::string& o, uint64_t ver);
+    int notify(const std::string& o, uint64_t ver, bufferlist& bl);
     void set_notify_timeout(uint32_t timeout);
 
     // assert version for next sync operations
index c121307e01c10ca4608d00c87c9e9cb4caf749cd..25e56f4745b6f8698e8ffdd5f4101950cec80891 100644 (file)
@@ -703,7 +703,7 @@ public:
       io_ctx_impl->put();
     }
     void notify(RadosClient *client, MWatchNotify *m) {
-      ctx->notify(m->opcode, m->ver);
+      ctx->notify(m->opcode, m->ver, m->bl);
       if (m->opcode != WATCH_NOTIFY_COMPLETE) {
         client->_notify_ack(*io_ctx_impl, oid, m->notify_id, m->ver);
       }
@@ -719,7 +719,7 @@ public:
       *done = false;
     }
 
-    void notify(uint8_t opcode, uint64_t ver) {
+    void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
       *done = true;
       cond->Signal();
     }
@@ -758,7 +758,7 @@ public:
 
   int watch(IoCtxImpl& io, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx);
   int unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie);
-  int notify(IoCtxImpl& io, const object_t& oid, uint64_t ver);
+  int notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl);
   int _notify_ack(IoCtxImpl& io, const object_t& oid, uint64_t notify_id, uint64_t ver);
 
   eversion_t last_version(IoCtxImpl& io) {
@@ -2327,7 +2327,7 @@ unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie)
 }// ---------------------------------------------
 
 int librados::RadosClient::
-notify(IoCtxImpl& io, const object_t& oid, uint64_t ver)
+notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl)
 {
   utime_t ut = ceph_clock_now(cct);
   bufferlist inbl, outbl;
@@ -2353,6 +2353,7 @@ notify(IoCtxImpl& io, const object_t& oid, uint64_t ver)
   uint32_t timeout = io.notify_timeout;
   ::encode(prot_ver, inbl);
   ::encode(timeout, inbl);
+  ::encode(bl, inbl);
   rd.notify(cookie, ver, inbl);
   objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &objver);
   lock.Unlock();
@@ -2910,10 +2911,10 @@ unwatch(const string& oid, uint64_t handle)
 }
 
 int librados::IoCtx::
-notify(const string& oid, uint64_t ver)
+notify(const string& oid, uint64_t ver, bufferlist& bl)
 {
   object_t obj(oid);
-  return io_ctx_impl->client->notify(*io_ctx_impl, obj, ver);
+  return io_ctx_impl->client->notify(*io_ctx_impl, obj, ver, bl);
 }
 
 void librados::IoCtx::
@@ -3871,7 +3872,7 @@ struct C_WatchCB : public librados::WatchCtx {
   rados_watchcb_t wcb;
   void *arg;
   C_WatchCB(rados_watchcb_t _wcb, void *_arg) : wcb(_wcb), arg(_arg) {}
-  void notify(uint8_t opcode, uint64_t ver) {
+  void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
     wcb(opcode, ver, arg);
   }
 };
@@ -3894,9 +3895,15 @@ int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle)
   return ctx->client->unwatch(*ctx, oid, cookie);
 }
 
-int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver)
+int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len)
 {
   librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
   object_t oid(o);
-  return ctx->client->notify(*ctx, oid, ver);
+  bufferlist bl;
+  if (buf) {
+    bufferptr p = buffer::create(buf_len);
+    memcpy(p.c_str(), buf, buf_len);
+    bl.push_back(p);
+  }
+  return ctx->client->notify(*ctx, oid, ver, bl);
 }
index 54daffbbe36f4891fadade49dbbbe7994487e7f1..3d04d564027f3d317de8bf7fe9cef4166139e671 100644 (file)
@@ -118,7 +118,7 @@ namespace librbd {
                              lock("librbd::WatchCtx") {}
     virtual ~WatchCtx() {}
     void invalidate();
-    virtual void notify(uint8_t opcode, uint64_t ver);
+    virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl);
   };
 
   struct AioCompletion;
@@ -301,7 +301,7 @@ void WatchCtx::invalidate()
   valid = false;
 }
 
-void WatchCtx::notify(uint8_t opcode, uint64_t ver)
+void WatchCtx::notify(uint8_t opcode, uint64_t ver, bufferlist& bl)
 {
   Mutex::Locker l(lock);
   ldout(ictx->cct, 1) <<  " got notification opcode=" << (int)opcode << " ver=" << ver << " cookie=" << cookie << dendl;
@@ -488,7 +488,8 @@ int notify_change(IoCtx& io_ctx, const string& oid, uint64_t *pver, ImageCtx *ic
     ver = *pver;
   else
     ver = io_ctx.get_last_version();
-  io_ctx.notify(oid, ver);
+  bufferlist bl;
+  io_ctx.notify(oid, ver, bl);
   return 0;
 }
 
index d0fc4858c4536324a604252dd45bd56c9172a0d3..9c62ff919922d96bfb2a7ad7ebabdb5a4ddb0dd9 100644 (file)
@@ -25,10 +25,11 @@ class MWatchNotify : public Message {
   uint64_t ver;
   uint64_t notify_id;
   uint8_t opcode;
+  bufferlist bl;
 
   MWatchNotify() : Message(CEPH_MSG_WATCH_NOTIFY) { }
-  MWatchNotify(uint64_t c, uint64_t v, uint64_t i, uint8_t o) : Message(CEPH_MSG_WATCH_NOTIFY),
-                                       cookie(c), ver(v), notify_id(i), opcode(o) { }
+  MWatchNotify(uint64_t c, uint64_t v, uint64_t i, uint8_t o, bufferlist b) : Message(CEPH_MSG_WATCH_NOTIFY),
+                                       cookie(c), ver(v), notify_id(i), opcode(o), bl(b) { }
 private:
   ~MWatchNotify() {}
 
@@ -41,14 +42,17 @@ public:
     ::decode(cookie, p);
     ::decode(ver, p);
     ::decode(notify_id, p);
+    if (msg_ver >= 1)
+      ::decode(bl, p);
   }
   void encode_payload(CephContext *cct) {
-    uint8_t msg_ver = 0;
+    uint8_t msg_ver = 1;
     ::encode(msg_ver, payload);
     ::encode(opcode, payload);
     ::encode(cookie, payload);
     ::encode(ver, payload);
     ::encode(notify_id, payload);
+    ::encode(bl, payload);
   }
 
   const char *get_type_name() { return "watch-notify"; }
index 841637d0556456cd903410d91a1b391da254a34a..ba725d9fd61ae6d2fe039b2ea06d0394ceb4bb4e 100644 (file)
@@ -1394,10 +1394,12 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
       {
        uint32_t ver;
        uint32_t timeout;
+        bufferlist bl;
 
        try {
           ::decode(ver, bp);
          ::decode(timeout, bp);
+          ::decode(bl, bp);
        } catch (const buffer::error &e) {
          timeout = 0;
        }
@@ -1407,6 +1409,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
        notify_info_t n;
        n.timeout = timeout;
        n.cookie = op.watch.cookie;
+        n.bl = bl;
        ctx->notifies.push_back(n);
       }
       break;
@@ -2280,7 +2283,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
        if (iter != notif->watchers.end()) {
          /* there is a pending notification for this watcher, we should resend it anyway
             even if we already sent it as it might not have received it */
-         MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY);
+         MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
          osd->client_messenger->send_message(notify_msg, session->con);
        }
       }
@@ -2305,7 +2308,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
 
       dout(10) << " " << *p << dendl;
 
-      Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie);
+      Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie, p->bl);
       session->get();  // notif got a reference
       notif->pgid = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
 
@@ -2322,7 +2325,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
        notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
        s->add_notif(notif, name);
 
-       MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY);
+       MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
        osd->client_messenger->send_message(notify_msg, s->con);
       }
 
@@ -2337,7 +2340,7 @@ void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
          notif->add_watcher(name, Watch::WATCHER_PENDING); /* FIXME: should we remove expired unconnected? probably yes */
       }
 
-      notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE);
+      notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE, notif->bl);
       if (notif->watchers.empty()) {
        do_complete_notify(notif, obc);
       } else {
index 7e50e4967f8c11cbd7fd6a1c8718d24459cc0427..7a0760cec8a783b452c446a8e406803a6844fe12 100644 (file)
@@ -43,12 +43,13 @@ public:
     Context *timeout;
     void *obc;
     pg_t pgid;
+    bufferlist bl;
 
     void add_watcher(const entity_name_t& name, WatcherState state) {
       watchers[name] = state;
     }
 
-    Notification(entity_name_t& n, OSD::Session *s, uint64_t c) : name(n), session(s), cookie(c) { }
+    Notification(entity_name_t& n, OSD::Session *s, uint64_t c, bufferlist& b) : name(n), session(s), cookie(c), bl(b) { }
   };
 
   class C_NotifyTimeout : public Context {
index 61183ced1d7f41b4afde273a6787a3b02b7a42f5..04a97dbabd87884094e1c990094d66fb2d314415 100644 (file)
@@ -1306,6 +1306,7 @@ static inline ostream& operator<<(ostream& out, const watch_info_t& w) {
 struct notify_info_t {
   uint64_t cookie;
   uint32_t timeout;
+  bufferlist bl;
 };
 
 static inline ostream& operator<<(ostream& out, const notify_info_t& n) {
index 97154f5cc314c77bc8f2108623df96eca2a6484a..4b501c24cff7eed956d2203bfa5d2e11e7e91276 100644 (file)
@@ -165,6 +165,22 @@ static int do_put(IoCtx& io_ctx, const char *objname, const char *infile, int op
   return 0;
 }
 
+class RadosWatchCtx : public librados::WatchCtx {
+  string name;
+public:
+  RadosWatchCtx(const char *imgname) : name(imgname) {}
+  virtual ~RadosWatchCtx() {}
+  virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
+    string s;
+    try {
+      bufferlist::iterator iter = bl.begin();
+      ::decode(s, iter);
+    } catch (buffer::error *err) {
+      cout << "could not decode bufferlist, buffer length=" << bl.length() << std::endl;
+    }
+    cout << name << " got notification opcode=" << (int)opcode << " ver=" << ver << " msg='" << s << "'" << std::endl;
+  }
+};
 /**********************************************
 
 **********************************************/
@@ -645,7 +661,31 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
     if (ret != 0)
       cerr << "error during benchmark: " << ret << std::endl;
   }
-  else {
+  else if (strcmp(nargs[0], "watch") == 0) {
+    if (!pool_name || nargs.size() < 2)
+      usage();
+    string oid(nargs[1]);
+    RadosWatchCtx ctx(oid.c_str());
+    uint64_t cookie;
+    ret = io_ctx.watch(oid, 0, &cookie, &ctx);
+    if (ret != 0)
+      cerr << "error calling watch: " << ret << std::endl;
+    else {
+      cout << "press enter to exit..." << std::endl;
+      getchar();
+    }
+  }
+  else if (strcmp(nargs[0], "notify") == 0) {
+    if (!pool_name || nargs.size() < 3)
+      usage();
+    string oid(nargs[1]);
+    string msg(nargs[2]);
+    bufferlist bl;
+    ::encode(msg, bl);
+    ret = io_ctx.notify(oid, 0, bl);
+    if (ret != 0)
+      cerr << "error calling notify: " << ret << std::endl;
+  }  else {
     cerr << "unrecognized command " << nargs[0] << std::endl;
     usage();
   }
index cde5b60ae74461133d4856c643fc091b96f2cf5b..d25144a0f25a0f5347d32c4a5e9befb398090d44 100644 (file)
@@ -479,8 +479,8 @@ class RbdWatchCtx : public librados::WatchCtx {
 public:
   RbdWatchCtx(const char *imgname) : name(imgname) {}
   virtual ~RbdWatchCtx() {}
-  virtual void notify(uint8_t opcode, uint64_t ver) {
-    cout << name << " got notification opcode=" << (int)opcode << " ver=" << ver << std::endl;
+  virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
+    cout << name << " got notification opcode=" << (int)opcode << " ver=" << ver << " bl.length=" << bl.length() << std::endl;
   }
 };