repop->all_applied = true;
if (!repop->rep_aborted) {
eval_repop(repop);
+ if (repop->on_applied) {
+ repop->on_applied->complete(0);
+ repop->on_applied = NULL;
+ }
}
}
repop_queue.pop_front();
dout(10) << " applying repop tid " << repop->rep_tid << dendl;
repop->rep_aborted = true;
+ if (repop->on_applied) {
+ delete repop->on_applied;
+ repop->on_applied = NULL;
+ }
if (requeue) {
if (repop->ctx->op) {
dout(20) << __func__ << dendl;
hit_set.reset();
hit_set_start_stamp = utime_t();
+ hit_set_flushing.clear();
}
void ReplicatedPG::hit_set_setup()
return true;
}
+struct C_HitSetFlushing : public Context {
+ ReplicatedPGRef pg;
+ time_t hit_set_name;
+ C_HitSetFlushing(ReplicatedPG *p, time_t n) : pg(p), hit_set_name(n) { }
+ void finish(int r) {
+ pg->hit_set_flushing.erase(hit_set_name);
+ }
+};
+
void ReplicatedPG::hit_set_persist()
{
dout(10) << __func__ << dendl;
RepGather *repop;
hobject_t oid;
bool reset = false;
+ time_t flush_time = 0;
if (!info.hit_set.current_info.begin)
info.hit_set.current_info.begin = hit_set_start_stamp;
if (agent_state)
agent_state->add_hit_set(info.hit_set.current_info.begin, hit_set);
+ // hold a ref until it is flushed to disk
+ hit_set_flushing[info.hit_set.current_info.begin] = hit_set;
+ flush_time = info.hit_set.current_info.begin;
} else {
// persist snapshot of current hitset
::encode(*hit_set, bl);
ObjectContextRef obc = get_object_context(oid, true);
repop = simple_repop_create(obc);
+ if (flush_time != 0)
+ repop->on_applied = new C_HitSetFlushing(this, flush_time);
OpContext *ctx = repop->ctx;
ctx->at_version = get_next_version();
derr << __func__ << " on non-replicated pool" << dendl;
break;
}
- bufferlist bl;
- hobject_t oid = get_hit_set_archive_object(p->begin, p->end);
- int r = osd->store->read(coll, oid, 0, 0, bl);
- assert(r >= 0);
- HitSetRef hs(new HitSet);
- bufferlist::iterator pbl = bl.begin();
- ::decode(*hs, pbl);
- agent_state->add_hit_set(p->begin.sec(), hs);
+
+ // check if it's still in flight
+ if (hit_set_flushing.count(p->begin)) {
+ agent_state->add_hit_set(p->begin.sec(), hit_set_flushing[p->begin]);
+ } else {
+ bufferlist bl;
+ hobject_t oid = get_hit_set_archive_object(p->begin, p->end);
+ int r = osd->store->read(coll, oid, 0, 0, bl);
+ assert(r >= 0);
+ HitSetRef hs(new HitSet);
+ bufferlist::iterator pbl = bl.begin();
+ ::decode(*hs, pbl);
+ agent_state->add_hit_set(p->begin.sec(), hs);
+ }
}
}
}
eversion_t pg_local_last_complete;
bool queue_snap_trimmer;
+
+ Context *on_applied;
RepGather(OpContext *c, ObjectContextRef pi, tid_t rt,
eversion_t lc) :
//sent_nvram(false),
sent_disk(false),
pg_local_last_complete(lc),
- queue_snap_trimmer(false) { }
+ queue_snap_trimmer(false),
+ on_applied(NULL) { }
RepGather *get() {
nref++;
assert(nref > 0);
if (--nref == 0) {
delete ctx; // must already be unlocked
+ assert(on_applied == NULL);
delete this;
//generic_dout(0) << "deleting " << this << dendl;
}
HitSetRef hit_set; ///< currently accumulating HitSet
utime_t hit_set_start_stamp; ///< time the current HitSet started recording
+ map<time_t,HitSetRef> hit_set_flushing; ///< currently being written, not yet readable
+
void hit_set_clear(); ///< discard any HitSet state
void hit_set_setup(); ///< initialize HitSet state
void hit_set_create(); ///< create a new HitSet
boost::scoped_ptr<TierAgentState> agent_state;
friend class C_AgentFlushStartStop;
+ friend class C_HitSetFlushing;
void agent_setup(); ///< initialize agent state
void agent_work(int max); ///< entry point to do some agent work