int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle,
rados_watchcb_t watchcb, void *arg);
int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle);
-int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver);
+int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len);
#ifdef __cplusplus
}
class WatchCtx {
public:
virtual ~WatchCtx();
- virtual void notify(uint8_t opcode, uint64_t ver) = 0;
+ virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) = 0;
};
struct AioCompletion {
int watch(const std::string& o, uint64_t ver, uint64_t *handle,
librados::WatchCtx *ctx);
int unwatch(const std::string& o, uint64_t handle);
- int notify(const std::string& o, uint64_t ver);
+ int notify(const std::string& o, uint64_t ver, bufferlist& bl);
void set_notify_timeout(uint32_t timeout);
// assert version for next sync operations
io_ctx_impl->put();
}
void notify(RadosClient *client, MWatchNotify *m) {
- ctx->notify(m->opcode, m->ver);
+ ctx->notify(m->opcode, m->ver, m->bl);
if (m->opcode != WATCH_NOTIFY_COMPLETE) {
client->_notify_ack(*io_ctx_impl, oid, m->notify_id, m->ver);
}
*done = false;
}
- void notify(uint8_t opcode, uint64_t ver) {
+ void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
*done = true;
cond->Signal();
}
int watch(IoCtxImpl& io, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx);
int unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie);
- int notify(IoCtxImpl& io, const object_t& oid, uint64_t ver);
+ int notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl);
int _notify_ack(IoCtxImpl& io, const object_t& oid, uint64_t notify_id, uint64_t ver);
eversion_t last_version(IoCtxImpl& io) {
}// ---------------------------------------------
int librados::RadosClient::
-notify(IoCtxImpl& io, const object_t& oid, uint64_t ver)
+notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl)
{
utime_t ut = ceph_clock_now(cct);
bufferlist inbl, outbl;
uint32_t timeout = io.notify_timeout;
::encode(prot_ver, inbl);
::encode(timeout, inbl);
+ ::encode(bl, inbl);
rd.notify(cookie, ver, inbl);
objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &objver);
lock.Unlock();
}
int librados::IoCtx::
-notify(const string& oid, uint64_t ver)
+notify(const string& oid, uint64_t ver, bufferlist& bl)
{
object_t obj(oid);
- return io_ctx_impl->client->notify(*io_ctx_impl, obj, ver);
+ return io_ctx_impl->client->notify(*io_ctx_impl, obj, ver, bl);
}
void librados::IoCtx::
rados_watchcb_t wcb;
void *arg;
C_WatchCB(rados_watchcb_t _wcb, void *_arg) : wcb(_wcb), arg(_arg) {}
- void notify(uint8_t opcode, uint64_t ver) {
+ void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
wcb(opcode, ver, arg);
}
};
return ctx->client->unwatch(*ctx, oid, cookie);
}
-int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver)
+int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len)
{
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
- return ctx->client->notify(*ctx, oid, ver);
+ bufferlist bl;
+ if (buf) {
+ bufferptr p = buffer::create(buf_len);
+ memcpy(p.c_str(), buf, buf_len);
+ bl.push_back(p);
+ }
+ return ctx->client->notify(*ctx, oid, ver, bl);
}
lock("librbd::WatchCtx") {}
virtual ~WatchCtx() {}
void invalidate();
- virtual void notify(uint8_t opcode, uint64_t ver);
+ virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl);
};
struct AioCompletion;
valid = false;
}
-void WatchCtx::notify(uint8_t opcode, uint64_t ver)
+void WatchCtx::notify(uint8_t opcode, uint64_t ver, bufferlist& bl)
{
Mutex::Locker l(lock);
ldout(ictx->cct, 1) << " got notification opcode=" << (int)opcode << " ver=" << ver << " cookie=" << cookie << dendl;
ver = *pver;
else
ver = io_ctx.get_last_version();
- io_ctx.notify(oid, ver);
+ bufferlist bl;
+ io_ctx.notify(oid, ver, bl);
return 0;
}
uint64_t ver;
uint64_t notify_id;
uint8_t opcode;
+ bufferlist bl;
MWatchNotify() : Message(CEPH_MSG_WATCH_NOTIFY) { }
- MWatchNotify(uint64_t c, uint64_t v, uint64_t i, uint8_t o) : Message(CEPH_MSG_WATCH_NOTIFY),
- cookie(c), ver(v), notify_id(i), opcode(o) { }
+ MWatchNotify(uint64_t c, uint64_t v, uint64_t i, uint8_t o, bufferlist b) : Message(CEPH_MSG_WATCH_NOTIFY),
+ cookie(c), ver(v), notify_id(i), opcode(o), bl(b) { }
private:
~MWatchNotify() {}
::decode(cookie, p);
::decode(ver, p);
::decode(notify_id, p);
+ if (msg_ver >= 1)
+ ::decode(bl, p);
}
void encode_payload(CephContext *cct) {
- uint8_t msg_ver = 0;
+ uint8_t msg_ver = 1;
::encode(msg_ver, payload);
::encode(opcode, payload);
::encode(cookie, payload);
::encode(ver, payload);
::encode(notify_id, payload);
+ ::encode(bl, payload);
}
const char *get_type_name() { return "watch-notify"; }
{
uint32_t ver;
uint32_t timeout;
+ bufferlist bl;
try {
::decode(ver, bp);
::decode(timeout, bp);
+ ::decode(bl, bp);
} catch (const buffer::error &e) {
timeout = 0;
}
notify_info_t n;
n.timeout = timeout;
n.cookie = op.watch.cookie;
+ n.bl = bl;
ctx->notifies.push_back(n);
}
break;
if (iter != notif->watchers.end()) {
/* there is a pending notification for this watcher, we should resend it anyway
even if we already sent it as it might not have received it */
- MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY);
+ MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
osd->client_messenger->send_message(notify_msg, session->con);
}
}
dout(10) << " " << *p << dendl;
- Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie);
+ Watch::Notification *notif = new Watch::Notification(ctx->reqid.name, session, p->cookie, p->bl);
session->get(); // notif got a reference
notif->pgid = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc);
notif->add_watcher(name, Watch::WATCHER_NOTIFIED); // adding before send_message to avoid race
s->add_notif(notif, name);
- MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY);
+ MWatchNotify *notify_msg = new MWatchNotify(w.cookie, oi.user_version.version, notif->id, WATCH_NOTIFY, notif->bl);
osd->client_messenger->send_message(notify_msg, s->con);
}
notif->add_watcher(name, Watch::WATCHER_PENDING); /* FIXME: should we remove expired unconnected? probably yes */
}
- notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE);
+ notif->reply = new MWatchNotify(p->cookie, oi.user_version.version, notif->id, WATCH_NOTIFY_COMPLETE, notif->bl);
if (notif->watchers.empty()) {
do_complete_notify(notif, obc);
} else {
Context *timeout;
void *obc;
pg_t pgid;
+ bufferlist bl;
void add_watcher(const entity_name_t& name, WatcherState state) {
watchers[name] = state;
}
- Notification(entity_name_t& n, OSD::Session *s, uint64_t c) : name(n), session(s), cookie(c) { }
+ Notification(entity_name_t& n, OSD::Session *s, uint64_t c, bufferlist& b) : name(n), session(s), cookie(c), bl(b) { }
};
class C_NotifyTimeout : public Context {
struct notify_info_t {
uint64_t cookie;
uint32_t timeout;
+ bufferlist bl;
};
static inline ostream& operator<<(ostream& out, const notify_info_t& n) {
return 0;
}
+class RadosWatchCtx : public librados::WatchCtx {
+ string name;
+public:
+ RadosWatchCtx(const char *imgname) : name(imgname) {}
+ virtual ~RadosWatchCtx() {}
+ virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
+ string s;
+ try {
+ bufferlist::iterator iter = bl.begin();
+ ::decode(s, iter);
+ } catch (buffer::error *err) {
+ cout << "could not decode bufferlist, buffer length=" << bl.length() << std::endl;
+ }
+ cout << name << " got notification opcode=" << (int)opcode << " ver=" << ver << " msg='" << s << "'" << std::endl;
+ }
+};
/**********************************************
**********************************************/
if (ret != 0)
cerr << "error during benchmark: " << ret << std::endl;
}
- else {
+ else if (strcmp(nargs[0], "watch") == 0) {
+ if (!pool_name || nargs.size() < 2)
+ usage();
+ string oid(nargs[1]);
+ RadosWatchCtx ctx(oid.c_str());
+ uint64_t cookie;
+ ret = io_ctx.watch(oid, 0, &cookie, &ctx);
+ if (ret != 0)
+ cerr << "error calling watch: " << ret << std::endl;
+ else {
+ cout << "press enter to exit..." << std::endl;
+ getchar();
+ }
+ }
+ else if (strcmp(nargs[0], "notify") == 0) {
+ if (!pool_name || nargs.size() < 3)
+ usage();
+ string oid(nargs[1]);
+ string msg(nargs[2]);
+ bufferlist bl;
+ ::encode(msg, bl);
+ ret = io_ctx.notify(oid, 0, bl);
+ if (ret != 0)
+ cerr << "error calling notify: " << ret << std::endl;
+ } else {
cerr << "unrecognized command " << nargs[0] << std::endl;
usage();
}
public:
RbdWatchCtx(const char *imgname) : name(imgname) {}
virtual ~RbdWatchCtx() {}
- virtual void notify(uint8_t opcode, uint64_t ver) {
- cout << name << " got notification opcode=" << (int)opcode << " ver=" << ver << std::endl;
+ virtual void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
+ cout << name << " got notification opcode=" << (int)opcode << " ver=" << ver << " bl.length=" << bl.length() << std::endl;
}
};