* - arg opaque user-defined value provided to rados_watch2()
* - notify_id an id for this notify event
* - handle the watcher handle we are notifying
+ * - notifier_id the unique client id for the notifier
* - data payload from the notifier
* - datalen length of payload buffer
*/
typedef void (*rados_watchcb2_t)(void *arg,
uint64_t notify_id,
uint64_t handle,
+ uint64_t notifier_id,
void *data,
size_t data_len);
/**
* @param notify_id unique id for this notify event
* @param cookie the watcher we are notifying
+ * @param notifier_id the unique client id of the notifier
* @param bl opaque notify payload (from the notifier)
*/
virtual void handle_notify(uint64_t notify_id,
uint64_t cookie,
+ uint64_t notifier_id,
bufferlist& bl) = 0;
};
bufferlist empty;
wc->io_ctx_impl->notify_ack(wc->oid, m->notify_id, m->cookie, empty);
} else if (wc->watch_ctx2) {
- wc->watch_ctx2->handle_notify(m->notify_id, m->cookie, m->bl);
+ wc->watch_ctx2->handle_notify(m->notify_id, m->cookie,
+ m->notifier_gid, m->bl);
// user needs to explicitly ack (and may have already!)
}
lock.Lock();
C_WatchCB2(rados_watchcb2_t _wcb, void *_arg) : wcb(_wcb), arg(_arg) {}
void handle_notify(uint64_t notify_id,
uint64_t cookie,
+ uint64_t notifier_gid,
bufferlist& bl) {
- wcb(arg, notify_id, cookie, bl.c_str(), bl.length());
+ wcb(arg, notify_id, cookie, notifier_gid, bl.c_str(), bl.length());
}
};
class MWatchNotify : public Message {
- static const int HEAD_VERSION = 2;
+ static const int HEAD_VERSION = 3;
static const int COMPAT_VERSION = 1;
public:
uint8_t opcode; ///< always WATCH_NOTIFY
bufferlist bl; ///< notify payload (osd->client)
int32_t return_code; ///< notify result (osd->client)
+ uint64_t notifier_gid; ///< who sent the notify
MWatchNotify()
: Message(CEPH_MSG_WATCH_NOTIFY, HEAD_VERSION, COMPAT_VERSION) { }
notify_id(i),
opcode(o),
bl(b),
- return_code(0) { }
+ return_code(0),
+ notifier_gid(0) { }
private:
~MWatchNotify() {}
::decode(return_code, p);
else
return_code = 0;
+ if (header.version >= 3)
+ ::decode(notifier_gid, p);
+ else
+ notifier_gid = 0;
}
void encode_payload(uint64_t features) {
uint8_t msg_ver = 1;
::encode(notify_id, payload);
::encode(bl, payload);
::encode(return_code, payload);
+ ::encode(notifier_gid, payload);
}
const char *get_type_name() const { return "watch-notify"; }
NotifyRef notif(
Notify::makeNotifyRef(
conn,
+ ctx->reqid.name.num(),
ctx->obc->watchers.size(),
p->bl,
p->timeout,
Notify::Notify(
ConnectionRef client,
+ uint64_t client_gid,
unsigned num_watchers,
bufferlist &payload,
uint32_t timeout,
uint64_t notify_id,
uint64_t version,
OSDService *osd)
- : client(client),
+ : client(client), client_gid(client_gid),
in_progress_watchers(num_watchers),
complete(false),
discarded(false),
NotifyRef Notify::makeNotifyRef(
ConnectionRef client,
+ uint64_t client_gid,
unsigned num_watchers,
bufferlist &payload,
uint32_t timeout,
OSDService *osd) {
NotifyRef ret(
new Notify(
- client, num_watchers,
+ client, client_gid, num_watchers,
payload, timeout,
cookie, notify_id,
version, osd));
bufferlist empty;
MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id,
WATCH_NOTIFY, empty));
+ reply->notifier_gid = client_gid;
reply->set_data(bl);
if (timed_out)
reply->return_code = -ETIMEDOUT;
MWatchNotify *notify_msg = new MWatchNotify(
cookie, notif->version, notif->notify_id,
WATCH_NOTIFY, notif->payload);
+ notify_msg->notifier_gid = notif->client_gid;
osd->send_message_osd_client(notify_msg, conn.get());
}
friend class Watch;
WNotifyRef self;
ConnectionRef client;
+ uint64_t client_gid;
unsigned in_progress_watchers;
bool complete;
bool discarded;
Notify(
ConnectionRef client,
+ uint64_t client_gid,
unsigned num_watchers,
bufferlist &payload,
uint32_t timeout,
}
static NotifyRef makeNotifyRef(
ConnectionRef client,
+ uint64_t client_gid,
unsigned num_watchers,
bufferlist &payload,
uint32_t timeout,
static void watch_notify2_test_cb(void *arg,
uint64_t notify_id,
uint64_t handle,
+ uint64_t notifier_gid,
void *data,
size_t data_len)
{
- std::cout << __func__ << " notify_id " << notify_id
+ std::cout << __func__ << " from " << notifier_gid << " notify_id " << notify_id
<< " handle " << handle << std::endl;
+ assert(notifier_gid > 0);
notify_bl.clear();
notify_bl.append((char*)data, data_len);
if (notify_sleep)
class WatchNotifyTestCtx2 : public WatchCtx2
{
public:
- void handle_notify(uint64_t notify_id, uint64_t cookie, bufferlist& bl)
+ void handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_gid,
+ bufferlist& bl)
{
std::cout << __func__ << std::endl;
notify_bl = bl;