Cond cond;
SafeTimer timer;
- // watch/notify
- struct WatchContext : public Context {
- librados::Rados::WatchCtx *ctx;
- uint64_t cookie;
- uint64_t ver;
-
- WatchContext(librados::Rados::WatchCtx *_ctx) : ctx(_ctx) {}
- ~WatchContext() {
- delete ctx;
- }
- void finish(int r) {
- ctx->finish(r);
- }
- };
-
- Mutex watch_lock;
- uint64_t max_watch_cookie;
- map<uint64_t, WatchContext *> watchers;
-
- pthread_key_t tls_key;
-
- RadosTlsInfo *tls_info() {
- struct RadosTlsInfo *info = (RadosTlsInfo *)pthread_getspecific(tls_key);
- cout << "pthread_getspecific returned " << (void *)info << std::endl;
- if (!info) {
- info = new RadosTlsInfo(tls_key);
- pthread_setspecific(tls_key, info);
- }
- return info;
- }
-
- void set_sync_op_version(eversion_t& ver) {
- RadosTlsInfo *info = tls_info();
- if (info)
- info->objver = ver;
- }
-
public:
RadosClient() : messenger(NULL), lock("radosclient"), timer(lock),
watch_lock("watch_lock"), max_watch_cookie(0) {
uint64_t cookie;
uint64_t ver;
librados::Rados::WatchCtx *ctx;
+ ObjectOperation *op;
- WatchContext(PoolCtx& _pc, const object_t& _oc, librados::Rados::WatchCtx *_ctx) : pool_ctx(_pc), oid(_oc), ctx(_ctx) {}
+ WatchContext(PoolCtx& _pc, const object_t& _oc, librados::Rados::WatchCtx *_ctx,
+ ObjectOperation *_op) : pool_ctx(_pc), oid(_oc), ctx(_ctx), op(_op) {}
~WatchContext() {
delete ctx;
}
pool.last_objver = ver;
}
- int register_watcher(PoolCtx& pool, const object_t& oid, librados::Rados::WatchCtx *ctx, uint64_t *cookie) {
- WatchContext *wc = new WatchContext(pool, oid, ctx);
+ int register_watcher(PoolCtx& pool, const object_t& oid, librados::Rados::WatchCtx *ctx,
+ ObjectOperation *op, uint64_t *cookie) {
+ WatchContext *wc = new WatchContext(pool, oid, ctx, op);
if (!wc)
return -ENOMEM;
watch_lock.Lock();
utime_t ut = g_clock.now();
bufferlist inbl, outbl;
- int r = register_watcher(pool, oid, ctx, cookie);
- if (r < 0)
+ ObjectOperation *rd = new ObjectOperation();
+ if (!rd)
+ return -ENOMEM;
+ int r = register_watcher(pool, oid, ctx, rd, cookie);
+ if (r < 0) {
+ delete rd;
return r;
+ }
Mutex mylock("RadosClient::watch::mylock");
Cond cond;
lock.Lock();
object_locator_t oloc(pool.poolid);
- ObjectOperation rd;
+
if (pool.assert_ver) {
- rd.assert_version(pool.assert_ver);
+ rd->assert_version(pool.assert_ver);
pool.assert_ver = 0;
}
- rd.watch(*cookie, ver, 1);
- objecter->read(oid, oloc, rd, pool.snap_seq, &outbl, 0, onack, &objver);
+ rd->watch(*cookie, ver, 1);
+ rd->set_linger(true);
+ objecter->read(oid, oloc, *rd, pool.snap_seq, &outbl, 0, onack, &objver);
lock.Unlock();
mylock.Lock();
set_sync_op_version(pool, objver);
+ if (r < 0) {
+ unregister_watcher(*cookie);
+ }
+
return r;
}
eversion_t objver;
uint64_t cookie;
C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all);
+ ObjectOperation rd;
- r = register_watcher(pool, oid, ctx, &cookie);
+ r = register_watcher(pool, oid, ctx, &rd, &cookie);
if (r < 0)
return r;
object_locator_t oloc(pool.poolid);
- ObjectOperation rd;
if (pool.assert_ver) {
rd.assert_version(pool.assert_ver);
pool.assert_ver = 0;
vector<OSDOp> ops;
int flags;
int priority;
+ bool linger;
- ObjectOperation() : flags(0), priority(0) {}
+ ObjectOperation() : flags(0), priority(0), linger(false) {}
void add_op(int op) {
int s = ops.size();
ops[s].op.pgls.count = count;
ops[s].op.pgls.cookie = cookie;
}
+ void set_linger(bool l) {
+ linger = l;
+ }
// ------
eversion_t *objver;
+ bool linger;
+
Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
int f, Context *ac, Context *co, eversion_t *ov) :
session_item(this),
con(NULL),
snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co),
tid(0), attempts(0),
- paused(false), objver(ov) {
+ paused(false), objver(ov), linger(false) {
ops.swap(op);
}
};
cout << "open pool result = " << r << " pool = " << pool << std::endl;
r = rados.write(pool, oid, 0, bl, bl.length());
- uint64_t objver = rados.get_last_ver(pool);
+ uint64_t objver = rados.get_last_version(pool);
cout << "rados.write returned " << r << " last_ver=" << objver << std::endl;
uint64_t handle;
cout << "*** press enter to continue ***" << std::endl;
getchar();
- rados.set_assert_ver(pool, objver);
+ rados.set_assert_version(pool, objver);
r = rados.write(pool, oid, 0, bl, bl.length() - 1);
cout << "rados.write returned " << r << std::endl;