#undef dout_prefix
#define dout_prefix *_dout << "librados: "
+namespace librados {
+namespace {
+
+struct C_notify_Finish : public Context {
+ CephContext *cct;
+ Context *ctx;
+ Objecter *objecter;
+ Objecter::LingerOp *linger_op;
+ bufferlist reply_bl;
+ bufferlist *preply_bl;
+ char **preply_buf;
+ size_t *preply_buf_len;
+
+ C_notify_Finish(CephContext *_cct, Context *_ctx, Objecter *_objecter,
+ Objecter::LingerOp *_linger_op, bufferlist *_preply_bl,
+ char **_preply_buf, size_t *_preply_buf_len)
+ : cct(_cct), ctx(_ctx), objecter(_objecter), linger_op(_linger_op),
+ preply_bl(_preply_bl), preply_buf(_preply_buf),
+ preply_buf_len(_preply_buf_len)
+ {
+ linger_op->on_notify_finish = this;
+ linger_op->notify_result_bl = &reply_bl;
+ }
+
+ virtual void finish(int r)
+ {
+ ldout(cct, 10) << __func__ << " completed notify (linger op "
+ << linger_op << "), r = " << r << dendl;
+
+ // pass result back to user
+ // NOTE: we do this regardless of what error code we return
+ if (preply_buf) {
+ if (reply_bl.length()) {
+ *preply_buf = (char*)malloc(reply_bl.length());
+ memcpy(*preply_buf, reply_bl.c_str(), reply_bl.length());
+ } else {
+ *preply_buf = NULL;
+ }
+ }
+ if (preply_buf_len)
+ *preply_buf_len = reply_bl.length();
+ if (preply_bl)
+ preply_bl->claim(reply_bl);
+
+ ctx->complete(r);
+ }
+};
+
+struct C_aio_linger_cancel : public Context {
+ Objecter *objecter;
+ Objecter::LingerOp *linger_op;
+
+ C_aio_linger_cancel(Objecter *_objecter, Objecter::LingerOp *_linger_op)
+ : objecter(_objecter), linger_op(_linger_op)
+ {
+ }
+
+ virtual void finish(int r)
+ {
+ objecter->linger_cancel(linger_op);
+ }
+};
+
+struct C_aio_notify_Complete : public Context {
+ AioCompletionImpl *c;
+ Objecter::LingerOp *linger_op;
+
+ C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op)
+ : c(_c), linger_op(_linger_op)
+ {
+ c->get();
+ }
+
+ virtual void finish(int r) {
+ c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter,
+ linger_op));
+
+ c->lock.Lock();
+ c->rval = r;
+ c->ack = true;
+ c->safe = true;
+ c->cond.Signal();
+
+ if (c->callback_complete) {
+ c->io->client->finisher.queue(new C_AioComplete(c));
+ }
+ if (c->callback_safe) {
+ c->io->client->finisher.queue(new C_AioSafe(c));
+ }
+ c->put_unlock();
+ }
+};
+
+struct C_aio_notify_Ack : public Context {
+ CephContext *cct;
+ C_notify_Finish *f;
+
+ C_aio_notify_Ack(CephContext *_cct, C_notify_Finish *_f)
+ : cct(_cct), f(_f)
+ {
+ }
+
+ virtual void finish(int r)
+ {
+ ldout(cct, 10) << __func__ << " linger op " << f->linger_op << " acked ("
+ << r << ")" << dendl;
+ if (r < 0) {
+ f->complete(r);
+ }
+ }
+};
+
+} // anonymous namespace
+} // namespace librados
+
librados::IoCtxImpl::IoCtxImpl() :
ref_cnt(0), client(NULL), poolid(0), assert_ver(0), last_objver(0),
notify_timeout(30), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
bufferlist *preply_bl,
char **preply_buf, size_t *preply_buf_len)
{
- bufferlist inbl;
-
- struct C_NotifyFinish : public Context {
- Cond cond;
- Mutex lock;
- bool done;
- int result;
- bufferlist reply_bl;
-
- C_NotifyFinish()
- : lock("IoCtxImpl::notify::C_NotifyFinish::lock"),
- done(false),
- result(0) { }
-
- void finish(int r) {}
- void complete(int r) {
- lock.Lock();
- done = true;
- result = r;
- cond.Signal();
- lock.Unlock();
- }
- void wait() {
- lock.Lock();
- while (!done)
- cond.Wait(lock);
- lock.Unlock();
- }
- } notify_private;
-
Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
- linger_op->on_notify_finish = ¬ify_private;
- linger_op->notify_result_bl = ¬ify_private.reply_bl;
- uint32_t prot_ver = 1;
+ C_SaferCond notify_finish_cond;
+ Context *notify_finish = new C_notify_Finish(client->cct, ¬ify_finish_cond,
+ objecter, linger_op, preply_bl,
+ preply_buf, preply_buf_len);
+
uint32_t timeout = notify_timeout;
if (timeout_ms)
timeout = timeout_ms / 1000;
- ::encode(prot_ver, inbl);
- ::encode(timeout, inbl);
- ::encode(bl, inbl);
// Construct RADOS op
::ObjectOperation rd;
prepare_assert_ops(&rd);
- rd.notify(linger_op->get_cookie(), inbl);
+ bufferlist inbl;
+ rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
// Issue RADOS op
C_SaferCond onack;
&onack, &objver);
ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl;
- int r_issue = onack.wait();
+ int r = onack.wait();
ldout(client->cct, 10) << __func__ << " linger op " << linger_op
- << " acked (" << r_issue << ")" << dendl;
+ << " acked (" << r << ")" << dendl;
- if (r_issue == 0) {
+ if (r == 0) {
ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish "
<< linger_op << dendl;
- notify_private.wait();
+ r = notify_finish_cond.wait();
- ldout(client->cct, 10) << __func__ << " completed notify (linger op "
- << linger_op << "), r = " << notify_private.result
- << dendl;
} else {
ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = "
- << r_issue << dendl;
+ << r << dendl;
+ notify_finish->complete(r);
}
- // pass result back to user
- // NOTE: we do this regardless of what error code we return
- if (preply_buf) {
- if (notify_private.reply_bl.length()) {
- *preply_buf = (char*)malloc(notify_private.reply_bl.length());
- memcpy(*preply_buf, notify_private.reply_bl.c_str(),
- notify_private.reply_bl.length());
- } else {
- *preply_buf = NULL;
- }
- }
- if (preply_buf_len)
- *preply_buf_len = notify_private.reply_bl.length();
- if (preply_bl)
- preply_bl->claim(notify_private.reply_bl);
-
objecter->linger_cancel(linger_op);
set_sync_op_version(objver);
+ return r;
+}
+
+int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
+ bufferlist& bl, uint64_t timeout_ms,
+ bufferlist *preply_bl, char **preply_buf,
+ size_t *preply_buf_len)
+{
+ Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
- return r_issue ? r_issue : notify_private.result;
+ c->io = this;
+
+ Context *oncomplete = new C_aio_notify_Complete(c, linger_op);
+ C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete,
+ objecter, linger_op,
+ preply_bl, preply_buf,
+ preply_buf_len);
+ Context *onack = new C_aio_notify_Ack(client->cct, onnotify);
+
+ uint32_t timeout = notify_timeout;
+ if (timeout_ms)
+ timeout = timeout_ms / 1000;
+
+ // Construct RADOS op
+ ::ObjectOperation rd;
+ prepare_assert_ops(&rd);
+ bufferlist inbl;
+ rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
+
+ // Issue RADOS op
+ objecter->linger_notify(linger_op,
+ rd, snap_seq, inbl, NULL,
+ onack, NULL);
+ return 0;
}
int librados::IoCtxImpl::set_alloc_hint(const object_t& oid,
c->put_unlock();
}
-
return io_ctx_impl->notify(obj, bl, timeout_ms, preplybl, NULL, NULL);
}
+int librados::IoCtx::aio_notify(const string& oid, AioCompletion *c,
+ bufferlist& bl, uint64_t timeout_ms,
+ bufferlist *preplybl)
+{
+ object_t obj(oid);
+ return io_ctx_impl->aio_notify(obj, c->pc, bl, timeout_ms, preplybl, NULL,
+ NULL);
+}
+
void librados::IoCtx::notify_ack(const std::string& o,
uint64_t notify_id, uint64_t handle,
bufferlist& bl)
return ret;
}
+extern "C" int rados_aio_notify(rados_ioctx_t io, const char *o,
+ rados_completion_t completion,
+ const char *buf, int buf_len,
+ uint64_t timeout_ms, char **reply_buffer,
+ size_t *reply_buffer_len)
+{
+ tracepoint(librados, rados_aio_notify_enter, io, o, completion, buf, buf_len,
+ timeout_ms);
+ librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+ object_t oid(o);
+ bufferlist bl;
+ if (buf) {
+ bl.push_back(buffer::copy(buf, buf_len));
+ }
+ librados::AioCompletionImpl *c =
+ reinterpret_cast<librados::AioCompletionImpl*>(completion);
+ int ret = ctx->aio_notify(oid, c, bl, timeout_ms, NULL, reply_buffer,
+ reply_buffer_len);
+ tracepoint(librados, rados_aio_notify_exit, ret);
+ return ret;
+}
+
extern "C" int rados_notify_ack(rados_ioctx_t io, const char *o,
uint64_t notify_id, uint64_t handle,
const char *buf, int buf_len)