From 6518fae3177ef5fbc2b8a3c4cdae26b1fc35de79 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 2 Dec 2010 11:52:28 -0800 Subject: [PATCH] watch: some more linger fixes --- src/auth/Crypto.cc | 1 - src/osd/ReplicatedPG.cc | 7 ++- src/osdc/Objecter.cc | 108 +++++++++++++++++++++++++++------------ src/osdc/Objecter.h | 110 ++++++++++++++++++++++++---------------- 4 files changed, 146 insertions(+), 80 deletions(-) diff --git a/src/auth/Crypto.cc b/src/auth/Crypto.cc index 26b7b5ccc6167..7ad02c7fb42ee 100644 --- a/src/auth/Crypto.cc +++ b/src/auth/Crypto.cc @@ -12,7 +12,6 @@ */ #include "Crypto.h" - #include "openssl/evp.h" #include "openssl/aes.h" diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 563c48f43fba5..250241cea19d9 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1415,10 +1415,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, session->get(); session->watches[obc] = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); obc->ref++; -#if 0 - assert(obc->unconnected_watchers.count(entity)); - obc->unconnected_watchers.erase(entity); -#endif } else if (iter->second == session) { // already there dout(10) << " already connected to watch " << w << " by " << entity @@ -1433,6 +1429,9 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, session->get(); session->watches[obc] = osd->osdmap->object_locator_to_pg(soid.oid, obc->obs.oi.oloc); } + map::iterator un_iter = obc->unconnected_watchers.find(entity); + if (un_iter != obc->unconnected_watchers.end()) + obc->unconnected_watchers.erase(un_iter); register_object_context(obc); } else { map::iterator oi_iter = oi.watchers.find(entity); diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 97245d5044e4b..81372d302621e 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -35,6 +35,7 @@ #include "messages/MStatfsReply.h" #include "messages/MOSDFailure.h" +#include "common/BackTrace.h" #include @@ -157,16 +158,23 @@ void Objecter::handle_osd_map(MOSDMap *m) p != op_osd.end(); p++) { if (p->second->paused) { + dout(0) << "handle_osd_map: op_submit(1)" << dendl; p->second->paused = false; op_submit(p->second); } } - for (hash_map::iterator p = op_osd_linger.begin(); - p != op_osd_linger.end(); p++) { - op_submit(p->second); - } } - + dout(0) << "handle_osd_map" << dendl; + dump_active(); +#if 0 + hash_map old_map; + old_map.swap(op_osd_linger); + for (hash_map::iterator p = old_map.begin(); + p != op_osd_linger.end(); p++) { + dout(0) << "handle_osd_map: op_submit" << dendl; + op_submit(p->second); + } +#endif assert(e == osdmap->get_epoch()); } @@ -304,6 +312,7 @@ void Objecter::scan_pgs(set& changed_pgs) void Objecter::kick_requests(set& changed_pgs) { dout(10) << "kick_requests in pgs " << changed_pgs << dendl; + dout(0) << "kick_requests in pgs " << changed_pgs << dendl; for (set::iterator i = changed_pgs.begin(); i != changed_pgs.end(); @@ -314,7 +323,14 @@ void Objecter::kick_requests(set& changed_pgs) // resubmit ops! set tids; tids.swap( pg.active_tids ); - close_pg( pgid ); // will pbly reopen, unless it's just commits we're missing + set::iterator liter; + for (liter = pg.linger_tids.begin(); liter != pg.linger_tids.end(); ++liter) { + tids.insert(*liter); + dout(0) << "adding lingering tid=" << *liter << " to set" << dendl; + } + dout(0) << "pg.linger_tids.empty()=" << pg.linger_tids.empty() << " pg=" << &pg << dendl; + if (pg.linger_tids.empty()) + close_pg( pgid ); // will pbly reopen, unless it's just commits we're missing dout(10) << "kick_requests pg " << pgid << " tids " << tids << dendl; for (set::iterator p = tids.begin(); @@ -333,22 +349,29 @@ void Objecter::kick_requests(set& changed_pgs) if (op->oncommit) num_uncommitted--; + dout(0) << "kick_requests tid=" << tid << " linger=" << op->linger << dendl; // WRITE if (op->onack) { dout(3) << "kick_requests missing ack, resub " << tid << dendl; - op_submit(op); - } else if (!op->linger) { - assert(op->oncommit); - dout(3) << "kick_requests missing commit, resub " << tid << dendl; - op_submit(op); + op_submit(op, false); } else { - dout(3) << "kick_requests on lingering op " << tid << dendl; - op_submit(op); + if (!op->linger) + assert(op->oncommit); + dout(3) << "kick_requests missing commit, resub " << tid << dendl; + dout(0) << "kick_requests missing commit, resub " << tid << dendl; + op_submit(op, false); } + } else { + hash_map::iterator p = op_osd_linger.find(tid); + if (p != op_osd_linger.end()) { + dout(0) << "kick_requests lingering " << tid << dendl; + op_submit(p->second, true); + } else + assert(0); } - else - assert(0); - } + } +#if 0 +#endif } } @@ -412,8 +435,12 @@ void Objecter::resend_mon_ops() // read | write --------------------------- -tid_t Objecter::op_submit(Op *op) +tid_t Objecter::op_submit(Op *op, bool register_linger) { + bool handle_linger = (op->linger && register_linger); + bool new_linger = (op->linger && !op->tid); + +dout(0) << "Objecter::op_submit register_linger=" << register_linger << " new_linger=" << new_linger << dendl; // throttle. before we look at any state, because // take_op_budget() may drop our lock while it blocks. @@ -426,8 +453,13 @@ tid_t Objecter::op_submit(Op *op) PG &pg = get_pg(op->pgid); // pick tid - if (!op->tid) + if (!op->tid) { + op->tid = ++last_tid; + if (new_linger) + op_osd_linger[op->tid] = op; + } else if (handle_linger) { op->tid = ++last_tid; + } assert(client_inc >= 0); // add to gather set(s) @@ -446,14 +478,25 @@ tid_t Objecter::op_submit(Op *op) } op_osd[op->tid] = op; - if (op->linger) { - op_osd_linger[op->tid] = op; + if (handle_linger) { + dout(0) << "reset lingering request" << dendl; + op->attempts = 0; } - + BackTrace bt(0); + bt.print(*_dout); pg.active_tids.insert(op->tid); + if (new_linger) { + pg.linger_tids.insert(op->tid); + dout(0) << "inserting tid=" << op->tid << " to linger_tids pg=" << (void *)&pg << dendl; + } pg.last = g_clock.now(); // send? + dout(0) << "op_submit oid " << op->oid + << " " << op->oloc + << " " << op->ops << " tid " << op->tid + << " osd" << pg.primary() + << dendl; dout(10) << "op_submit oid " << op->oid << " " << op->oloc << " " << op->ops << " tid " << op->tid @@ -513,7 +556,7 @@ tid_t Objecter::op_submit(Op *op) m->ops = op->ops; m->set_mtime(op->mtime); m->set_retry_attempt(op->attempts++); - + if (op->version != eversion_t()) m->set_version(op->version); // we're replaying this op! @@ -599,7 +642,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) num_unacked--; if (op->oncommit) num_uncommitted--; - op_submit(op); + op_submit(op, false); m->put(); return; } @@ -637,14 +680,15 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) pg.active_tids.erase(tid); dout(15) << "handle_osd_op_reply completed tid " << tid << ", pg " << m->get_pg() << " still has " << pg.active_tids << dendl; - if (pg.active_tids.empty()) + if (pg.active_tids.empty() && pg.linger_tids.empty()) close_pg( m->get_pg() ); put_op_budget(op); op_osd.erase( tid ); - if (op->con) - op->con->put(); - if (!op->linger) + if (!op->linger) { + if (op->con) + op->con->put(); delete op; + } } dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl; @@ -712,7 +756,7 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish) { object_locator_t oloc(list_context->pool_id); // - Op *o = new Op(oid, oloc, op.ops, CEPH_OSD_FLAG_READ, onack, NULL, NULL); + Op *o = new Op(oid, oloc, op.ops, CEPH_OSD_FLAG_READ, onack, NULL, NULL, false); o->priority = op.priority; o->snapid = list_context->pool_snap_seq; o->outbl = bl; @@ -1201,11 +1245,11 @@ void Objecter::ms_handle_remote_reset(Connection *con) void Objecter::dump_active() { - dout(10) << "dump_active" << dendl; + dout(0) << "dump_active" << dendl; for (hash_map::iterator p = op_osd.begin(); p != op_osd.end(); p++) - dout(10) << " " << p->first << "\t" << p->second->oid << "\t" << p->second->ops << dendl; - dout(10) << "lingering" << dendl; + dout(0) << " " << p->first << "\t" << p->second->oid << "\t" << p->second->ops << dendl; + dout(0) << "lingering" << dendl; for (hash_map::iterator p = op_osd_linger.begin(); p != op_osd_linger.end(); p++) - dout(10) << " " << p->first << "\t" << p->second->oid << "\t" << p->second->ops << dendl; + dout(0) << " " << p->first << "\t" << p->second->oid << "\t" << p->second->ops << dendl; } diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 512c4f7486d32..ada014e878f52 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -278,13 +278,13 @@ public: bool linger; Op(const object_t& o, const object_locator_t& ol, vector& op, - int f, Context *ac, Context *co, eversion_t *ov) : + int f, Context *ac, Context *co, eversion_t *ov, bool ln) : session_item(this), oid(o), oloc(ol), con(NULL), snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co), tid(0), attempts(0), - paused(false), objver(ov), linger(false) { + paused(false), objver(ov), linger(ln) { ops.swap(op); } }; @@ -426,6 +426,7 @@ public: public: vector acting; set active_tids; // active ops + set linger_tids; // active ops utime_t last; PG() {} @@ -518,7 +519,7 @@ public: private: // low-level - tid_t op_submit(Op *op); + tid_t op_submit(Op *op, bool register_linger = true); // public interface public: @@ -540,7 +541,7 @@ private: ObjectOperation& op, const SnapContext& snapc, utime_t mtime, int flags, Context *onack, Context *oncommit, eversion_t *objver = NULL) { - Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, op.linger); o->priority = op.priority; o->mtime = mtime; o->snapc = snapc; @@ -550,15 +551,16 @@ private: ObjectOperation& op, snapid_t snapid, bufferlist *pbl, int flags, Context *onack, eversion_t *objver = NULL) { - Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_READ, onack, NULL, objver); + Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_READ, onack, NULL, objver, op.linger); o->priority = op.priority; o->snapid = snapid; o->outbl = pbl; return op_submit(o); } - int init_ops(vector& ops, int ops_count, ObjectOperation *extra_ops) { + int init_ops(vector& ops, int ops_count, ObjectOperation *extra_ops, bool *plinger) { int i; + bool linger = false; if (extra_ops) ops_count += extra_ops->ops.size(); @@ -567,8 +569,12 @@ private: for (i=0; iops[i]; + linger |= extra_ops->linger; } + if (plinger) + *plinger = linger; + return i; } @@ -579,10 +585,11 @@ private: Context *onfinish, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_STAT; C_Stat *fin = new C_Stat(psize, pmtime, onfinish); - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0, objver, linger); o->snapid = snap; o->outbl = &fin->bl; return op_submit(o); @@ -593,13 +600,14 @@ private: Context *onfinish, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_READ; ops[i].op.extent.offset = off; ops[i].op.extent.length = len; ops[i].op.extent.truncate_size = 0; ops[i].op.extent.truncate_seq = 0; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver, linger); o->snapid = snap; o->outbl = pbl; return op_submit(o); @@ -610,13 +618,14 @@ private: Context *onfinish, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_READ; ops[i].op.extent.offset = off; ops[i].op.extent.length = len; ops[i].op.extent.truncate_size = trunc_size; ops[i].op.extent.truncate_seq = trunc_seq; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver, linger); o->snapid = snap; o->outbl = pbl; return op_submit(o); @@ -626,13 +635,14 @@ private: Context *onfinish, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_MAPEXT; ops[i].op.extent.offset = off; ops[i].op.extent.length = len; ops[i].op.extent.truncate_size = 0; ops[i].op.extent.truncate_seq = 0; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver, linger); o->snapid = snap; o->outbl = pbl; return op_submit(o); @@ -642,13 +652,14 @@ private: Context *onfinish, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_SPARSE_READ; ops[i].op.extent.offset = off; ops[i].op.extent.length = len; ops[i].op.extent.truncate_size = 0; ops[i].op.extent.truncate_seq = 0; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver, linger); o->snapid = snap; o->outbl = pbl; return op_submit(o); @@ -659,13 +670,14 @@ private: Context *onfinish, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_GETXATTR; ops[i].op.xattr.name_len = (name ? strlen(name) : 0); ops[i].op.xattr.value_len = 0; if (name) ops[i].data.append(name); - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver, linger); o->snapid = snap; o->outbl = pbl; return op_submit(o); @@ -676,10 +688,11 @@ private: int flags, Context *onfinish, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_GETXATTRS; C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish); - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0, objver, linger); o->snapid = snap; o->outbl = &fin->bl; return op_submit(o); @@ -698,7 +711,7 @@ private: const SnapContext& snapc, int flags, Context *onack, Context *oncommit, eversion_t *objver = NULL) { - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, false); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -709,14 +722,15 @@ private: Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_WRITE; ops[i].op.extent.offset = off; ops[i].op.extent.length = len; ops[i].op.extent.truncate_size = 0; ops[i].op.extent.truncate_seq = 0; ops[i].data = bl; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -728,14 +742,15 @@ private: Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_WRITE; ops[i].op.extent.offset = off; ops[i].op.extent.length = len; ops[i].op.extent.truncate_size = trunc_size; ops[i].op.extent.truncate_seq = trunc_seq; ops[i].data = bl; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -745,12 +760,13 @@ private: Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_WRITEFULL; ops[i].op.extent.offset = 0; ops[i].op.extent.length = bl.length(); ops[i].data = bl; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -762,12 +778,13 @@ private: Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_TRUNCATE; ops[i].op.extent.offset = trunc_size; ops[i].op.extent.truncate_size = trunc_size; ops[i].op.extent.truncate_seq = trunc_seq; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -777,11 +794,12 @@ private: Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_ZERO; ops[i].op.extent.offset = off; ops[i].op.extent.length = len; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -791,10 +809,11 @@ private: utime_t mtime, Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_ROLLBACK; ops[i].op.snap.snapid = snapid; - Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -805,10 +824,11 @@ private: Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_CREATE; ops[i].op.flags = create_flags; - Op *o = new Op(oid, oloc, ops, global_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, global_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -818,9 +838,10 @@ private: Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_DELETE; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -830,9 +851,10 @@ private: Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { SnapContext snapc; // no snapc for lock ops vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = op; - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); o->snapc = snapc; return op_submit(o); } @@ -842,14 +864,15 @@ private: Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_SETXATTR; ops[i].op.xattr.name_len = (name ? strlen(name) : 0); ops[i].op.xattr.value_len = bl.length(); if (name) ops[i].data.append(name); ops[i].data.append(bl); - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); o->mtime = mtime; o->snapc = snapc; return op_submit(o); @@ -860,13 +883,14 @@ private: Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { vector ops; - int i = init_ops(ops, 1, extra_ops); + bool linger; + int i = init_ops(ops, 1, extra_ops, &linger); ops[i].op.op = CEPH_OSD_OP_RMXATTR; ops[i].op.xattr.name_len = (name ? strlen(name) : 0); ops[i].op.xattr.value_len = 0; if (name) ops[i].data.append(name); - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); o->mtime = mtime; o->snapc = snapc; return op_submit(o); -- 2.39.5