From a4864bd85409976c953d052cbe01a0e8e1f3f4af Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 29 Oct 2010 13:53:42 -0700 Subject: [PATCH] librados: enable object versioning --- src/include/librados.h | 2 + src/include/librados.hpp | 6 +- src/librados.cc | 232 ++++++++++++++++++++++++++++----------- src/osdc/Objecter.cc | 3 + src/osdc/Objecter.h | 109 ++++++++++-------- src/testradospp.cc | 20 +++- 6 files changed, 262 insertions(+), 110 deletions(-) diff --git a/src/include/librados.h b/src/include/librados.h index acd7b1c4282c8..5eb7fbf7ff6e7 100644 --- a/src/include/librados.h +++ b/src/include/librados.h @@ -76,6 +76,8 @@ int rados_snap_lookup(rados_pool_t pool, const char *name, rados_snap_t *id); int rados_snap_get_name(rados_pool_t pool, rados_snap_t id, char *name, int maxlen); /* sync io */ +uint64_t rados_get_last_ver(); + int rados_write(rados_pool_t pool, const char *oid, off_t off, const char *buf, size_t len); int rados_write_full(rados_pool_t pool, const char *oid, off_t off, const char *buf, size_t len); int rados_read(rados_pool_t pool, const char *oid, off_t off, char *buf, size_t len); diff --git a/src/include/librados.hpp b/src/include/librados.hpp index 049652e1398e6..4660481654ccc 100644 --- a/src/include/librados.hpp +++ b/src/include/librados.hpp @@ -68,6 +68,7 @@ public: void set_snap(pool_t pool, snap_t seq); int set_snap_context(pool_t pool, snap_t seq, std::vector& snaps); + uint64_t get_last_ver(); int create(pool_t pool, const std::string& oid, bool exclusive); @@ -132,6 +133,7 @@ public: bool is_complete(); bool is_safe(); int get_return_value(); + int get_obj_ver(); void release(); }; @@ -144,11 +146,11 @@ public: class WatchCtx { public: - virtual int finish() = 0; + virtual bool finish(int r) = 0; }; // watch/notify - int watch(pool_t pool, const string& o, uint64_t ver, uint64_t *cookie, Context *ctx); + int watch(pool_t pool, const string& o, uint64_t ver, uint64_t *cookie, librados::Rados::WatchCtx *ctx); int unwatch(pool_t pool, const string& o, uint64_t ver, uint64_t cookie); }; diff --git a/src/librados.cc b/src/librados.cc index 92beb9172e34b..e064b902e16f6 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -19,6 +19,7 @@ #include #include #include +#include using namespace std; #include "config.h" @@ -46,6 +47,22 @@ using namespace std; #define dout_prefix *_dout << dbeginl << "librados: " +struct RadosTlsInfo { + pthread_key_t tls_key; + eversion_t objver; + + RadosTlsInfo(pthread_key_t key) : tls_key(key), objver(eversion_t(-1, -1)) {} +}; + +static void rados_tls_destructor(void *val) +{ + RadosTlsInfo *info = (RadosTlsInfo *)val; + pthread_key_t key = info->tls_key; + delete info; + cout << "pthread_getspecific NULL " << std::endl; + pthread_setspecific(key, NULL); +} + class RadosClient : public Dispatcher { OSDMap osdmap; @@ -74,10 +91,43 @@ class RadosClient : public Dispatcher Cond cond; SafeTimer timer; + // watch/notify + struct WatchContext : public Context { + librados::Rados::WatchCtx *ctx; + uint64_t cookie; + uint64_t ver; + + WatchContext(librados::Rados::WatchCtx *_ctx) : ctx(_ctx) {} + ~WatchContext() { + delete ctx; + } + void finish(int r) { + ctx->finish(r); + } + }; + Mutex watch_lock; uint64_t max_watch_cookie; - map watchers; + map watchers; + pthread_key_t tls_key; + + RadosTlsInfo *tls_info() { + struct RadosTlsInfo *info = (RadosTlsInfo *)pthread_getspecific(tls_key); + cout << "pthread_getspecific returned " << (void *)info << std::endl; + if (!info) { + info = new RadosTlsInfo(tls_key); + pthread_setspecific(tls_key, info); + } + return info; + } + + void set_sync_op_version(eversion_t& ver) { + RadosTlsInfo *info = tls_info(); + if (info) + info->objver = ver; + } + public: RadosClient() : messenger(NULL), lock("radosclient"), timer(lock), watch_lock("watch_lock"), max_watch_cookie(0) { @@ -170,6 +220,7 @@ public: int ref, rval; bool released; bool ack, safe; + eversion_t objver; rados_callback_t callback_complete, callback_safe; void *callback_arg; @@ -230,6 +281,12 @@ public: lock.Unlock(); return r; } + uint64_t get_obj_ver() { + lock.Lock(); + eversion_t v = objver; + lock.Unlock(); + return v.version; + } void get() { lock.Lock(); @@ -337,26 +394,29 @@ public: return c; } - // watch/notify - struct WatchContext : public Context { - librados::Rados::WatchCtx *ctx; - uint64_t cookie; - uint64_t ver; - - WatchContext(librados::Rados::WatchCtx *_ctx) : ctx(_ctx) {} - ~WatchContext() { - delete _ctx; - } - int finish() { - return ctx->finish(); - } - } - int watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_t *cookie, WatchContext *ctx); + int watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::Rados::WatchCtx *ctx); int unwatch(PoolCtx& pool, const object_t& oid, uint64_t cookie); + + eversion_t last_version() { + eversion_t ver; + RadosTlsInfo *info = tls_info(); + if (info) + ver = info->objver; + else + ver = eversion_t(-1, -1); + return ver; + } }; bool RadosClient::init() { + int ret = pthread_key_create(&tls_key, rados_tls_destructor); + if (ret < 0) { + int err = errno; + dout(0) << "pthread_key_create returned " << err << dendl; + return false; + } + // get monmap if (monclient.build_initial_monmap() < 0) return false; @@ -825,6 +885,7 @@ int RadosClient::create(PoolCtx& pool, const object_t& oid, bool exclusive) Cond cond; bool done; int r; + eversion_t ver; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); @@ -832,7 +893,7 @@ int RadosClient::create(PoolCtx& pool, const object_t& oid, bool exclusive) object_locator_t oloc(pool.poolid); objecter->create(oid, oloc, pool.snapc, ut, 0, (exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0), - onack, NULL); + onack, NULL, &ver); lock.Unlock(); mylock.Lock(); @@ -840,34 +901,13 @@ int RadosClient::create(PoolCtx& pool, const object_t& oid, bool exclusive) cond.Wait(mylock); mylock.Unlock(); + set_sync_op_version(ver); + return r; } int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len) { -#if 0 - static SnapContext snapc; - static int i; - - snapc.snaps.clear(); - -#define START_SNAP 1 - - if (snapc.seq == 0) - snapc.seq = START_SNAP - 1; - - ++snapc.seq; - for (i=0; i::iterator iter = snapc.snaps.begin(); - iter != snapc.snaps.end(); ++iter, ++i) { - dout(0) << "snapc[" << i << "] = " << *iter << dendl; - } - dout(0) << "seq=" << snapc.seq << dendl; - dout(0) << "snapc=" << snapc << dendl; -#endif utime_t ut = g_clock.now(); /* can't write to a snapshot */ @@ -880,12 +920,13 @@ int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; lock.Lock(); object_locator_t oloc(pool.poolid); objecter->write(oid, oloc, off, len, pool.snapc, bl, ut, 0, - onack, NULL); + onack, NULL, &ver); lock.Unlock(); mylock.Lock(); @@ -893,6 +934,8 @@ int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist cond.Wait(mylock); mylock.Unlock(); + set_sync_op_version(ver); + if (r < 0) return r; @@ -913,12 +956,13 @@ int RadosClient::write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl) int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; lock.Lock(); object_locator_t oloc(pool.poolid); objecter->write_full(oid, oloc, pool.snapc, bl, ut, 0, - onack, NULL); + onack, NULL, &ver); lock.Unlock(); mylock.Lock(); @@ -926,6 +970,8 @@ int RadosClient::write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl) cond.Wait(mylock); mylock.Unlock(); + set_sync_op_version(ver); + return r; } @@ -934,6 +980,7 @@ int RadosClient::aio_read(PoolCtx& pool, const object_t oid, off_t off, bufferli { Context *onack = new C_aio_Ack(c); + eversion_t ver; c->pbl = pbl; @@ -941,7 +988,7 @@ int RadosClient::aio_read(PoolCtx& pool, const object_t oid, off_t off, bufferli object_locator_t oloc(pool.poolid); objecter->read(oid, oloc, off, len, pool.snap_seq, &c->bl, 0, - onack); + onack, &c->objver); return 0; } @@ -957,7 +1004,7 @@ int RadosClient::aio_read(PoolCtx& pool, const object_t oid, off_t off, char *bu object_locator_t oloc(pool.poolid); objecter->read(oid, oloc, off, len, pool.snap_seq, &c->bl, 0, - onack); + onack, &c->objver); return 0; } @@ -975,7 +1022,7 @@ int RadosClient::aio_write(PoolCtx& pool, const object_t oid, off_t off, const b object_locator_t oloc(pool.poolid); objecter->write(oid, oloc, off, len, pool.snapc, bl, ut, 0, - onack, onsafe); + onack, onsafe, &c->objver); return 0; } @@ -993,7 +1040,7 @@ int RadosClient::aio_write_full(PoolCtx& pool, const object_t oid, const bufferl object_locator_t oloc(pool.poolid); objecter->write_full(oid, oloc, pool.snapc, bl, ut, 0, - onack, onsafe); + onack, onsafe, &c->objver); return 0; } @@ -1008,12 +1055,13 @@ int RadosClient::remove(PoolCtx& pool, const object_t& oid) bool done; int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; lock.Lock(); object_locator_t oloc(pool.poolid); objecter->remove(oid, oloc, snapc, ut, 0, - onack, NULL); + onack, NULL, &ver); lock.Unlock(); mylock.Lock(); @@ -1021,6 +1069,8 @@ int RadosClient::remove(PoolCtx& pool, const object_t& oid) cond.Wait(mylock); mylock.Unlock(); + set_sync_op_version(ver); + return r; } @@ -1038,13 +1088,14 @@ int RadosClient::trunc(PoolCtx& pool, const object_t& oid, size_t size) int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; lock.Lock(); object_locator_t oloc(pool.poolid); objecter->trunc(oid, oloc, pool.snapc, ut, 0, size, 0, - onack, NULL); + onack, NULL, &ver); lock.Unlock(); mylock.Lock(); @@ -1052,6 +1103,8 @@ int RadosClient::trunc(PoolCtx& pool, const object_t& oid, size_t size) cond.Wait(mylock); mylock.Unlock(); + set_sync_op_version(ver); + return r; } @@ -1064,6 +1117,7 @@ int RadosClient::tmap_update(PoolCtx& pool, const object_t& oid, bufferlist& cmd bool done; int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; bufferlist outbl; @@ -1072,7 +1126,7 @@ int RadosClient::tmap_update(PoolCtx& pool, const object_t& oid, bufferlist& cmd object_locator_t oloc(pool.poolid); ObjectOperation wr; wr.tmap_update(cmdbl); - objecter->mutate(oid, oloc, wr, snapc, ut, 0, onack, NULL); + objecter->mutate(oid, oloc, wr, snapc, ut, 0, onack, NULL, &ver); lock.Unlock(); mylock.Lock(); @@ -1080,6 +1134,8 @@ int RadosClient::tmap_update(PoolCtx& pool, const object_t& oid, bufferlist& cmd cond.Wait(mylock); mylock.Unlock(); + set_sync_op_version(ver); + return r; } @@ -1094,13 +1150,14 @@ int RadosClient::exec(PoolCtx& pool, const object_t& oid, const char *cls, const bool done; int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; lock.Lock(); object_locator_t oloc(pool.poolid); ObjectOperation rd; rd.call(cls, method, inbl); - objecter->read(oid, oloc, rd, pool.snap_seq, &outbl, 0, onack); + objecter->read(oid, oloc, rd, pool.snap_seq, &outbl, 0, onack, &ver); lock.Unlock(); mylock.Lock(); @@ -1108,6 +1165,8 @@ int RadosClient::exec(PoolCtx& pool, const object_t& oid, const char *cls, const cond.Wait(mylock); mylock.Unlock(); + set_sync_op_version(ver); + return r; } @@ -1120,12 +1179,13 @@ int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bool done; int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; lock.Lock(); object_locator_t oloc(pool.poolid); objecter->read(oid, oloc, off, len, pool.snap_seq, &bl, 0, - onack); + onack, &ver); lock.Unlock(); mylock.Lock(); @@ -1134,6 +1194,8 @@ int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& mylock.Unlock(); dout(10) << "Objecter returned from read r=" << r << dendl; + set_sync_op_version(ver); + if (r < 0) return r; @@ -1224,6 +1286,7 @@ int RadosClient::stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); uint64_t size; utime_t mtime; + eversion_t ver; if (!psize) psize = &size; @@ -1232,7 +1295,7 @@ int RadosClient::stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_ object_locator_t oloc(pool.poolid); objecter->stat(oid, oloc, pool.snap_seq, psize, &mtime, 0, - onack); + onack, &ver); lock.Unlock(); mylock.Lock(); @@ -1245,6 +1308,8 @@ int RadosClient::stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_ *pmtime = mtime.sec(); } + set_sync_op_version(ver); + return r; } @@ -1257,12 +1322,13 @@ int RadosClient::getxattr(PoolCtx& pool, const object_t& oid, const char *name, bool done; int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; lock.Lock(); object_locator_t oloc(pool.poolid); objecter->getxattr(oid, oloc, name, pool.snap_seq, &bl, 0, - onack); + onack, &ver); lock.Unlock(); mylock.Lock(); @@ -1271,6 +1337,8 @@ int RadosClient::getxattr(PoolCtx& pool, const object_t& oid, const char *name, mylock.Unlock(); dout(10) << "Objecter returned from getxattr" << dendl; + set_sync_op_version(ver); + if (r < 0) return r; @@ -1291,13 +1359,14 @@ int RadosClient::rmxattr(PoolCtx& pool, const object_t& oid, const char *name) int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; object_locator_t oloc(pool.poolid); lock.Lock(); objecter->removexattr(oid, oloc, name, pool.snapc, ut, 0, - onack, NULL); + onack, NULL, &ver); lock.Unlock(); mylock.Lock(); @@ -1305,6 +1374,8 @@ int RadosClient::rmxattr(PoolCtx& pool, const object_t& oid, const char *name) cond.Wait(mylock); mylock.Unlock(); + set_sync_op_version(ver); + if (r < 0) return r; @@ -1325,12 +1396,13 @@ int RadosClient::setxattr(PoolCtx& pool, const object_t& oid, const char *name, int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; lock.Lock(); object_locator_t oloc(pool.poolid); objecter->setxattr(oid, oloc, name, pool.snapc, bl, ut, 0, - onack, NULL); + onack, NULL, &ver); lock.Unlock(); mylock.Lock(); @@ -1338,6 +1410,8 @@ int RadosClient::setxattr(PoolCtx& pool, const object_t& oid, const char *name, cond.Wait(mylock); mylock.Unlock(); + set_sync_op_version(ver); + if (r < 0) return r; @@ -1356,6 +1430,7 @@ int RadosClient::getxattrs(PoolCtx& pool, const object_t& oid, map aset; objecter->getxattrs(oid, oloc, pool.snap_seq, aset, - 0, onack); + 0, onack, &ver); lock.Unlock(); attrset.clear(); @@ -1379,17 +1454,22 @@ int RadosClient::getxattrs(PoolCtx& pool, const object_t& oid, maposdmap->make_object_layout(oid, pool.poolid); ObjectOperation rd; rd.watch(*cookie, ver, 1); - objecter->read(oid, layout, rd, pool.snap_seq, &outbl, 0, onack); + objecter->read(oid, layout, rd, pool.snap_seq, &outbl, 0, onack, &objver); lock.Unlock(); mylock.Lock(); @@ -1410,6 +1491,8 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_ cond.Wait(mylock); mylock.Unlock(); + set_sync_op_version(objver); + return r; } @@ -1419,8 +1502,10 @@ int RadosClient::unwatch(PoolCtx& pool, const object_t& oid, uint64_t cookie) bufferlist inbl, outbl; watch_lock.Lock(); - map::iterator iter = watchers.find(cookie); + map::iterator iter = watchers.find(cookie); if (iter != watchers.end()) { + WatchContext *ctx = iter->second; + delete ctx; watchers.erase(iter); } watch_lock.Unlock(); @@ -1430,12 +1515,13 @@ int RadosClient::unwatch(PoolCtx& pool, const object_t& oid, uint64_t cookie) bool done; int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; lock.Lock(); ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid); ObjectOperation rd; rd.watch(cookie, 0, 1); - objecter->read(oid, layout, rd, pool.snap_seq, &outbl, 0, onack); + objecter->read(oid, layout, rd, pool.snap_seq, &outbl, 0, onack, &ver); lock.Unlock(); mylock.Lock(); @@ -1443,6 +1529,8 @@ int RadosClient::unwatch(PoolCtx& pool, const object_t& oid, uint64_t cookie) cond.Wait(mylock); mylock.Unlock(); + set_sync_op_version(ver); + return r; }// --------------------------------------------- @@ -1578,6 +1666,15 @@ void Rados::list_objects_close(Rados::ListCtx ctx) delete h; } +uint64_t Rados::get_last_ver() +{ + if (!client) + return -EINVAL; + + eversion_t ver = client->last_version(); + return ver.version; +} + int Rados::create(rados_pool_t pool, const string& o, bool exclusive) { @@ -1888,6 +1985,11 @@ int Rados::AioCompletion::get_return_value() RadosClient::AioCompletion *c = (RadosClient::AioCompletion *)pc; return c->get_return_value(); } +int Rados::AioCompletion::get_obj_ver() +{ + RadosClient::AioCompletion *c = (RadosClient::AioCompletion *)pc; + return c->get_obj_ver(); +} void Rados::AioCompletion::release() { RadosClient::AioCompletion *c = (RadosClient::AioCompletion *)pc; @@ -1896,7 +1998,7 @@ void Rados::AioCompletion::release() // watch/notify int Rados::watch(pool_t pool, const string& o, - uint64_t ver, uint64_t *cookie, Context *ctx) + uint64_t ver, uint64_t *cookie, Rados::WatchCtx *ctx) { if (!client) return -EINVAL; @@ -2092,6 +2194,12 @@ extern "C" int rados_read(rados_pool_t pool, const char *o, off_t off, char *buf return ret; } +extern "C" uint64_t rados_get_last_ver() +{ + eversion_t ver = radosp->last_version(); + return ver.version; +} + extern "C" int rados_create_pool(const char *name) { string sname(name); diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 68ac539dca19d..22c79bd1903c3 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -592,6 +592,9 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) return; } + if (op->objver) + *op->objver = m->get_version(); + // got data? if (op->outbl) { if (op->outbl->length()) diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index edd2ef5de17c9..13eb2057454bd 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -261,14 +261,16 @@ public: bool paused; + eversion_t *objver; + Op(const object_t& o, const object_locator_t& ol, vector& op, - int f, Context *ac, Context *co) : + int f, Context *ac, Context *co, eversion_t *ov) : session_item(this), oid(o), oloc(ol), con(NULL), snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co), tid(0), attempts(0), - paused(false) { + paused(false), objver(ov) { ops.swap(op); } }; @@ -522,8 +524,8 @@ private: tid_t mutate(const object_t& oid, const object_locator_t& oloc, ObjectOperation& op, const SnapContext& snapc, utime_t mtime, int flags, - Context *onack, Context *oncommit) { - Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit); + Context *onack, Context *oncommit, eversion_t *objver = NULL) { + Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->priority = op.priority; o->mtime = mtime; o->snapc = snapc; @@ -532,8 +534,8 @@ private: tid_t read(const object_t& oid, const object_locator_t& oloc, ObjectOperation& op, snapid_t snapid, bufferlist *pbl, int flags, - Context *onack) { - Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_READ, onack, NULL); + Context *onack, eversion_t *objver = NULL) { + Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_READ, onack, NULL, objver); o->priority = op.priority; o->snapid = snapid; o->outbl = pbl; @@ -543,11 +545,12 @@ private: // high-level helpers tid_t stat(const object_t& oid, const object_locator_t& oloc, snapid_t snap, uint64_t *psize, utime_t *pmtime, int flags, - Context *onfinish) { + Context *onfinish, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_STAT; C_Stat *fin = new C_Stat(psize, pmtime, onfinish); - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0, objver); o->snapid = snap; o->outbl = &fin->bl; return op_submit(o); @@ -555,14 +558,15 @@ private: tid_t read(const object_t& oid, const object_locator_t& oloc, uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags, - Context *onfinish) { + Context *onfinish, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_READ; ops[0].op.extent.offset = off; ops[0].op.extent.length = len; ops[0].op.extent.truncate_size = 0; ops[0].op.extent.truncate_seq = 0; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; return op_submit(o); @@ -570,14 +574,15 @@ private: tid_t read_trunc(const object_t& oid, const object_locator_t& oloc, uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags, uint64_t trunc_size, __u32 trunc_seq, - Context *onfinish) { + Context *onfinish, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_READ; ops[0].op.extent.offset = off; ops[0].op.extent.length = len; ops[0].op.extent.truncate_size = trunc_size; ops[0].op.extent.truncate_seq = trunc_seq; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; return op_submit(o); @@ -591,7 +596,7 @@ private: ops[0].op.extent.length = len; ops[0].op.extent.truncate_size = 0; ops[0].op.extent.truncate_seq = 0; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; return op_submit(o); @@ -605,7 +610,7 @@ private: ops[0].op.extent.length = len; ops[0].op.extent.truncate_size = 0; ops[0].op.extent.truncate_seq = 0; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; return op_submit(o); @@ -613,14 +618,15 @@ private: tid_t getxattr(const object_t& oid, const object_locator_t& oloc, const char *name, snapid_t snap, bufferlist *pbl, int flags, - Context *onfinish) { + Context *onfinish, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_GETXATTR; ops[0].op.xattr.name_len = (name ? strlen(name) : 0); ops[0].op.xattr.value_len = 0; if (name) ops[0].data.append(name); - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; return op_submit(o); @@ -628,11 +634,12 @@ private: tid_t getxattrs(const object_t& oid, const object_locator_t& oloc, snapid_t snap, map& attrset, - int flags, Context *onfinish) { + int flags, Context *onfinish, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_GETXATTRS; C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish); - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0, objver); o->snapid = snap; o->outbl = &fin->bl; return op_submit(o); @@ -640,16 +647,18 @@ private: tid_t read_full(const object_t& oid, const object_locator_t& oloc, snapid_t snap, bufferlist *pbl, int flags, - Context *onfinish) { - return read(oid, oloc, 0, 0, snap, pbl, flags | CEPH_OSD_FLAG_READ, onfinish); + Context *onfinish, + eversion_t *objver = NULL) { + return read(oid, oloc, 0, 0, snap, pbl, flags | CEPH_OSD_FLAG_READ, onfinish, objver); } // writes tid_t _modify(const object_t& oid, const object_locator_t& oloc, vector& ops, utime_t mtime, const SnapContext& snapc, int flags, - Context *onack, Context *oncommit) { - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit); + Context *onack, Context *oncommit, + eversion_t *objver = NULL) { + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -657,7 +666,8 @@ private: tid_t write(const object_t& oid, const object_locator_t& oloc, uint64_t off, uint64_t len, const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags, - Context *onack, Context *oncommit) { + Context *onack, Context *oncommit, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_WRITE; ops[0].op.extent.offset = off; @@ -665,7 +675,7 @@ private: ops[0].op.extent.truncate_size = 0; ops[0].op.extent.truncate_seq = 0; ops[0].data = bl; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -673,8 +683,9 @@ private: tid_t write_trunc(const object_t& oid, const object_locator_t& oloc, uint64_t off, uint64_t len, const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags, - uint64_t trunc_size, __u32 trunc_seq, - Context *onack, Context *oncommit) { + uint64_t trunc_size, __u32 trunc_seq, + Context *onack, Context *oncommit, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_WRITE; ops[0].op.extent.offset = off; @@ -682,20 +693,21 @@ private: ops[0].op.extent.truncate_size = trunc_size; ops[0].op.extent.truncate_seq = trunc_seq; ops[0].data = bl; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); } tid_t write_full(const object_t& oid, const object_locator_t& oloc, const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags, - Context *onack, Context *oncommit) { + Context *onack, Context *oncommit, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_WRITEFULL; ops[0].op.extent.offset = 0; ops[0].op.extent.length = bl.length(); ops[0].data = bl; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -704,36 +716,39 @@ private: const SnapContext& snapc, utime_t mtime, int flags, uint64_t trunc_size, __u32 trunc_seq, - Context *onack, Context *oncommit) { + Context *onack, Context *oncommit, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_TRUNCATE; ops[0].op.extent.offset = trunc_size; ops[0].op.extent.truncate_size = trunc_size; ops[0].op.extent.truncate_seq = trunc_seq; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); } tid_t zero(const object_t& oid, const object_locator_t& oloc, uint64_t off, uint64_t len, const SnapContext& snapc, utime_t mtime, int flags, - Context *onack, Context *oncommit) { + Context *onack, Context *oncommit, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_ZERO; ops[0].op.extent.offset = off; ops[0].op.extent.length = len; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); } tid_t rollback_object(const object_t& oid, const object_locator_t& oloc, const SnapContext& snapc, snapid_t snapid, - utime_t mtime, Context *onack, Context *oncommit) { + utime_t mtime, Context *onack, Context *oncommit, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_ROLLBACK; ops[0].op.snap.snapid = snapid; - Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, onack, oncommit); + Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -741,39 +756,42 @@ private: tid_t create(const object_t& oid, const object_locator_t& oloc, const SnapContext& snapc, utime_t mtime, int global_flags, int create_flags, - Context *onack, Context *oncommit) { + Context *onack, Context *oncommit, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_CREATE; ops[0].op.flags = create_flags; - Op *o = new Op(oid, oloc, ops, global_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit); + Op *o = new Op(oid, oloc, ops, global_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); } tid_t remove(const object_t& oid, const object_locator_t& oloc, const SnapContext& snapc, utime_t mtime, int flags, - Context *onack, Context *oncommit) { + Context *onack, Context *oncommit, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_DELETE; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); } tid_t lock(const object_t& oid, const object_locator_t& oloc, int op, int flags, - Context *onack, Context *oncommit) { + Context *onack, Context *oncommit, eversion_t *objver = NULL) { SnapContext snapc; // no snapc for lock ops vector ops(1); ops[0].op.op = op; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->snapc = snapc; return op_submit(o); } tid_t setxattr(const object_t& oid, const object_locator_t& oloc, const char *name, const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags, - Context *onack, Context *oncommit) { + Context *onack, Context *oncommit, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_SETXATTR; ops[0].op.xattr.name_len = (name ? strlen(name) : 0); @@ -781,7 +799,7 @@ private: if (name) ops[0].data.append(name); ops[0].data.append(bl); - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -789,14 +807,15 @@ private: tid_t removexattr(const object_t& oid, const object_locator_t& oloc, const char *name, const SnapContext& snapc, utime_t mtime, int flags, - Context *onack, Context *oncommit) { + Context *onack, Context *oncommit, + eversion_t *objver = NULL) { vector ops(1); ops[0].op.op = CEPH_OSD_OP_RMXATTR; ops[0].op.xattr.name_len = (name ? strlen(name) : 0); ops[0].op.xattr.value_len = 0; if (name) ops[0].data.append(name); - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o); diff --git a/src/testradospp.cc b/src/testradospp.cc index 9361ed4b00e98..b5b907dc8e16f 100644 --- a/src/testradospp.cc +++ b/src/testradospp.cc @@ -30,6 +30,15 @@ void buf_to_hex(const unsigned char *buf, int len, char *str) } } +class C_Watch : public Rados::WatchCtx { +public: + C_Watch() {} + bool finish(int r) { + cout << "C_Watch::finish()" << std::endl; + return true; + } +}; + int main(int argc, const char **argv) { Rados rados; @@ -59,7 +68,16 @@ int main(int argc, const char **argv) cout << "open pool result = " << r << " pool = " << pool << std::endl; r = rados.write(pool, oid, 0, bl, bl.length()); - cout << "rados.write returned " << r << std::endl; + cout << "rados.write returned " << r << " last_ver=" << rados.get_last_ver() << std::endl; + + uint64_t cookie; + C_Watch wc; + r = rados.watch(pool, oid, 0, &cookie, &wc); + cout << "rados.watch returned " << r << std::endl; + + exit(0); + + r = rados.write(pool, oid, 0, bl, bl.length() - 1); cout << "rados.write returned " << r << std::endl; r = rados.write(pool, oid, 0, bl, bl.length() - 2); -- 2.39.5