Replace the out bufferlist with a response struct.
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
using librbd::watcher::ClientId;
+WRITE_CLASS_ENCODER(ClientId);
+
struct AsyncRequestId {
ClientId client_id;
uint64_t request_id;
}
}
-void Watcher::send_notify(bufferlist& payload, bufferlist *out_bl,
+void Watcher::send_notify(bufferlist& payload,
+ watcher::NotifyResponse *response,
Context *on_finish) {
- m_notifier.notify(payload, out_bl, on_finish);
+ m_notifier.notify(payload, response, on_finish);
}
void Watcher::WatchCtx::handle_notify(uint64_t notify_id,
namespace librbd {
+namespace watcher { struct NotifyResponse; }
+
class Watcher {
public:
struct C_NotifyAck : public Context {
watcher::Notifier m_notifier;
WatchState m_watch_state;
- void send_notify(bufferlist &payload, bufferlist *out_bl = nullptr,
+ void send_notify(bufferlist &payload,
+ watcher::NotifyResponse *response = nullptr,
Context *on_finish = nullptr);
virtual void handle_notify(uint64_t notify_id, uint64_t handle,
ldout(cct, 20) << dendl;
assert(m_image_ctx.owner_lock.is_locked());
- m_notifier.notify(m_bl, &m_out_bl, create_context_callback<
+ m_notifier.notify(m_bl, &m_notify_response, create_context_callback<
NotifyLockOwner, &NotifyLockOwner::handle_notify>(this));
}
return;
}
- typedef std::map<std::pair<uint64_t, uint64_t>, bufferlist> responses_t;
- responses_t responses;
- if (m_out_bl.length() > 0) {
- try {
- bufferlist::iterator iter = m_out_bl.begin();
- ::decode(responses, iter);
- } catch (const buffer::error &err) {
- lderr(cct) << ": failed to decode response" << dendl;
- finish(-EINVAL);
- return;
- }
- }
-
bufferlist response;
bool lock_owner_responded = false;
- for (responses_t::iterator i = responses.begin(); i != responses.end(); ++i) {
- if (i->second.length() > 0) {
+ for (auto &it : m_notify_response.acks) {
+ if (it.second.length() > 0) {
if (lock_owner_responded) {
lderr(cct) << ": duplicate lock owners detected" << dendl;
finish(-EINVAL);
return;
}
lock_owner_responded = true;
- response.claim(i->second);
+ response.claim(it.second);
}
}
#define CEPH_LIBRBD_IMAGE_WATCHER_NOTIFY_LOCK_OWNER_H
#include "include/buffer.h"
+#include "librbd/watcher/Types.h"
class Context;
watcher::Notifier &m_notifier;
bufferlist m_bl;
- bufferlist m_out_bl;
+ watcher::NotifyResponse m_notify_response;
Context *m_on_finish;
void send_notify();
#include "common/WorkQueue.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
+#include "librbd/watcher/Types.h"
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
const uint64_t Notifier::NOTIFY_TIMEOUT = 5000;
+Notifier::C_AioNotify::C_AioNotify(Notifier *notifier, NotifyResponse *response,
+ Context *on_finish)
+ : notifier(notifier), response(response), on_finish(on_finish) {
+}
+
+void Notifier::C_AioNotify::finish(int r) {
+ if (response != nullptr) {
+ if (r == 0 || r == -ETIMEDOUT) {
+ try {
+ bufferlist::iterator it = out_bl.begin();
+ ::decode(*response, it);
+ } catch (const buffer::error &err) {
+ r = -EBADMSG;
+ }
+ }
+ }
+ notifier->handle_notify(r, on_finish);
+}
+
Notifier::Notifier(ContextWQ *work_queue, IoCtx &ioctx, const std::string &oid)
: m_work_queue(work_queue), m_ioctx(ioctx), m_oid(oid),
m_aio_notify_lock(util::unique_lock_name(
m_aio_notify_flush_ctxs.push_back(on_finish);
}
-void Notifier::notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish) {
+void Notifier::notify(bufferlist &bl, NotifyResponse *response,
+ Context *on_finish) {
{
Mutex::Locker aio_notify_locker(m_aio_notify_lock);
++m_pending_aio_notifies;
<< dendl;
}
- C_AioNotify *ctx = new C_AioNotify(this, on_finish);
+ C_AioNotify *ctx = new C_AioNotify(this, response, on_finish);
librados::AioCompletion *comp = util::create_rados_callback(ctx);
- int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, out_bl);
+ int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, &ctx->out_bl);
assert(r == 0);
comp->release();
}
namespace watcher {
+struct NotifyResponse;
+
class Notifier {
public:
static const uint64_t NOTIFY_TIMEOUT;
~Notifier();
void flush(Context *on_finish);
- void notify(bufferlist &bl, bufferlist *out_bl, Context *on_finish);
+ void notify(bufferlist &bl, NotifyResponse *response, Context *on_finish);
private:
typedef std::list<Context*> Contexts;
struct C_AioNotify : public Context {
Notifier *notifier;
+ NotifyResponse *response;
Context *on_finish;
+ bufferlist out_bl;
+
+ C_AioNotify(Notifier *notifier, NotifyResponse *response,
+ Context *on_finish);
- C_AioNotify(Notifier *notifier, Context *on_finish)
- : notifier(notifier), on_finish(on_finish) {
- }
- void finish(int r) override {
- notifier->handle_notify(r, on_finish);
- }
+ void finish(int r) override;
};
ContextWQ *m_work_queue;
f->dump_unsigned("handle", handle);
}
+WRITE_CLASS_ENCODER(ClientId);
+
+void NotifyResponse::encode(bufferlist& bl) const {
+ ::encode(acks, bl);
+ ::encode(timeouts, bl);
+}
+
+void NotifyResponse::decode(bufferlist::iterator& iter) {
+ ::decode(acks, iter);
+ ::decode(timeouts, iter);
+}
+
} // namespace watcher
} // namespace librbd
}
};
+struct NotifyResponse {
+ std::map<ClientId, bufferlist> acks;
+ std::vector<ClientId> timeouts;
+
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::iterator& it);
+};
+
template <typename ImageCtxT>
struct Traits {
typedef librbd::Watcher Watcher;
const librbd::watcher::ClientId &client);
WRITE_CLASS_ENCODER(librbd::watcher::ClientId);
+WRITE_CLASS_ENCODER(librbd::watcher::NotifyResponse);
#endif // CEPH_LIBRBD_WATCHER_TYPES_H
bufferlist bl;
::encode(NotifyMessage{HeartbeatPayload{}}, bl);
- m_heartbeat_ack_bl.clear();
- send_notify(bl, &m_heartbeat_ack_bl, ctx);
+ m_heartbeat_response.acks.clear();
+ send_notify(bl, &m_heartbeat_response, ctx);
}
template <typename I>
return;
}
- try {
- bufferlist::iterator iter = m_heartbeat_ack_bl.begin();
- uint32_t num_acks;
- ::decode(num_acks, iter);
-
- dout(20) << num_acks << " acks received" << dendl;
-
- for (uint32_t i = 0; i < num_acks; i++) {
- uint64_t notifier_id;
- uint64_t cookie;
- bufferlist reply_bl;
-
- ::decode(notifier_id, iter);
- ::decode(cookie, iter);
- ::decode(reply_bl, iter);
+ dout(20) << m_heartbeat_response.acks.size() << " acks received, "
+ << m_heartbeat_response.timeouts.size() << " timed out" << dendl;
- if (notifier_id == m_notifier_id) {
- continue;
- }
-
- std::string instance_id = stringify(notifier_id);
- m_instances->notify(instance_id);
+ for (auto &it: m_heartbeat_response.acks) {
+ uint64_t notifier_id = it.first.gid;
+ if (notifier_id == m_notifier_id) {
+ continue;
}
- } catch (const buffer::error &err) {
- derr << ": error decoding heartbeat acks: " << err.what() << dendl;
+
+ std::string instance_id = stringify(notifier_id);
+ m_instances->notify(instance_id);
}
schedule_timer_task("heartbeat", 1, true,
#include "common/AsyncOpTracker.h"
#include "librbd/ManagedLock.h"
-#include "librbd/managed_lock/Types.h"
#include "librbd/Watcher.h"
+#include "librbd/managed_lock/Types.h"
+#include "librbd/watcher/Types.h"
#include "Instances.h"
#include "MirrorStatusWatcher.h"
#include "tools/rbd_mirror/leader_watcher/Types.h"
Context *m_timer_task = nullptr;
C_TimerGate *m_timer_gate = nullptr;
- bufferlist m_heartbeat_ack_bl;
+ librbd::watcher::NotifyResponse m_heartbeat_response;
bool is_leader(Mutex &m_lock);