OPTION(osd_heartbeat_interval, OPT_INT, 6) // (seconds) how often we ping peers
OPTION(osd_heartbeat_grace, OPT_INT, 20) // (seconds) how long before we decide a peer has failed
OPTION(osd_heartbeat_min_peers, OPT_INT, 10) // minimum number of peers
+
+// max number of parallel snap trims/pg
+OPTION(osd_pg_max_concurrent_snap_trims, OPT_U64, 2)
+
OPTION(osd_mon_heartbeat_interval, OPT_INT, 30) // (seconds) how often to ping monitor if no peers
OPTION(osd_mon_report_interval_max, OPT_INT, 120)
OPTION(osd_mon_report_interval_min, OPT_INT, 5) // pg stats, failures, up_thru, boot.
dout(10) << "TrimmingObjects: trimming snap " << snap_to_trim << dendl;
- // Get next
- int r = pg->snap_mapper.get_next_object_to_trim(snap_to_trim, &pos);
- if (r != 0 && r != -ENOENT) {
- derr << __func__ << ": get_next returned " << cpp_strerror(r) << dendl;
- assert(0);
- } else if (r == -ENOENT) {
- // Done!
- dout(10) << "TrimmingObjects: got ENOENT" << dendl;
- post_event(SnapTrim());
- return transit< WaitingOnReplicas >();
+ for (set<RepGather *>::iterator i = repops.begin();
+ i != repops.end();
+ ) {
+ if ((*i)->done) {
+ (*i)->put();
+ repops.erase(i++);
+ } else {
+ ++i;
+ }
}
- dout(10) << "TrimmingObjects react trimming " << pos << dendl;
- RepGather *repop = pg->trim_object(pos);
- assert(repop);
+ while (repops.size() < g_conf->osd_pg_max_concurrent_snap_trims) {
+ // Get next
+ hobject_t old_pos = pos;
+ int r = pg->snap_mapper.get_next_object_to_trim(snap_to_trim, &pos);
+ if (r != 0 && r != -ENOENT) {
+ derr << __func__ << ": get_next returned " << cpp_strerror(r) << dendl;
+ assert(0);
+ } else if (r == -ENOENT) {
+ // Done!
+ dout(10) << "TrimmingObjects: got ENOENT" << dendl;
+ post_event(SnapTrim());
+ return transit< WaitingOnReplicas >();
+ }
- repop->queue_snap_trimmer = true;
- eversion_t old_last_update = pg->pg_log.get_head();
- bool old_exists = repop->obc->obs.exists;
- uint64_t old_size = repop->obc->obs.oi.size;
- eversion_t old_version = repop->obc->obs.oi.version;
+ dout(10) << "TrimmingObjects react trimming " << pos << dendl;
+ RepGather *repop = pg->trim_object(pos);
+ assert(repop);
+
+ repop->queue_snap_trimmer = true;
- pg->append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t);
- pg->issue_repop(repop, repop->ctx->mtime, old_last_update, old_exists, old_size, old_version);
- pg->eval_repop(repop);
+ eversion_t old_last_update = pg->pg_log.get_head();
+ bool old_exists = repop->obc->obs.exists;
+ uint64_t old_size = repop->obc->obs.oi.size;
+ eversion_t old_version = repop->obc->obs.oi.version;
- repops.insert(repop);
+ pg->append_log(repop->ctx->log, eversion_t(), repop->ctx->local_t);
+ pg->issue_repop(repop, repop->ctx->mtime, old_last_update, old_exists, old_size, old_version);
+ pg->eval_repop(repop);
+
+ repops.insert(repop);
+ }
return discard_event();
}
/* WaitingOnReplicasObjects */
for (set<RepGather *>::iterator i = repops.begin();
i != repops.end();
repops.erase(i++)) {
- if (!(*i)->applied || !(*i)->waitfor_ack.empty()) {
+ if (!(*i)->done) {
return discard_event();
} else {
(*i)->put();