uint32_t notify_timeout;
object_locator_t oloc;
- IoCtxImpl() {}
- IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s);
+ Mutex aio_write_list_lock;
+ tid_t aio_write_seq;
+ Cond aio_write_cond;
+ xlist<AioCompletionImpl*> aio_write_list;
+
+ IoCtxImpl() :
+ aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock") {}
+ IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s) :
+ ref_cnt(0), client(c), poolid(pid),
+ pool_name(pool_name_), snap_seq(s), assert_ver(0),
+ notify_timeout(g_conf.client_notify_timeout), oloc(pid),
+ aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), aio_write_seq(0) {}
void dup(const IoCtxImpl& rhs) {
// Copy everything except the ref count
if (ref_cnt.dec() == 0)
delete this;
}
+
+ void queue_aio_write(struct AioCompletionImpl *c);
+ void complete_aio_write(struct AioCompletionImpl *c);
+ void flush_aio_writes();
};
char *buf;
unsigned maxlen;
+ IoCtxImpl *io;
+ tid_t aio_write_seq;
+ xlist<AioCompletionImpl*>::item aio_write_list_item;
+
AioCompletionImpl() : lock("AioCompletionImpl lock"),
- ref(1), rval(0), released(false), ack(false), safe(false),
- callback_complete(0), callback_safe(0), callback_arg(0),
- pbl(0), buf(0), maxlen(0) { }
+ ref(1), rval(0), released(false), ack(false), safe(false),
+ callback_complete(0), callback_safe(0), callback_arg(0),
+ pbl(0), buf(0), maxlen(0),
+ io(NULL), aio_write_seq(0), aio_write_list_item(this) { }
int set_complete_callback(void *cb_arg, rados_callback_t cb) {
lock.Lock();
}
};
+void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c)
+{
+ aio_write_list_lock.Lock();
+ assert(!c->io);
+ c->io = this;
+ c->aio_write_seq = ++aio_write_seq;
+ aio_write_list.push_back(&c->aio_write_list_item);
+ aio_write_list_lock.Unlock();
+}
+
+void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
+{
+ aio_write_list_lock.Lock();
+ assert(c->io == this);
+ c->io = NULL;
+ c->aio_write_list_item.remove_myself();
+ aio_write_cond.Signal();
+ aio_write_list_lock.Unlock();
+}
+
+void librados::IoCtxImpl::flush_aio_writes()
+{
+ aio_write_list_lock.Lock();
+ tid_t seq = aio_write_seq;
+ while (!aio_write_list.empty() &&
+ aio_write_list.front()->aio_write_seq <= seq)
+ aio_write_cond.Wait(aio_write_list_lock);
+ aio_write_list_lock.Unlock();
+}
+
struct librados::ObjListCtx {
librados::IoCtxImpl *ctx;
Objecter::ListContext *lc;
c->lock.Lock();
}
+ c->io->complete_aio_write(c);
+
c->put_unlock();
}
C_aio_Safe(AioCompletionImpl *_c) : c(_c) {
}
};
-librados::IoCtxImpl::
-IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s)
- : ref_cnt(0), client(c), poolid(pid),
- pool_name(pool_name_), snap_seq(s), assert_ver(0),
- notify_timeout(c->conf->client_notify_timeout), oloc(pid)
-{}
-
int librados::RadosClient::
connect()
{
if (io.snap_seq != CEPH_NOSNAP)
return -EINVAL;
+ io.queue_aio_write(c);
objecter->mutate(oid, io.oloc, *o, io.snapc, ut, 0, onack, oncommit, &c->objver);
return 0;
if (io.snap_seq != CEPH_NOSNAP)
return -EROFS;
+ io.queue_aio_write(c);
+
Context *onack = new C_aio_Ack(c);
Context *onsafe = new C_aio_Safe(c);
if (io.snap_seq != CEPH_NOSNAP)
return -EROFS;
+ io.queue_aio_write(c);
+
Context *onack = new C_aio_Ack(c);
Context *onsafe = new C_aio_Safe(c);
if (io.snap_seq != CEPH_NOSNAP)
return -EROFS;
+ io.queue_aio_write(c);
+
Context *onack = new C_aio_Ack(c);
Context *onsafe = new C_aio_Safe(c);
return io_ctx_impl->client->aio_write_full(*io_ctx_impl, obj, c->pc, bl);
}
+int librados::IoCtx::
+aio_flush()
+{
+ io_ctx_impl->flush_aio_writes();
+ return 0;
+}
+
int librados::IoCtx::
watch(const string& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx)
{
(librados::AioCompletionImpl*)completion, bl);
}
+extern "C" int rados_aio_flush(rados_ioctx_t io)
+{
+ librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+ ctx->flush_aio_writes();
+ return 0;
+}
+
struct C_WatchCB : public librados::WatchCtx {
rados_watchcb_t wcb;
void *arg;