From 99e1e4de29c2c2a01db289cbad48417de580a03c Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 1 Nov 2010 15:54:09 -0700 Subject: [PATCH] librados: assert_version on sync operations --- src/include/librados.hpp | 3 + src/include/rados.h | 3 + src/librados.cc | 136 +++++++++++++++++++--- src/osd/ReplicatedPG.cc | 14 ++- src/osdc/Objecter.cc | 2 +- src/osdc/Objecter.h | 242 +++++++++++++++++++++++---------------- src/testradospp.cc | 9 +- 7 files changed, 283 insertions(+), 126 deletions(-) diff --git a/src/include/librados.hpp b/src/include/librados.hpp index 00cc10aea323b..1d14883d46ac9 100644 --- a/src/include/librados.hpp +++ b/src/include/librados.hpp @@ -153,6 +153,9 @@ public: int watch(pool_t pool, const string& o, uint64_t ver, uint64_t *handle, librados::Rados::WatchCtx *ctx); int unwatch(pool_t pool, const string& o, uint64_t handle); int notify(pool_t pool, const string& o, uint64_t ver); + + /* assert version for next sync operations */ + void set_assert_ver(pool_t pool, uint64_t ver); }; } diff --git a/src/include/rados.h b/src/include/rados.h index a09f3f0ac824c..e4a0c40a265fc 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -190,6 +190,9 @@ enum { CEPH_OSD_OP_NOTIFY = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 6, CEPH_OSD_OP_NOTIFY_ACK = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 7, + /* versioning */ + CEPH_OSD_OP_ASSERT_VER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 8, + /* write */ CEPH_OSD_OP_WRITE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1, CEPH_OSD_OP_WRITEFULL = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 2, diff --git a/src/librados.cc b/src/librados.cc index bce1e026df87c..7a4ec4701c0d7 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -144,8 +144,9 @@ public: string name; snapid_t snap_seq; SnapContext snapc; + uint64_t assert_ver; - PoolCtx(int pid, const char *n, snapid_t s = CEPH_NOSNAP) : poolid(pid), name(n), snap_seq(s) {} + PoolCtx(int pid, const char *n, snapid_t s = CEPH_NOSNAP) : poolid(pid), name(n), snap_seq(s), assert_ver(0) {} void set_snap(snapid_t s) { if (!s) @@ -492,6 +493,9 @@ public: ver = eversion_t(-1, -1); return ver; } + void set_assert_ver(PoolCtx& pool, uint64_t ver) { + pool.assert_ver = ver; + } }; bool RadosClient::init() @@ -1012,11 +1016,18 @@ int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); eversion_t ver; + ObjectOperation op, *pop = NULL; + if (pool.assert_ver) { + op.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + pop = &op; + } + lock.Lock(); object_locator_t oloc(pool.poolid); objecter->write(oid, oloc, off, len, pool.snapc, bl, ut, 0, - onack, NULL, &ver); + onack, NULL, &ver, pop); lock.Unlock(); mylock.Lock(); @@ -1046,13 +1057,20 @@ int RadosClient::write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl) int r; Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + ObjectOperation op, *pop = NULL; + if (pool.assert_ver) { + op.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + pop = &op; + } lock.Lock(); object_locator_t oloc(pool.poolid); objecter->write_full(oid, oloc, pool.snapc, bl, ut, 0, - onack, NULL, &ver); + onack, NULL, &ver, pop); lock.Unlock(); mylock.Lock(); @@ -1147,11 +1165,18 @@ int RadosClient::remove(PoolCtx& pool, const object_t& oid) Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); eversion_t ver; + ObjectOperation op, *pop = NULL; + if (pool.assert_ver) { + op.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + pop = &op; + } + lock.Lock(); object_locator_t oloc(pool.poolid); objecter->remove(oid, oloc, snapc, ut, 0, - onack, NULL, &ver); + onack, NULL, &ver, pop); lock.Unlock(); mylock.Lock(); @@ -1180,12 +1205,19 @@ int RadosClient::trunc(PoolCtx& pool, const object_t& oid, size_t size) Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); eversion_t ver; + ObjectOperation op, *pop = NULL; + if (pool.assert_ver) { + op.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + pop = &op; + } + lock.Lock(); object_locator_t oloc(pool.poolid); objecter->trunc(oid, oloc, pool.snapc, ut, 0, size, 0, - onack, NULL, &ver); + onack, NULL, &ver, pop); lock.Unlock(); mylock.Lock(); @@ -1215,6 +1247,10 @@ int RadosClient::tmap_update(PoolCtx& pool, const object_t& oid, bufferlist& cmd SnapContext snapc; object_locator_t oloc(pool.poolid); ObjectOperation wr; + if (pool.assert_ver) { + wr.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + } wr.tmap_update(cmdbl); objecter->mutate(oid, oloc, wr, snapc, ut, 0, onack, NULL, &ver); lock.Unlock(); @@ -1246,6 +1282,10 @@ int RadosClient::exec(PoolCtx& pool, const object_t& oid, const char *cls, const lock.Lock(); object_locator_t oloc(pool.poolid); ObjectOperation rd; + if (pool.assert_ver) { + rd.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + } rd.call(cls, method, inbl); objecter->read(oid, oloc, rd, pool.snap_seq, &outbl, 0, onack, &ver); lock.Unlock(); @@ -1271,11 +1311,17 @@ int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); eversion_t ver; + ObjectOperation op, *pop = NULL; + if (pool.assert_ver) { + op.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + pop = &op; + } lock.Lock(); object_locator_t oloc(pool.poolid); objecter->read(oid, oloc, off, len, pool.snap_seq, &bl, 0, - onack, &ver); + onack, &ver, pop); lock.Unlock(); mylock.Lock(); @@ -1381,11 +1427,17 @@ int RadosClient::stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_ if (!psize) psize = &size; + ObjectOperation op, *pop = NULL; + if (pool.assert_ver) { + op.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + pop = &op; + } lock.Lock(); object_locator_t oloc(pool.poolid); objecter->stat(oid, oloc, pool.snap_seq, psize, &mtime, 0, - onack, &ver); + onack, &ver, pop); lock.Unlock(); mylock.Lock(); @@ -1414,11 +1466,17 @@ int RadosClient::getxattr(PoolCtx& pool, const object_t& oid, const char *name, Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); eversion_t ver; + ObjectOperation op, *pop = NULL; + if (pool.assert_ver) { + op.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + pop = &op; + } lock.Lock(); object_locator_t oloc(pool.poolid); objecter->getxattr(oid, oloc, name, pool.snap_seq, &bl, 0, - onack, &ver); + onack, &ver, pop); lock.Unlock(); mylock.Lock(); @@ -1453,10 +1511,16 @@ int RadosClient::rmxattr(PoolCtx& pool, const object_t& oid, const char *name) object_locator_t oloc(pool.poolid); + ObjectOperation op, *pop = NULL; + if (pool.assert_ver) { + op.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + pop = &op; + } lock.Lock(); objecter->removexattr(oid, oloc, name, pool.snapc, ut, 0, - onack, NULL, &ver); + onack, NULL, &ver, pop); lock.Unlock(); mylock.Lock(); @@ -1488,11 +1552,17 @@ int RadosClient::setxattr(PoolCtx& pool, const object_t& oid, const char *name, Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); eversion_t ver; + ObjectOperation op, *pop = NULL; + if (pool.assert_ver) { + op.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + pop = &op; + } lock.Lock(); object_locator_t oloc(pool.poolid); objecter->setxattr(oid, oloc, name, pool.snapc, bl, ut, 0, - onack, NULL, &ver); + onack, NULL, &ver, pop); lock.Unlock(); mylock.Lock(); @@ -1522,6 +1592,12 @@ int RadosClient::getxattrs(PoolCtx& pool, const object_t& oid, map aset; objecter->getxattrs(oid, oloc, pool.snap_seq, aset, - 0, onack, &ver); + 0, onack, &ver, pop); lock.Unlock(); attrset.clear(); @@ -1581,10 +1657,14 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_ eversion_t objver; lock.Lock(); - ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid); + object_locator_t oloc(pool.poolid); ObjectOperation rd; + if (pool.assert_ver) { + rd.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + } rd.watch(*cookie, ver, 1); - objecter->read(oid, layout, rd, pool.snap_seq, &outbl, 0, onack, &objver); + objecter->read(oid, oloc, rd, pool.snap_seq, &outbl, 0, onack, &objver); lock.Unlock(); mylock.Lock(); @@ -1607,10 +1687,14 @@ int RadosClient::_notify_ack(PoolCtx& pool, const object_t& oid, uint64_t notify Cond cond; eversion_t objver; - ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid); + object_locator_t oloc(pool.poolid); ObjectOperation rd; + if (pool.assert_ver) { + rd.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + } rd.notify_ack(notify_id, ver); - objecter->read(oid, layout, rd, pool.snap_seq, NULL, 0, 0, 0); + objecter->read(oid, oloc, rd, pool.snap_seq, NULL, 0, 0, 0); return 0; } @@ -1637,10 +1721,14 @@ int RadosClient::unwatch(PoolCtx& pool, const object_t& oid, uint64_t cookie) eversion_t ver; lock.Lock(); - ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid); + object_locator_t oloc(pool.poolid); ObjectOperation rd; + if (pool.assert_ver) { + rd.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + } rd.watch(cookie, 0, 1); - objecter->read(oid, layout, rd, pool.snap_seq, &outbl, 0, onack, &ver); + objecter->read(oid, oloc, rd, pool.snap_seq, &outbl, 0, onack, &ver); lock.Unlock(); mylock.Lock(); @@ -1672,11 +1760,15 @@ int RadosClient::notify(PoolCtx& pool, const object_t& oid, uint64_t ver) if (r < 0) return r; - ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid); + object_locator_t oloc(pool.poolid); ObjectOperation rd; + if (pool.assert_ver) { + rd.assert_ver(pool.assert_ver); + pool.assert_ver = 0; + } rd.notify(cookie, ver); lock.Lock(); - objecter->read(oid, layout, rd, pool.snap_seq, &outbl, 0, onack, &objver); + objecter->read(oid, oloc, rd, pool.snap_seq, &outbl, 0, onack, &objver); lock.Unlock(); mylock_all.Lock(); @@ -2185,6 +2277,12 @@ int Rados::notify(pool_t pool, const string& o, uint64_t ver) return client->notify(*(RadosClient::PoolCtx *)pool, oid, ver); } +void Rados::set_assert_ver(pool_t pool, uint64_t ver) +{ + if (!client) + return; + client->set_assert_ver(*(RadosClient::PoolCtx *)pool, ver); +} } // --------------------------------------------- diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index cdcc22c2eb5d0..a8d50b0ea8fa6 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1097,7 +1097,19 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, } break; - case CEPH_OSD_OP_NOTIFY: + case CEPH_OSD_OP_ASSERT_VER: + { + uint64_t ver = op.watch.ver; + if (!ver) + result = -EINVAL; + else if (ver < oi.user_version.version) + result = -ERANGE; + else if (ver > oi.user_version.version) + result = -EOVERFLOW; + break; + } + + case CEPH_OSD_OP_NOTIFY: { dout(0) << "CEPH_OSD_OP_NOTIFY" << dendl; diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 22c79bd1903c3..7e3c1db762de2 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -699,7 +699,7 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish) { object_locator_t oloc(list_context->pool_id); // - Op *o = new Op(oid, oloc, op.ops, CEPH_OSD_FLAG_READ, onack, NULL); + Op *o = new Op(oid, oloc, op.ops, CEPH_OSD_FLAG_READ, onack, NULL, NULL); o->priority = op.priority; o->snapid = list_context->pool_snap_seq; o->outbl = bl; diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index e22614aa5aae9..bc7c0591baddd 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -197,6 +197,10 @@ struct ObjectOperation { void notify_ack(uint64_t notify_id, uint64_t ver) { add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0); } + + void assert_ver(uint64_t ver) { + add_watch(CEPH_OSD_OP_ASSERT_VER, 0, ver, 0); + } }; @@ -546,13 +550,30 @@ private: return op_submit(o); } + int init_ops(vector& ops, int ops_count, ObjectOperation *extra_ops) { + int i; + + if (extra_ops) + ops_count += extra_ops->ops.size(); + + ops.resize(ops_count); + + for (i=0; iops[i]; + } + + return i; + } + + // high-level helpers tid_t stat(const object_t& oid, const object_locator_t& oloc, snapid_t snap, uint64_t *psize, utime_t *pmtime, int flags, Context *onfinish, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_STAT; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_STAT; C_Stat *fin = new C_Stat(psize, pmtime, onfinish); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0, objver); o->snapid = snap; @@ -563,13 +584,14 @@ private: tid_t read(const object_t& oid, const object_locator_t& oloc, uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags, Context *onfinish, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_READ; - ops[0].op.extent.offset = off; - ops[0].op.extent.length = len; - ops[0].op.extent.truncate_size = 0; - ops[0].op.extent.truncate_seq = 0; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_READ; + ops[i].op.extent.offset = off; + ops[i].op.extent.length = len; + ops[i].op.extent.truncate_size = 0; + ops[i].op.extent.truncate_seq = 0; Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; @@ -579,13 +601,14 @@ private: uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags, uint64_t trunc_size, __u32 trunc_seq, Context *onfinish, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_READ; - ops[0].op.extent.offset = off; - ops[0].op.extent.length = len; - ops[0].op.extent.truncate_size = trunc_size; - ops[0].op.extent.truncate_seq = trunc_seq; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_READ; + ops[i].op.extent.offset = off; + ops[i].op.extent.length = len; + ops[i].op.extent.truncate_size = trunc_size; + ops[i].op.extent.truncate_seq = trunc_seq; Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; @@ -593,13 +616,15 @@ private: } tid_t mapext(const object_t& oid, const object_locator_t& oloc, uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags, - Context *onfinish) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_MAPEXT; - ops[0].op.extent.offset = off; - ops[0].op.extent.length = len; - ops[0].op.extent.truncate_size = 0; - ops[0].op.extent.truncate_seq = 0; + Context *onfinish, + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_MAPEXT; + ops[i].op.extent.offset = off; + ops[i].op.extent.length = len; + ops[i].op.extent.truncate_size = 0; + ops[i].op.extent.truncate_seq = 0; Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; @@ -607,13 +632,15 @@ private: } tid_t sparse_read(const object_t& oid, const object_locator_t& oloc, uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags, - Context *onfinish) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_SPARSE_READ; - ops[0].op.extent.offset = off; - ops[0].op.extent.length = len; - ops[0].op.extent.truncate_size = 0; - ops[0].op.extent.truncate_seq = 0; + Context *onfinish, + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_SPARSE_READ; + ops[i].op.extent.offset = off; + ops[i].op.extent.length = len; + ops[i].op.extent.truncate_size = 0; + ops[i].op.extent.truncate_seq = 0; Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; @@ -623,13 +650,14 @@ private: tid_t getxattr(const object_t& oid, const object_locator_t& oloc, const char *name, snapid_t snap, bufferlist *pbl, int flags, Context *onfinish, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_GETXATTR; - ops[0].op.xattr.name_len = (name ? strlen(name) : 0); - ops[0].op.xattr.value_len = 0; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_GETXATTR; + ops[i].op.xattr.name_len = (name ? strlen(name) : 0); + ops[i].op.xattr.value_len = 0; if (name) - ops[0].data.append(name); + ops[i].data.append(name); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); o->snapid = snap; o->outbl = pbl; @@ -639,9 +667,10 @@ private: tid_t getxattrs(const object_t& oid, const object_locator_t& oloc, snapid_t snap, map& attrset, int flags, Context *onfinish, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_GETXATTRS; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_GETXATTRS; C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0, objver); o->snapid = snap; @@ -652,7 +681,7 @@ private: tid_t read_full(const object_t& oid, const object_locator_t& oloc, snapid_t snap, bufferlist *pbl, int flags, Context *onfinish, - eversion_t *objver = NULL) { + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { return read(oid, oloc, 0, 0, snap, pbl, flags | CEPH_OSD_FLAG_READ, onfinish, objver); } @@ -671,14 +700,15 @@ private: uint64_t off, uint64_t len, const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags, Context *onack, Context *oncommit, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_WRITE; - ops[0].op.extent.offset = off; - ops[0].op.extent.length = len; - ops[0].op.extent.truncate_size = 0; - ops[0].op.extent.truncate_seq = 0; - ops[0].data = bl; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_WRITE; + ops[i].op.extent.offset = off; + ops[i].op.extent.length = len; + ops[i].op.extent.truncate_size = 0; + ops[i].op.extent.truncate_seq = 0; + ops[i].data = bl; Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; @@ -689,14 +719,15 @@ private: utime_t mtime, int flags, uint64_t trunc_size, __u32 trunc_seq, Context *onack, Context *oncommit, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_WRITE; - ops[0].op.extent.offset = off; - ops[0].op.extent.length = len; - ops[0].op.extent.truncate_size = trunc_size; - ops[0].op.extent.truncate_seq = trunc_seq; - ops[0].data = bl; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_WRITE; + ops[i].op.extent.offset = off; + ops[i].op.extent.length = len; + ops[i].op.extent.truncate_size = trunc_size; + ops[i].op.extent.truncate_seq = trunc_seq; + ops[i].data = bl; Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; @@ -705,12 +736,13 @@ private: tid_t write_full(const object_t& oid, const object_locator_t& oloc, const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags, Context *onack, Context *oncommit, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_WRITEFULL; - ops[0].op.extent.offset = 0; - ops[0].op.extent.length = bl.length(); - ops[0].data = bl; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_WRITEFULL; + ops[i].op.extent.offset = 0; + ops[i].op.extent.length = bl.length(); + ops[i].data = bl; Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; @@ -721,12 +753,13 @@ private: utime_t mtime, int flags, uint64_t trunc_size, __u32 trunc_seq, Context *onack, Context *oncommit, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_TRUNCATE; - ops[0].op.extent.offset = trunc_size; - ops[0].op.extent.truncate_size = trunc_size; - ops[0].op.extent.truncate_seq = trunc_seq; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_TRUNCATE; + ops[i].op.extent.offset = trunc_size; + ops[i].op.extent.truncate_size = trunc_size; + ops[i].op.extent.truncate_seq = trunc_seq; Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; @@ -735,11 +768,12 @@ private: tid_t zero(const object_t& oid, const object_locator_t& oloc, uint64_t off, uint64_t len, const SnapContext& snapc, utime_t mtime, int flags, Context *onack, Context *oncommit, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_ZERO; - ops[0].op.extent.offset = off; - ops[0].op.extent.length = len; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_ZERO; + ops[i].op.extent.offset = off; + ops[i].op.extent.length = len; Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; @@ -748,10 +782,11 @@ private: tid_t rollback_object(const object_t& oid, const object_locator_t& oloc, const SnapContext& snapc, snapid_t snapid, utime_t mtime, Context *onack, Context *oncommit, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_ROLLBACK; - ops[0].op.snap.snapid = snapid; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_ROLLBACK; + ops[i].op.snap.snapid = snapid; Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; @@ -761,10 +796,11 @@ private: const SnapContext& snapc, utime_t mtime, int global_flags, int create_flags, Context *onack, Context *oncommit, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_CREATE; - ops[0].op.flags = create_flags; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_CREATE; + ops[i].op.flags = create_flags; Op *o = new Op(oid, oloc, ops, global_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; @@ -773,9 +809,10 @@ private: tid_t remove(const object_t& oid, const object_locator_t& oloc, const SnapContext& snapc, utime_t mtime, int flags, Context *onack, Context *oncommit, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_DELETE; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_DELETE; Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; @@ -783,10 +820,11 @@ private: } tid_t lock(const object_t& oid, const object_locator_t& oloc, int op, int flags, - Context *onack, Context *oncommit, eversion_t *objver = NULL) { + Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { SnapContext snapc; // no snapc for lock ops - vector ops(1); - ops[0].op.op = op; + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = op; Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->snapc = snapc; return op_submit(o); @@ -795,14 +833,15 @@ private: const char *name, const SnapContext& snapc, const bufferlist &bl, utime_t mtime, int flags, Context *onack, Context *oncommit, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_SETXATTR; - ops[0].op.xattr.name_len = (name ? strlen(name) : 0); - ops[0].op.xattr.value_len = bl.length(); + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_SETXATTR; + ops[i].op.xattr.name_len = (name ? strlen(name) : 0); + ops[i].op.xattr.value_len = bl.length(); if (name) - ops[0].data.append(name); - ops[0].data.append(bl); + ops[i].data.append(name); + ops[i].data.append(bl); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; @@ -812,13 +851,14 @@ private: const char *name, const SnapContext& snapc, utime_t mtime, int flags, Context *onack, Context *oncommit, - eversion_t *objver = NULL) { - vector ops(1); - ops[0].op.op = CEPH_OSD_OP_RMXATTR; - ops[0].op.xattr.name_len = (name ? strlen(name) : 0); - ops[0].op.xattr.value_len = 0; + eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { + vector ops; + int i = init_ops(ops, 1, extra_ops); + ops[i].op.op = CEPH_OSD_OP_RMXATTR; + ops[i].op.xattr.name_len = (name ? strlen(name) : 0); + ops[i].op.xattr.value_len = 0; if (name) - ops[0].data.append(name); + ops[i].data.append(name); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; diff --git a/src/testradospp.cc b/src/testradospp.cc index 6e2975e08ba71..c685281f5871c 100644 --- a/src/testradospp.cc +++ b/src/testradospp.cc @@ -70,9 +70,9 @@ int main(int argc, const char **argv) uint64_t objver = rados.get_last_ver(); cout << "rados.write returned " << r << " last_ver=" << objver << std::endl; - uint64_t cookie; + uint64_t handle; C_Watch wc; - r = rados.watch(pool, oid, objver, &cookie, &wc); + r = rados.watch(pool, oid, objver, &handle, &wc); cout << "rados.watch returned " << r << std::endl; cout << "*** press enter to continue ***" << std::endl; @@ -82,11 +82,12 @@ int main(int argc, const char **argv) cout << "*** press enter to continue ***" << std::endl; getchar(); - exit(0); - + rados.set_assert_ver(pool, objver); r = rados.write(pool, oid, 0, bl, bl.length() - 1); cout << "rados.write returned " << r << std::endl; + + exit(0); r = rados.write(pool, oid, 0, bl, bl.length() - 2); cout << "rados.write returned " << r << std::endl; r = rados.write(pool, oid, 0, bl, bl.length() - 3); -- 2.39.5