OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops,
&obc->obs, obc->ssc,
this);
+ ctx->op_t = pgbackend->get_transaction();
ctx->obc = obc;
if (m->get_flags() & CEPH_OSD_FLAG_SKIPRWLOCKS) {
dout(20) << __func__ << ": skipping rw locks" << dendl;
// this method must be idempotent since we may call it several times
// before we finally apply the resulting transaction.
- ctx->op_t = ObjectStore::Transaction();
- ctx->local_t = ObjectStore::Transaction();
+ delete ctx->op_t;
+ ctx->op_t = pgbackend->get_transaction();
if (op->may_write() || op->may_cache()) {
// dup/replay?
// possible to construct an operation that does a read, does a guard
// check (e.g., CMPXATTR), and then a write. Then we either succeed
// with the write, or return a CMPXATTR and the read value.
- if ((ctx->op_t.empty() && !ctx->modify) || result < 0) {
+ if ((ctx->op_t->empty() && !ctx->modify) || result < 0) {
// read.
ctx->reply->claim_op_out_data(ctx->ops);
ctx->reply->get_header().data_off = ctx->data_off;
ctx->reply->set_result(result);
// read or error?
- if (ctx->op_t.empty() || result < 0) {
+ if (ctx->op_t->empty() || result < 0) {
MOSDOpReply *reply = ctx->reply;
ctx->reply = NULL;
// trim log?
calc_trim_to();
- append_log(ctx->log, pg_trim_to, ctx->local_t);
-
// verify that we are doing this in order?
if (cct->_conf->osd_debug_op_order && m->get_source().is_client()) {
map<client_t,tid_t>& cm = debug_op_order[obc->obs.oi.soid];
return;
}
}
-
- sub_op_modify(op);
}
void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
return;
}
}
-
- sub_op_modify_reply(op);
}
void ReplicatedPG::do_scan(
OpContext *ctx = repop->ctx;
ctx->at_version = get_next_version();
- ObjectStore::Transaction *t = &ctx->op_t;
+ PGBackend::PGTransaction *t = ctx->op_t;
set<snapid_t> new_snaps;
for (set<snapid_t>::iterator i = old_snaps.begin();
i != old_snaps.end();
// remove clone
dout(10) << coid << " snaps " << old_snaps << " -> "
<< new_snaps << " ... deleting" << dendl;
- t->remove(coll, coid);
+ t->remove(coid);
// ...from snapset
snapid_t last = coid.snap;
coi.version = ctx->at_version;
bl.clear();
::encode(coi, bl);
- t->setattr(coll, coid, OI_ATTR, bl);
+ t->setattr(coid, OI_ATTR, bl);
ctx->log.push_back(
pg_log_entry_t(
);
ctx->snapset_obc->obs.exists = false;
- t->remove(coll, snapoid);
+ t->remove(snapoid);
} else {
dout(10) << coid << " updating snapset on " << snapoid << dendl;
ctx->log.push_back(
bl.clear();
::encode(snapset, bl);
- t->setattr(coll, snapoid, SS_ATTR, bl);
+ t->setattr(snapoid, SS_ATTR, bl);
bl.clear();
::encode(ctx->snapset_obc->obs.oi, bl);
- t->setattr(coll, snapoid, OI_ATTR, bl);
+ t->setattr(snapoid, OI_ATTR, bl);
}
return repop;
bool first_read = true;
- ObjectStore::Transaction& t = ctx->op_t;
+ PGBackend::PGTransaction* t = ctx->op_t;
dout(10) << "do_osd_op " << soid << " " << ops << dendl;
if (obs.exists && !oi.is_whiteout()) {
dout(10) << " truncate_seq " << op.extent.truncate_seq << " > current " << seq
<< ", truncating to " << op.extent.truncate_size << dendl;
- t.truncate(coll, soid, op.extent.truncate_size);
+ t->truncate(soid, op.extent.truncate_size);
oi.truncate_seq = op.extent.truncate_seq;
oi.truncate_size = op.extent.truncate_size;
if (op.extent.truncate_size != oi.size) {
result = check_offset_and_length(op.extent.offset, op.extent.length, cct->_conf->osd_max_object_size);
if (result < 0)
break;
- t.write(coll, soid, op.extent.offset, op.extent.length, osd_op.indata);
+ t->write(soid, op.extent.offset, op.extent.length, osd_op.indata);
write_update_size_and_usage(ctx->delta_stats, oi, ssc->snapset, ctx->modified_ranges,
op.extent.offset, op.extent.length, true);
if (!obs.exists) {
if (result < 0)
break;
if (obs.exists) {
- t.truncate(coll, soid, 0);
+ t->truncate(soid, 0);
} else {
ctx->delta_stats.num_objects++;
obs.exists = true;
}
- t.write(coll, soid, op.extent.offset, op.extent.length, osd_op.indata);
+ t->write(soid, op.extent.offset, op.extent.length, osd_op.indata);
interval_set<uint64_t> ch;
if (oi.size > 0)
ch.insert(0, oi.size);
break;
assert(op.extent.length);
if (obs.exists && !oi.is_whiteout()) {
- t.zero(coll, soid, op.extent.offset, op.extent.length);
+ t->zero(soid, op.extent.offset, op.extent.length);
interval_set<uint64_t> ch;
ch.insert(op.extent.offset, op.extent.length);
ctx->modified_ranges.union_of(ch);
}
}
if (result >= 0 && !obs.exists) {
- t.touch(coll, soid);
+ t->touch(soid);
ctx->delta_stats.num_objects++;
obs.exists = true;
}
oi.truncate_size = op.extent.truncate_size;
}
- t.truncate(coll, soid, op.extent.offset);
+ t->truncate(soid, op.extent.offset);
if (oi.size > op.extent.offset) {
interval_set<uint64_t> trim;
trim.insert(op.extent.offset, oi.size-op.extent.offset);
++ctx->num_write;
{
if (!obs.exists) {
- t.touch(coll, obs.oi.soid);
+ t->touch(obs.oi.soid);
ctx->delta_stats.num_objects++;
obs.exists = true;
}
result = -EINVAL;
break;
}
- t.clone_range(coll, src_obc->obs.oi.soid,
+ t->clone_range(src_obc->obs.oi.soid,
obs.oi.soid, op.clonerange.src_offset,
op.clonerange.length, op.clonerange.offset);
} else {
dout(10) << " registered new watch " << w << " by " << entity << dendl;
oi.watchers[make_pair(cookie, entity)] = w;
- t.nop(); // make sure update the object_info on disk!
+ t->nop(); // make sure update the object_info on disk!
}
ctx->watch_connects.push_back(w);
} else {
dout(10) << " removed watch " << oi_iter->second << " by "
<< entity << dendl;
oi.watchers.erase(oi_iter);
- t.nop(); // update oi on disk
+ t->nop(); // update oi on disk
ctx->watch_disconnects.push_back(w);
} else {
dout(10) << " can't remove: no watch by " << entity << dendl;
break;
}
if (!obs.exists) {
- t.touch(coll, soid);
+ t->touch(soid);
ctx->delta_stats.num_objects++;
obs.exists = true;
}
string name = "_" + aname;
bufferlist bl;
bp.copy(op.xattr.value_len, bl);
- t.setattr(coll, soid, name, bl);
+ t->setattr(soid, name, bl);
ctx->delta_stats.num_wr++;
}
break;
string aname;
bp.copy(op.xattr.name_len, aname);
string name = "_" + aname;
- t.rmattr(coll, soid, name);
+ t->rmattr(soid, name);
ctx->delta_stats.num_wr++;
}
break;
break;
case CEPH_OSD_OP_STARTSYNC:
- t.start_sync();
+ // TODOSAM: either nop this or fix it
+ //t.start_sync();
break;
ctx->delta_stats.num_objects++;
obs.exists = true;
}
- t.touch(coll, soid);
+ t->touch(soid);
map<string, bufferlist> to_set;
try {
::decode(to_set, bp);
++i) {
dout(20) << "\t" << i->first << dendl;
}
- t.omap_setkeys(coll, soid, to_set);
+ t->omap_setkeys(soid, to_set);
ctx->delta_stats.num_wr++;
}
break;
ctx->delta_stats.num_objects++;
obs.exists = true;
}
- t.touch(coll, soid);
- t.omap_setheader(coll, soid, osd_op.indata);
+ t->touch(soid);
+ t->omap_setheader(soid, osd_op.indata);
ctx->delta_stats.num_wr++;
}
break;
result = -ENOENT;
break;
}
- t.touch(coll, soid);
- t.omap_clear(coll, soid);
+ t->touch(soid);
+ t->omap_clear(soid);
ctx->delta_stats.num_wr++;
}
break;
result = -ENOENT;
break;
}
- t.touch(coll, soid);
+ t->touch(soid);
set<string> to_rm;
try {
::decode(to_rm, bp);
result = -EINVAL;
goto fail;
}
- t.omap_rmkeys(coll, soid, to_rm);
+ t->omap_rmkeys(soid, to_rm);
ctx->delta_stats.num_wr++;
}
break;
ObjectState& obs = ctx->new_obs;
object_info_t& oi = obs.oi;
const hobject_t& soid = oi.soid;
- ObjectStore::Transaction& t = ctx->op_t;
+ PGBackend::PGTransaction* t = ctx->op_t;
if (!obs.exists || (obs.oi.is_whiteout() && !no_whiteout))
return -ENOENT;
+ t->remove(soid);
+
if (oi.size > 0) {
interval_set<uint64_t> ch;
ch.insert(0, oi.size);
dout(20) << __func__ << " setting whiteout on " << soid << dendl;
oi.set_flag(object_info_t::FLAG_WHITEOUT);
ctx->delta_stats.num_whiteouts++;
- t.truncate(coll, soid, 0);
- t.omap_clear(coll, soid);
- t.rmattrs(coll, soid);
+ t->touch(soid);
return 0;
}
- t.remove(coll, soid);
ctx->delta_stats.num_objects--;
if (oi.is_dirty())
ctx->delta_stats.num_objects_dirty--;
ObjectState& obs = ctx->new_obs;
object_info_t& oi = obs.oi;
const hobject_t& soid = oi.soid;
- ObjectStore::Transaction& t = ctx->op_t;
+ PGBackend::PGTransaction* t = ctx->op_t;
snapid_t snapid = (uint64_t)op.snap.snapid;
hobject_t missing_oid;
<< " and rolling back to old snap" << dendl;
if (obs.exists)
- t.remove(coll, soid);
+ t->remove(soid);
- t.clone(coll,
- rollback_to_sobject, soid);
+ t->clone(rollback_to_sobject, soid);
snapset.head_exists = true;
map<snapid_t, interval_set<uint64_t> >::iterator iter =
return ret;
}
-void ReplicatedPG::_make_clone(ObjectStore::Transaction& t,
+void ReplicatedPG::_make_clone(PGBackend::PGTransaction* t,
const hobject_t& head, const hobject_t& coid,
object_info_t *poi)
{
bufferlist bv;
::encode(*poi, bv);
- t.clone(coll, head, coid);
- t.setattr(coll, coid, OI_ATTR, bv);
- t.rmattr(coll, coid, SS_ATTR);
+ t->clone(head, coid);
+ t->setattr(coid, OI_ATTR, bv);
+ t->rmattr(coid, SS_ATTR);
}
void ReplicatedPG::make_writeable(OpContext *ctx)
{
const hobject_t& soid = ctx->obs->oi.soid;
SnapContext& snapc = ctx->snapc;
- ObjectStore::Transaction t;
+ PGBackend::PGTransaction *t = pgbackend->get_transaction();
// clone?
assert(soid.snap == CEPH_NOSNAP);
}
// prepend transaction to op_t
- t.append(ctx->op_t);
- t.swap(ctx->op_t);
+ t->append(ctx->op_t);
+ delete ctx->op_t;
+ ctx->op_t = t;
// update snapset with latest snap context
ctx->new_snapset.seq = snapc.seq;
ostringstream ss;
ss << "temp_" << info.pgid << "_" << get_role() << "_" << osd->monc->get_global_id() << "_" << (++temp_seq);
hobject_t hoid = hobject_t::make_temp(ss.str());
- // TODOSAM: adjust when this method gets absorbed into ReplicatedBackend
- pgbackend->add_temp_obj(hoid);
dout(20) << __func__ << " " << hoid << dendl;
return hoid;
}
do_osd_op_effects(ctx);
// read-op? done?
- if (ctx->op_t.empty() && !ctx->modify) {
+ if (ctx->op_t->empty() && !ctx->modify) {
unstable_stats.add(ctx->delta_stats, ctx->obc->obs.oi.category);
return result;
}
ctx->snapset_obc = get_object_context(snapoid, false);
if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) {
- ctx->op_t.remove(coll, snapoid);
+ ctx->op_t->remove(snapoid);
dout(10) << " removing old " << snapoid << dendl;
ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::DELETE, snapoid,
bufferlist bv(sizeof(ctx->new_obs.oi));
::encode(ctx->snapset_obc->obs.oi, bv);
- ctx->op_t.touch(coll, snapoid);
- ctx->op_t.setattr(coll, snapoid, OI_ATTR, bv);
- ctx->op_t.setattr(coll, snapoid, SS_ATTR, bss);
+ ctx->op_t->touch(snapoid);
+ ctx->op_t->setattr(snapoid, OI_ATTR, bv);
+ ctx->op_t->setattr(snapoid, SS_ATTR, bss);
ctx->at_version.version++;
}
}
ctx->user_at_version = ctx->at_version.version;
ctx->new_obs.oi.user_version = ctx->user_at_version;
}
- ctx->bytes_written = ctx->op_t.get_encoded_bytes();
+ ctx->bytes_written = ctx->op_t->get_bytes_written();
if (ctx->new_obs.exists) {
// on the head object
bufferlist bv(sizeof(ctx->new_obs.oi));
::encode(ctx->new_obs.oi, bv);
- ctx->op_t.setattr(coll, soid, OI_ATTR, bv);
+ ctx->op_t->setattr(soid, OI_ATTR, bv);
if (soid.snap == CEPH_NOSNAP) {
dout(10) << " final snapset " << ctx->new_snapset
<< " in " << soid << dendl;
- ctx->op_t.setattr(coll, soid, SS_ATTR, bss);
+ ctx->op_t->setattr(soid, SS_ATTR, bss);
} else {
dout(10) << " no snapset (this is a clone)" << dendl;
}
}
}
+ assert(cop->rval >= 0);
+
if (!cop->cursor.is_complete()) {
// write out what we have so far
if (cop->temp_cursor.is_initial()) {
if (cop->temp_cursor.is_initial()) {
repop->ctx->new_temp_oid = cop->results.temp_oid;
}
- _write_copy_chunk(cop, &repop->ctx->op_t);
+ _write_copy_chunk(cop, repop->ctx->op_t);
simple_repop_submit(repop);
dout(10) << __func__ << " fetching more" << dendl;
_copy_some(cobc, cop);
}
dout(20) << __func__ << " success; committing" << dendl;
+ cop->results.final_tx = pgbackend->get_transaction();
_build_finish_copy_transaction(cop, cop->results.final_tx);
out:
kick_object_context_blocked(cobc);
}
-void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t)
+void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, PGBackend::PGTransaction *t)
{
dout(20) << __func__ << " " << cop
<< " " << cop->attrs.size() << " attrs"
<< " " << cop->omap.size() << " keys"
<< dendl;
if (!cop->temp_cursor.attr_complete) {
- t->touch(cop->results.temp_coll, cop->results.temp_oid);
+ t->touch(cop->results.temp_oid);
for (map<string,bufferlist>::iterator p = cop->attrs.begin();
- p != cop->attrs.end(); ++p)
- t->setattr(cop->results.temp_coll, cop->results.temp_oid,
- string("_") + p->first, p->second);
+ p != cop->attrs.end();
+ ++p)
+ t->setattr(
+ cop->results.temp_oid,
+ string("_") + p->first, p->second);
cop->attrs.clear();
}
if (!cop->temp_cursor.data_complete) {
- t->write(cop->results.temp_coll, cop->results.temp_oid,
- cop->temp_cursor.data_offset, cop->data.length(), cop->data);
+ t->write(
+ cop->results.temp_oid,
+ cop->temp_cursor.data_offset, cop->data.length(), cop->data);
cop->data.clear();
}
if (!cop->temp_cursor.omap_complete) {
if (cop->omap_header.length()) {
- t->omap_setheader(cop->results.temp_coll, cop->results.temp_oid,
- cop->omap_header);
+ t->omap_setheader(
+ cop->results.temp_oid,
+ cop->omap_header);
cop->omap_header.clear();
}
- t->omap_setkeys(cop->results.temp_coll, cop->results.temp_oid, cop->omap);
+ t->omap_setkeys(cop->results.temp_oid, cop->omap);
cop->omap.clear();
}
cop->temp_cursor = cop->cursor;
}
void ReplicatedPG::_build_finish_copy_transaction(CopyOpRef cop,
- ObjectStore::Transaction& t)
+ PGBackend::PGTransaction* t)
{
ObjectState& obs = cop->obc->obs;
if (obs.exists) {
- t.remove(coll, obs.oi.soid);
+ t->remove(obs.oi.soid);
}
if (cop->temp_cursor.is_initial()) {
// write directly to final object
cop->results.temp_oid = obs.oi.soid;
- _write_copy_chunk(cop, &t);
+ _write_copy_chunk(cop, t);
} else {
// finish writing to temp object, then move into place
- _write_copy_chunk(cop, &t);
- t.collection_move_rename(
- cop->results.temp_coll, cop->results.temp_oid, coll, obs.oi.soid);
-
- // TODOSAM: adjust when this method gets absorbed into ReplicatedBackend
- pgbackend->clear_temp_obj(cop->results.temp_oid);
+ _write_copy_chunk(cop, t);
+ t->rename(cop->results.temp_oid, obs.oi.soid);
}
}
if (cb->is_temp_obj_used()) {
ctx->discard_temp_oid = cb->results->temp_oid;
}
- ctx->op_t.swap(cb->results->final_tx);
- ctx->op_t.append(cb->results->final_tx);
+ ctx->op_t->append(cb->results->final_tx);
+ delete cb->results->final_tx;
+ cb->results->final_tx = NULL;
// CopyFromCallback fills this in for us
obs.oi.user_version = ctx->user_at_version;
if (whiteout) {
// create a whiteout
- tctx->op_t.touch(coll, soid);
+ tctx->op_t->touch(soid);
tctx->new_obs.oi.set_flag(object_info_t::FLAG_WHITEOUT);
++tctx->delta_stats.num_whiteouts;
dout(20) << __func__ << " creating whiteout" << dendl;
} else {
- tctx->op_t.swap(results->final_tx);
+ tctx->op_t->append(results->final_tx);
+ delete results->final_tx;
+ results->final_tx = NULL;
if (results->started_temp_obj) {
tctx->discard_temp_oid = results->temp_oid;
}
// ========================================================================
// rep op gather
-class C_OSD_OpApplied : public Context {
-public:
+class C_OSD_RepopApplied : public Context {
ReplicatedPGRef pg;
- ReplicatedPG::RepGather *repop;
-
- C_OSD_OpApplied(ReplicatedPG *p, ReplicatedPG::RepGather *rg) :
- pg(p), repop(rg) {
- repop->get();
- }
- void finish(int r) {
- pg->op_applied(repop);
- }
-};
-
-class C_OSD_OpCommit : public Context {
+ boost::intrusive_ptr<ReplicatedPG::RepGather> repop;
public:
- ReplicatedPGRef pg;
- ReplicatedPG::RepGather *repop;
-
- C_OSD_OpCommit(ReplicatedPG *p, ReplicatedPG::RepGather *rg) :
- pg(p), repop(rg) {
- repop->get();
- }
- void finish(int r) {
- pg->op_commit(repop);
+ C_OSD_RepopApplied(ReplicatedPG *pg, ReplicatedPG::RepGather *repop)
+ : pg(pg), repop(repop) {}
+ void finish(int) {
+ pg->repop_all_applied(repop.get());
}
};
-void ReplicatedPG::apply_repop(RepGather *repop)
-{
- dout(10) << "apply_repop applying update on " << *repop << dendl;
- assert(!repop->applying);
- assert(!repop->applied);
-
- repop->applying = true;
-
- repop->tls.push_back(&repop->ctx->local_t);
- repop->tls.push_back(&repop->ctx->op_t);
-
- repop->obc->ondisk_write_lock();
- if (repop->ctx->clone_obc)
- repop->ctx->clone_obc->ondisk_write_lock();
- bool unlock_snapset_obc = false;
- if (repop->ctx->snapset_obc && repop->ctx->snapset_obc->obs.oi.soid !=
- repop->obc->obs.oi.soid) {
- repop->ctx->snapset_obc->ondisk_write_lock();
- unlock_snapset_obc = true;
+void ReplicatedPG::repop_all_applied(RepGather *repop)
+{
+ dout(10) << __func__ << ": repop tid " << repop->rep_tid << " all applied "
+ << dendl;
+ repop->all_applied = true;
+ if (!repop->rep_aborted) {
+ eval_repop(repop);
}
+}
- Context *oncommit = new C_OSD_OpCommit(this, repop);
- Context *onapplied = new C_OSD_OpApplied(this, repop);
- Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
- repop->obc,
- repop->ctx->clone_obc,
- unlock_snapset_obc ? repop->ctx->snapset_obc : ObjectContextRef());
- int r = osd->store->queue_transactions(osr.get(), repop->tls, onapplied, oncommit, onapplied_sync, repop->ctx->op);
- if (r) {
- derr << "apply_repop queue_transactions returned " << r << " on " << *repop << dendl;
- assert(0);
+class C_OSD_RepopCommit : public Context {
+ ReplicatedPGRef pg;
+ boost::intrusive_ptr<ReplicatedPG::RepGather> repop;
+public:
+ C_OSD_RepopCommit(ReplicatedPG *pg, ReplicatedPG::RepGather *repop)
+ : pg(pg), repop(repop) {}
+ void finish(int) {
+ pg->repop_all_committed(repop.get());
}
-}
+};
-void ReplicatedPG::op_applied(RepGather *repop)
+void ReplicatedPG::repop_all_committed(RepGather *repop)
{
- lock();
- dout(10) << "op_applied " << *repop << dendl;
- if (repop->ctx->op)
- repop->ctx->op->mark_event("op_applied");
-
- repop->applying = false;
- repop->applied = true;
-
- // (logical) local ack.
- int whoami = osd->get_nodeid();
-
- if (!repop->aborted) {
- assert(repop->waitfor_ack.count(whoami) ||
- repop->waitfor_disk.count(whoami) == 0); // commit before ondisk
- repop->waitfor_ack.erase(whoami);
+ dout(10) << __func__ << ": repop tid " << repop->rep_tid << " all committed "
+ << dendl;
+ repop->all_committed = true;
+ if (!repop->rep_aborted) {
if (repop->v != eversion_t()) {
- assert(info.last_update >= repop->v);
- assert(last_update_applied < repop->v);
- last_update_applied = repop->v;
+ last_update_ondisk = repop->v;
+ last_complete_ondisk = repop->pg_local_last_complete;
}
+ eval_repop(repop);
+ }
+}
- // chunky scrub
+void ReplicatedPG::op_applied(const eversion_t &applied_version)
+{
+ dout(10) << "op_applied on primary on version " << applied_version << dendl;
+ if (applied_version == eversion_t())
+ return;
+ assert(applied_version > last_update_applied);
+ assert(applied_version <= info.last_update);
+ last_update_applied = applied_version;
+ if (is_primary()) {
if (scrubber.active && scrubber.is_chunky) {
if (last_update_applied == scrubber.subset_last_update) {
osd->scrub_wq.queue(this);
}
-
- // classic scrub
} else if (last_update_applied == info.last_update && scrubber.block_writes) {
dout(10) << "requeueing scrub for cleanup" << dendl;
scrubber.finalizing = true;
scrubber.waiting_on_whom.insert(osd->whoami);
osd->scrub_wq.queue(this);
}
- }
-
- if (!repop->aborted)
- eval_repop(repop);
-
- repop->put();
- unlock();
-}
-
-void ReplicatedPG::op_commit(RepGather *repop)
-{
- lock();
- if (repop->ctx->op)
- repop->ctx->op->mark_event("op_commit");
-
- if (repop->aborted) {
- dout(10) << "op_commit " << *repop << " -- aborted" << dendl;
- } else if (repop->waitfor_disk.count(osd->get_nodeid()) == 0) {
- dout(10) << "op_commit " << *repop << " -- already marked ondisk" << dendl;
} else {
- dout(10) << "op_commit " << *repop << dendl;
- int whoami = osd->get_nodeid();
-
- repop->waitfor_disk.erase(whoami);
-
- // remove from ack waitfor list too. sub_op_modify_commit()
- // behaves the same in that the COMMIT implies and ACK and there
- // is no separate reply sent.
- repop->waitfor_ack.erase(whoami);
-
- if (repop->v != eversion_t()) {
- last_update_ondisk = repop->v;
- last_complete_ondisk = repop->pg_local_last_complete;
+ dout(10) << "op_applied on replica on version " << applied_version << dendl;
+ if (scrubber.active_rep_scrub) {
+ if (last_update_applied == scrubber.active_rep_scrub->scrub_to) {
+ osd->rep_scrub_wq.queue(scrubber.active_rep_scrub);
+ scrubber.active_rep_scrub = 0;
+ }
}
- eval_repop(repop);
}
-
- repop->put();
- unlock();
}
-
-
void ReplicatedPG::eval_repop(RepGather *repop)
{
MOSDOp *m = NULL;
if (m)
dout(10) << "eval_repop " << *repop
<< " wants=" << (m->wants_ack() ? "a":"") << (m->wants_ondisk() ? "d":"")
- << (repop->done() ? " DONE" : "")
+ << (repop->rep_done ? " DONE" : "")
<< dendl;
else
dout(10) << "eval_repop " << *repop << " (no op)"
- << (repop->done() ? " DONE" : "")
+ << (repop->rep_done ? " DONE" : "")
<< dendl;
- if (repop->done())
+ if (repop->rep_done)
return;
- // apply?
- if (!repop->applied && !repop->applying)
- apply_repop(repop);
-
if (m) {
// an 'ondisk' reply implies 'ack'. so, prefer to send just one
// ondisk instead of ack followed by ondisk.
// ondisk?
- if (repop->waitfor_disk.empty()) {
+ if (repop->all_committed) {
release_op_ctx_locks(repop->ctx);
}
// applied?
- if (repop->waitfor_ack.empty()) {
+ if (repop->all_applied) {
// send dup acks, in order
if (waiting_for_ack.count(repop->v)) {
}
// done.
- if (repop->waitfor_ack.empty() && repop->waitfor_disk.empty() &&
- repop->applied) {
- repop->mark_done();
+ if (repop->all_applied && repop->all_committed) {
+ repop->rep_done = true;
calc_min_last_complete_ondisk();
{
OpContext *ctx = repop->ctx;
const hobject_t& soid = ctx->obs->oi.soid;
-
+ if (ctx->op &&
+ ((static_cast<MOSDOp *>(
+ ctx->op->get_req()))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) {
+ // replicate original op for parallel execution on replica
+ assert(0 == "broken implementation, do not use");
+ }
dout(7) << "issue_repop rep_tid " << repop->rep_tid
<< " o " << soid
<< dendl;
repop->v = ctx->at_version;
- // add myself to gather set
- repop->waitfor_ack.insert(acting[0]);
- repop->waitfor_disk.insert(acting[0]);
+ for (vector<int>::iterator i = actingbackfill.begin() + 1;
+ i != actingbackfill.end();
+ ++i) {
+ pg_info_t &pinfo = peer_info[*i];
+ // keep peer_info up to date
+ if (pinfo.last_complete == pinfo.last_update)
+ pinfo.last_complete = ctx->at_version;
+ pinfo.last_update = ctx->at_version;
+ }
+
+ repop->obc->ondisk_write_lock();
+ if (repop->ctx->clone_obc)
+ repop->ctx->clone_obc->ondisk_write_lock();
+
+ bool unlock_snapset_obc = false;
+ if (repop->ctx->snapset_obc && repop->ctx->snapset_obc->obs.oi.soid !=
+ repop->obc->obs.oi.soid) {
+ repop->ctx->snapset_obc->ondisk_write_lock();
+ unlock_snapset_obc = true;
+ }
+ Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
+ Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
+ Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
+ repop->obc,
+ repop->ctx->clone_obc,
+ unlock_snapset_obc ? repop->ctx->snapset_obc : ObjectContextRef());
+ pgbackend->submit_transaction(
+ soid,
+ repop->ctx->at_version,
+ repop->ctx->op_t,
+ pg_trim_to,
+ repop->ctx->log,
+ onapplied_sync,
+ on_all_applied,
+ on_all_commit,
+ repop->rep_tid,
+ repop->ctx->reqid,
+ repop->ctx->op);
+ repop->ctx->op_t = NULL;
+}
+
+void ReplicatedBackend::issue_op(
+ const hobject_t &soid,
+ const eversion_t &at_version,
+ tid_t tid,
+ osd_reqid_t reqid,
+ eversion_t pg_trim_to,
+ hobject_t new_temp_oid,
+ hobject_t discard_temp_oid,
+ vector<pg_log_entry_t> &log_entries,
+ InProgressOp *op,
+ ObjectStore::Transaction *op_t)
+{
int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
- assert(actingbackfill.size() > 0);
- if (ctx->op && actingbackfill.size() > 1) {
+ if (parent->get_actingbackfill().size() > 1) {
ostringstream ss;
- ss << "waiting for subops from " << vector<int>(actingbackfill.begin() + 1, actingbackfill.end());
- ctx->op->mark_sub_op_sent(ss.str());
+ ss << "waiting for subops from " <<
+ vector<int>(
+ parent->get_actingbackfill().begin() + 1,
+ parent->get_actingbackfill().end());
+ if (op->op)
+ op->op->mark_sub_op_sent(ss.str());
}
- for (unsigned i=1; i<actingbackfill.size(); i++) {
- int peer = actingbackfill[i];
- pg_info_t &pinfo = peer_info[peer];
+ for (unsigned i=1; i<parent->get_actingbackfill().size(); i++) {
+ int peer = parent->get_actingbackfill()[i];
+ const pg_info_t &pinfo = parent->get_peer_info().find(peer)->second;
- repop->waitfor_ack.insert(peer);
- repop->waitfor_disk.insert(peer);
+ op->waiting_for_applied.insert(peer);
+ op->waiting_for_commit.insert(peer);
// forward the write/update/whatever
- MOSDSubOp *wr = new MOSDSubOp(repop->ctx->reqid, info.pgid, soid,
- false, acks_wanted,
- get_osdmap()->get_epoch(),
- repop->rep_tid, repop->ctx->at_version);
- if (ctx->op &&
- ((static_cast<MOSDOp *>(ctx->op->get_req()))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) {
- // replicate original op for parallel execution on replica
- assert(0 == "broken implementation, do not use");
- }
+ MOSDSubOp *wr = new MOSDSubOp(
+ reqid, get_info().pgid, soid,
+ false, acks_wanted,
+ get_osdmap()->get_epoch(),
+ tid, at_version);
// ship resulting transaction, log entries, and pg_stats
- if (!should_send_op(peer, soid)) {
+ if (!parent->should_send_op(peer, soid)) {
dout(10) << "issue_repop shipping empty opt to osd." << peer
<<", object " << soid
<< " beyond MAX(last_backfill_started "
- << last_backfill_started << ", pinfo.last_backfill "
+ << ", pinfo.last_backfill "
<< pinfo.last_backfill << ")" << dendl;
ObjectStore::Transaction t;
::encode(t, wr->get_data());
} else {
- ::encode(repop->ctx->op_t, wr->get_data());
+ ::encode(*op_t, wr->get_data());
}
- ::encode(repop->ctx->log, wr->logbl);
+ ::encode(log_entries, wr->logbl);
- if (is_backfill_targets(peer))
+ if (pinfo.is_incomplete())
wr->pg_stats = pinfo.stats; // reflects backfill progress
else
- wr->pg_stats = info.stats;
+ wr->pg_stats = get_info().stats;
wr->pg_trim_to = pg_trim_to;
- wr->new_temp_oid = repop->ctx->new_temp_oid;
- wr->discard_temp_oid = repop->ctx->discard_temp_oid;
+ wr->new_temp_oid = new_temp_oid;
+ wr->discard_temp_oid = discard_temp_oid;
osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch());
- // keep peer_info up to date
- if (pinfo.last_complete == pinfo.last_update)
- pinfo.last_update = ctx->at_version;
- pinfo.last_update = ctx->at_version;
}
}
osd->logger->set(l_osd_op_wip, repop_map.size());
}
-void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
- int fromosd, eversion_t peer_lcod)
-{
- MOSDOp *m = NULL;
-
- if (repop->ctx->op)
- m = static_cast<MOSDOp *>(repop->ctx->op->get_req());
-
- if (m)
- dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *m
- << " result " << result
- << " ack_type " << ack_type
- << " from osd." << fromosd
- << dendl;
- else
- dout(7) << "repop_ack rep_tid " << repop->rep_tid << " (no op) "
- << " result " << result
- << " ack_type " << ack_type
- << " from osd." << fromosd
- << dendl;
-
- if (ack_type & CEPH_OSD_FLAG_ONDISK) {
- if (repop->ctx->op)
- repop->ctx->op->mark_event("sub_op_commit_rec");
- // disk
- if (repop->waitfor_disk.count(fromosd)) {
- repop->waitfor_disk.erase(fromosd);
- //repop->waitfor_nvram.erase(fromosd);
- repop->waitfor_ack.erase(fromosd);
- peer_last_complete_ondisk[fromosd] = peer_lcod;
- }
-/*} else if (ack_type & CEPH_OSD_FLAG_ONNVRAM) {
- // nvram
- repop->waitfor_nvram.erase(fromosd);
- repop->waitfor_ack.erase(fromosd);*/
- } else {
- // ack
- if (repop->ctx->op)
- repop->ctx->op->mark_event("sub_op_applied_rec");
- repop->waitfor_ack.erase(fromosd);
- }
-
- if (!repop->aborted)
- eval_repop(repop);
-}
-
-
ReplicatedPG::RepGather *ReplicatedPG::simple_repop_create(ObjectContextRef obc)
{
dout(20) << __func__ << " " << obc->obs.oi.soid << dendl;
osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops,
&obc->obs, obc->ssc, this);
+ ctx->op_t = pgbackend->get_transaction();
ctx->mtime = ceph_clock_now(g_ceph_context);
ctx->obc = obc;
RepGather *repop = new_repop(ctx, obc, rep_tid);
void ReplicatedPG::simple_repop_submit(RepGather *repop)
{
dout(20) << __func__ << " " << repop << dendl;
- if (!repop->ctx->log.empty())
- append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t);
issue_repop(repop, repop->ctx->mtime);
eval_repop(repop);
repop->put();
}
-
-
-
-
// -------------------------------------------------------
void ReplicatedPG::get_watchers(list<obj_watch_item_t> &pg_watchers)
osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops,
&obc->obs, obc->ssc, this);
+ ctx->op_t = pgbackend->get_transaction();
ctx->mtime = ceph_clock_now(cct);
ctx->at_version = get_next_version();
RepGather *repop = new_repop(ctx, obc, rep_tid);
- ObjectStore::Transaction *t = &ctx->op_t;
+ PGBackend::PGTransaction *t = ctx->op_t;
ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, obc->obs.oi.soid,
ctx->at_version,
obc->obs.oi.version = ctx->at_version;
bufferlist bl;
::encode(obc->obs.oi, bl);
- t->setattr(coll, obc->obs.oi.soid, OI_ATTR, bl);
-
- append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t);
+ t->setattr(obc->obs.oi.soid, OI_ATTR, bl);
// obc ref swallowed by repop!
issue_repop(repop, repop->ctx->mtime);
// sub op modify
-void ReplicatedPG::sub_op_modify(OpRequestRef op)
+void ReplicatedBackend::sub_op_modify(OpRequestRef op)
{
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
assert(m->get_header().type == MSG_OSD_SUBOP);
<< dendl;
// sanity checks
- assert(m->map_epoch >= info.history.same_interval_since);
- assert(is_active());
+ assert(m->map_epoch >= get_info().history.same_interval_since);
// we better not be missing this.
- assert(!pg_log.get_missing().is_missing(soid));
+ assert(!parent->get_log().get_missing().is_missing(soid));
- int ackerosd = acting[0];
+ int ackerosd = m->get_source().num();
op->mark_started();
- RepModify *rm = new RepModify;
- rm->pg = this;
- get("RepModify");
+ RepModifyRef rm(new RepModify);
rm->op = op;
- rm->ctx = 0;
rm->ackerosd = ackerosd;
- rm->last_complete = info.last_complete;
+ rm->last_complete = get_info().last_complete;
rm->epoch_started = get_osdmap()->get_epoch();
if (!m->noop) {
if (m->new_temp_oid != hobject_t()) {
dout(20) << __func__ << " start tracking temp " << m->new_temp_oid << dendl;
- // TODOSAM: adjust when this method gets absorbed into ReplicatedBackend
- pgbackend->add_temp_obj(m->new_temp_oid);
+ add_temp_obj(m->new_temp_oid);
get_temp_coll(&rm->localt);
}
if (m->discard_temp_oid != hobject_t()) {
dout(20) << __func__ << " stop tracking temp " << m->discard_temp_oid << dendl;
- // TODOSAM: adjust when this method gets absorbed into ReplicatedBackend
- pgbackend->clear_temp_obj(m->discard_temp_oid);
+ clear_temp_obj(m->discard_temp_oid);
}
::decode(rm->opt, p);
i != log.end();
++i) {
if (!i->soid.is_max() && i->soid.pool == -1)
- i->soid.pool = info.pgid.pool();
+ i->soid.pool = get_info().pgid.pool();
}
- rm->opt.set_pool_override(info.pgid.pool());
+ rm->opt.set_pool_override(get_info().pgid.pool());
}
rm->opt.set_replica();
- info.stats = m->pg_stats;
bool update_snaps = false;
if (!rm->opt.empty()) {
// If the opt is non-empty, we infer we are before
// collections now. Otherwise, we do it later on push.
update_snaps = true;
}
- append_log(log, m->pg_trim_to, rm->localt, update_snaps);
-
- rm->tls.push_back(&rm->localt);
- rm->tls.push_back(&rm->opt);
-
+ parent->update_stats(m->pg_stats);
+ parent->log_operation(
+ log,
+ m->pg_trim_to,
+ update_snaps,
+ &(rm->localt));
+
rm->bytes_written = rm->opt.get_encoded_bytes();
} else {
+ assert(0);
+ #if 0
// just trim the log
if (m->pg_trim_to != eversion_t()) {
pg_log.trim(m->pg_trim_to, info);
dirty_info = true;
write_if_dirty(rm->localt);
- rm->tls.push_back(&rm->localt);
}
+ #endif
}
op->mark_started();
- Context *oncommit = new C_OSD_RepModifyCommit(rm);
- Context *onapply = new C_OSD_RepModifyApply(rm);
- int r = osd->store->queue_transactions(osr.get(), rm->tls, onapply, oncommit, 0, op);
- if (r) {
- dout(0) << "error applying transaction: r = " << r << dendl;
- assert(0);
- }
+ rm->localt.append(rm->opt);
+ rm->localt.register_on_commit(
+ parent->bless_context(
+ new C_OSD_RepModifyCommit(this, rm)));
+ rm->localt.register_on_applied(
+ parent->bless_context(
+ new C_OSD_RepModifyApply(this, rm)));
+ parent->queue_transaction(&(rm->localt), op);
// op is cleaned up by oncommit/onapply when both are executed
}
-void ReplicatedPG::op_applied_replica(
- const eversion_t &applied_version)
+void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm)
{
- dout(10) << "op_applied_replica on version " << applied_version << dendl;
- if (applied_version != eversion_t()) {
- assert(info.last_update >= applied_version);
- assert(last_update_applied < applied_version);
- last_update_applied = applied_version;
- }
- if (scrubber.active_rep_scrub) {
- if (last_update_applied == scrubber.active_rep_scrub->scrub_to) {
- osd->rep_scrub_wq.queue(scrubber.active_rep_scrub);
- scrubber.active_rep_scrub = 0;
- }
- }
-}
-
-void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
-{
- lock();
rm->op->mark_event("sub_op_applied");
rm->applied = true;
- if (!pg_has_reset_since(rm->epoch_started)) {
- dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req() << dendl;
- MOSDSubOp *m = static_cast<MOSDSubOp*>(rm->op->get_req());
- assert(m->get_header().type == MSG_OSD_SUBOP);
-
- if (!rm->committed) {
- // send ack to acker only if we haven't sent a commit already
- MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
- ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
- osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch());
- }
-
- op_applied_replica(m->version);
- } else {
- dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req()
- << " from epoch " << rm->epoch_started << " < last_peering_reset "
- << last_peering_reset << dendl;
- }
-
- bool done = rm->applied && rm->committed;
- unlock();
- if (done) {
- delete rm->ctx;
- delete rm;
- put("RepModify");
+ dout(10) << "sub_op_modify_applied on " << rm << " op "
+ << *rm->op->get_req() << dendl;
+ MOSDSubOp *m = static_cast<MOSDSubOp*>(rm->op->get_req());
+ assert(m->get_header().type == MSG_OSD_SUBOP);
+
+ if (!rm->committed) {
+ // send ack to acker only if we haven't sent a commit already
+ MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
+ osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch());
}
+
+ parent->op_applied(m->version);
}
-void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
+void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
{
- lock();
rm->op->mark_commit_sent();
rm->committed = true;
- if (!pg_has_reset_since(rm->epoch_started)) {
- // send commit.
- dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req()
- << ", sending commit to osd." << rm->ackerosd
- << dendl;
-
- if (get_osdmap()->is_up(rm->ackerosd)) {
- last_complete_ondisk = rm->last_complete;
- MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast<MOSDSubOp*>(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
- commit->set_last_complete_ondisk(rm->last_complete);
- commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
- osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch());
- }
- } else {
- dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->get_req()
- << " from epoch " << rm->epoch_started << " < last_peering_reset "
- << last_peering_reset << dendl;
- }
+ // send commit.
+ dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req()
+ << ", sending commit to osd." << rm->ackerosd
+ << dendl;
- log_subop_stats(osd, rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
- bool done = rm->applied && rm->committed;
- unlock();
- if (done) {
- delete rm->ctx;
- delete rm;
- put("RepModify");
- }
-}
-
-void ReplicatedPG::sub_op_modify_reply(OpRequestRef op)
-{
- MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
- assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
-
- op->mark_started();
-
- // must be replication.
- tid_t rep_tid = r->get_tid();
- int fromosd = r->get_source().num();
+ assert(get_osdmap()->is_up(rm->ackerosd));
+ get_parent()->update_last_complete_ondisk(rm->last_complete);
+ MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast<MOSDSubOp*>(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+ commit->set_last_complete_ondisk(rm->last_complete);
+ commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
+ osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch());
- if (repop_map.count(rep_tid)) {
- // oh, good.
- repop_ack(repop_map[rep_tid],
- r->get_result(), r->ack_type,
- fromosd,
- r->get_last_complete_ondisk());
- }
+ log_subop_stats(osd, rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
}
-
-
-
-
-
-
-
-
// ===========================================================
void ReplicatedBackend::calc_head_subsets(
t->register_on_complete(new C_OSD_SendMessageOnConn(
osd, reply, m->get_connection()));
}
+ t->register_on_applied(
+ new ObjectStore::C_DeleteTransaction(t));
get_parent()->queue_transaction(t);
return;
}
RepGather *repop = repop_queue.front();
repop_queue.pop_front();
dout(10) << " applying repop tid " << repop->rep_tid << dendl;
- if (!repop->applied && !repop->applying)
- apply_repop(repop);
- repop->aborted = true;
+ repop->rep_aborted = true;
if (requeue) {
if (repop->ctx->op) {
// the deleted object over this period.
hobject_t old_obj =
get_hit_set_current_object(info.hit_set.current_last_stamp);
- ctx->op_t.remove(coll, old_obj);
+ ctx->op_t->remove(old_obj);
ctx->log.push_back(
- pg_log_entry_t(pg_log_entry_t::DELETE,
- old_obj,
- ctx->at_version,
- info.hit_set.current_last_update,
- 0,
- osd_reqid_t(),
- ctx->mtime));
+ pg_log_entry_t(pg_log_entry_t::DELETE,
+ old_obj,
+ ctx->at_version,
+ info.hit_set.current_last_update,
+ 0,
+ osd_reqid_t(),
+ ctx->mtime));
++ctx->at_version.version;
struct stat st;
bufferlist boi(sizeof(ctx->new_obs.oi));
::encode(ctx->new_obs.oi, boi);
- ctx->op_t.write(coll, oid, 0, bl.length(), bl);
- ctx->op_t.setattr(coll, oid, OI_ATTR, boi);
- ctx->op_t.setattr(coll, oid, SS_ATTR, bss);
+ ctx->op_t->write(oid, 0, bl.length(), bl);
+ ctx->op_t->setattr(oid, OI_ATTR, boi);
+ ctx->op_t->setattr(oid, SS_ATTR, bss);
ctx->log.push_back(
- pg_log_entry_t(
- pg_log_entry_t::MODIFY,
- oid,
- ctx->at_version,
- ctx->obs->oi.version,
- 0,
- osd_reqid_t(),
- ctx->mtime)
- );
+ pg_log_entry_t(
+ pg_log_entry_t::MODIFY,
+ oid,
+ ctx->at_version,
+ ctx->obs->oi.version,
+ 0,
+ osd_reqid_t(),
+ ctx->mtime)
+ );
hit_set_trim(repop, pool.info.hit_set_count);
assert(p != info.hit_set.history.end());
hobject_t oid = get_hit_set_archive_object(p->begin, p->end);
dout(20) << __func__ << " removing " << oid << dendl;
- repop->ctx->op_t.remove(coll, oid);
+ repop->ctx->op_t->remove(oid);
++repop->ctx->at_version.version;
repop->ctx->log.push_back(
pg_log_entry_t(pg_log_entry_t::DELETE,
RepGather *repop = pg->trim_object(pos);
assert(repop);
repop->queue_snap_trimmer = true;
+
repops.insert(repop->get());
pg->simple_repop_submit(repop);
return discard_event();
for (set<RepGather *>::iterator i = repops.begin();
i != repops.end();
repops.erase(i++)) {
- if (!(*i)->applied || !(*i)->waitfor_ack.empty()) {
+ if (!(*i)->all_applied) {
return discard_event();
} else {
(*i)->put();
uint64_t get_with_id(ReplicatedPG *pg) { return pg->get_with_id(); }
void put_with_id(ReplicatedPG *pg, uint64_t id) { return pg->put_with_id(id); }
#endif
+
+void intrusive_ptr_add_ref(ReplicatedPG::RepGather *repop) { repop->get(); }
+void intrusive_ptr_release(ReplicatedPG::RepGather *repop) { repop->put(); }