}
ReplicatedBackend::ReplicatedBackend(
- PGBackend::Listener *pg, coll_t coll, OSDService *osd) :
- PGBackend(pg), temp_created(false),
- temp_coll(coll_t::make_temp_coll(pg->get_info().pgid)),
- coll(coll), osd(osd), cct(osd->cct) {}
+ PGBackend::Listener *pg, coll_t coll, ObjectStore *store,
+ CephContext *cct) :
+ PGBackend(pg, store,
+ coll, coll_t::make_temp_coll(pg->get_info().pgid)),
+ cct(cct) {}
void ReplicatedBackend::run_recovery_op(
PGBackend::RecoveryHandle *_h,
void ReplicatedBackend::on_flushed()
{
if (have_temp_coll() &&
- !osd->store->collection_empty(get_temp_coll())) {
+ !store->collection_empty(get_temp_coll())) {
vector<hobject_t> objects;
- osd->store->collection_list(get_temp_coll(), objects);
+ store->collection_list(get_temp_coll(), objects);
derr << __func__ << ": found objects in the temp collection: "
<< objects << ", crashing now"
<< dendl;
uint64_t len,
bufferlist *bl)
{
- return osd->store->read(coll, hoid, off, len, *bl);
+ return store->read(coll, hoid, off, len, *bl);
}
struct AsyncReadCallback : public GenContext<ThreadPool::TPHandle&> {
to_read.begin();
i != to_read.end() && r >= 0;
++i) {
- int _r = osd->store->read(coll, hoid, i->first.first,
- i->first.second, *(i->second.first));
+ int _r = store->read(coll, hoid, i->first.first,
+ i->first.second, *(i->second.first));
if (i->second.second) {
- osd->gen_wq.queue(
+ get_parent()->schedule_work(
get_parent()->bless_gencontext(
new AsyncReadCallback(_r, i->second.second)));
}
if (_r < 0)
r = _r;
}
- osd->gen_wq.queue(
+ get_parent()->schedule_work(
get_parent()->bless_gencontext(
new AsyncReadCallback(r, on_complete)));
}
if (op->op)
op->op->mark_event("op_applied");
- op->waiting_for_applied.erase(osd->whoami);
+ op->waiting_for_applied.erase(get_parent()->whoami());
parent->op_applied(op->v);
if (op->waiting_for_applied.empty()) {
if (op->op)
op->op->mark_event("op_commit");
- op->waiting_for_commit.erase(osd->whoami);
+ op->waiting_for_commit.erase(get_parent()->whoami());
if (op->waiting_for_commit.empty()) {
op->on_commit->complete(0);
}
static void log_subop_stats(
- OSDService *osd,
+ PerfCounters *logger,
OpRequestRef op, int tag_inb, int tag_lat)
{
utime_t now = ceph_clock_now(g_ceph_context);
uint64_t inb = op->get_req()->get_data().length();
- osd->logger->inc(l_osd_sop);
+ logger->inc(l_osd_sop);
- osd->logger->inc(l_osd_sop_inb, inb);
- osd->logger->tinc(l_osd_sop_lat, latency);
+ logger->inc(l_osd_sop_inb, inb);
+ logger->tinc(l_osd_sop_lat, latency);
if (tag_inb)
- osd->logger->inc(tag_inb, inb);
- osd->logger->tinc(tag_lat, latency);
+ logger->inc(tag_inb, inb);
+ logger->tinc(tag_lat, latency);
}
struct OnReadComplete : public Context {
peer_missing[peer].revise_have(soid, eversion_t());
}
+void ReplicatedPG::schedule_work(
+ GenContext<ThreadPool::TPHandle&> *c)
+{
+ osd->gen_wq.queue(c);
+}
+
+void ReplicatedPG::send_message_osd_cluster(
+ int peer, Message *m, epoch_t from_epoch)
+{
+ osd->send_message_osd_cluster(peer, m, from_epoch);
+}
+
+void ReplicatedPG::send_message_osd_cluster(
+ Message *m, Connection *con)
+{
+ osd->send_message_osd_cluster(m, con);
+}
+
+void ReplicatedPG::send_message_osd_cluster(
+ Message *m, const ConnectionRef& con)
+{
+ osd->send_message_osd_cluster(m, con);
+}
+
+ConnectionRef ReplicatedPG::get_con_osd_cluster(
+ int peer, epoch_t from_epoch)
+{
+ return osd->get_con_osd_cluster(peer, from_epoch);
+}
+
+PerfCounters *ReplicatedPG::get_logger()
+{
+ return osd->logger;
+}
+
// =======================
// pg changes
const PGPool &_pool, pg_t p, const hobject_t& oid,
const hobject_t& ioid) :
PG(o, curmap, _pool, p, oid, ioid),
- pgbackend(new ReplicatedBackend(this, coll_t(p), o)),
+ pgbackend(new ReplicatedBackend(this, coll_t(p), o->store, cct)),
snapset_contexts_lock("ReplicatedPG::snapset_contexts"),
temp_seq(0),
snap_trimmer_machine(this)
reply->compute_cost(cct);
t->register_on_complete(
- new C_OSD_SendMessageOnConn(
- osd, reply, m->get_connection()));
+ new PG_SendMessageOnConn(
+ get_parent(), reply, m->get_connection()));
t->register_on_applied(
new ObjectStore::C_DeleteTransaction(t));
m->get_priority());
c->to_continue.swap(to_continue);
t->register_on_complete(
- new C_QueueInWQ(
- &osd->push_wq,
+ new PG_QueueAsync(
+ get_parent(),
get_parent()->bless_gencontext(c)));
}
replies.erase(replies.end() - 1);
reply->compute_cost(cct);
t->register_on_complete(
- new C_OSD_SendMessageOnConn(
- osd, reply, m->get_connection()));
+ new PG_SendMessageOnConn(
+ get_parent(), reply, m->get_connection()));
}
t->register_on_applied(
wr->new_temp_oid = new_temp_oid;
wr->discard_temp_oid = discard_temp_oid;
- osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch());
-
+ get_parent()->send_message_osd_cluster(
+ peer, wr, get_osdmap()->get_epoch());
}
}
// send ack to acker only if we haven't sent a commit already
MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
- osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch());
+ get_parent()->send_message_osd_cluster(
+ rm->ackerosd, ack, get_osdmap()->get_epoch());
}
parent->op_applied(m->version);
MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast<MOSDSubOp*>(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
commit->set_last_complete_ondisk(rm->last_complete);
commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
- osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch());
+ get_parent()->send_message_osd_cluster(
+ rm->ackerosd, commit, get_osdmap()->get_epoch());
- log_subop_stats(osd, rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
+ log_subop_stats(get_parent()->get_logger(), rm->op,
+ l_osd_sop_w_inb, l_osd_sop_w_lat);
}
ObjectRecoveryProgress progress)
{
// send op
- tid_t tid = osd->get_tid();
- osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
+ tid_t tid = get_parent()->get_tid();
+ osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
dout(10) << "send_pull_op " << recovery_info.soid << " "
<< recovery_info.version
subop->recovery_info = recovery_info;
subop->recovery_progress = progress;
- osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
+ get_parent()->send_message_osd_cluster(
+ peer, subop, get_osdmap()->get_epoch());
- osd->logger->inc(l_osd_pull);
+ get_parent()->get_logger()->inc(l_osd_pull);
return 0;
}
} else {
dout(10) << __func__ << ": Creating oid "
<< recovery_info.soid << " in the temp collection" << dendl;
- temp_contents.insert(recovery_info.soid);
+ add_temp_obj(recovery_info.soid);
target_coll = get_temp_coll(t);
}
if (complete) {
if (!first) {
- assert(temp_contents.count(recovery_info.soid));
dout(10) << __func__ << ": Removing oid "
<< recovery_info.soid << " from the temp collection" << dendl;
- temp_contents.erase(recovery_info.soid);
+ clear_temp_obj(recovery_info.soid);
t->collection_move(coll, target_coll, recovery_info.soid);
}
C_OnPushCommit(ReplicatedPG *pg, OpRequestRef op) : pg(pg), op(op) {}
void finish(int) {
op->mark_event("committed");
- log_subop_stats(pg->osd, op, l_osd_push_inb, l_osd_sop_push_lat);
+ log_subop_stats(pg->osd->logger, op, l_osd_push_inb, l_osd_sop_push_lat);
}
};
for (map<int, vector<PushOp> >::iterator i = pushes.begin();
i != pushes.end();
++i) {
- ConnectionRef con = osd->get_con_osd_cluster(
+ ConnectionRef con = get_parent()->get_con_osd_cluster(
i->first,
get_osdmap()->get_epoch());
if (!con)
msg->pushes.push_back(*j);
}
msg->compute_cost(cct);
- osd->send_message_osd_cluster(msg, con);
+ get_parent()->send_message_osd_cluster(msg, con);
}
}
}
for (map<int, vector<PullOp> >::iterator i = pulls.begin();
i != pulls.end();
++i) {
- ConnectionRef con = osd->get_con_osd_cluster(
+ ConnectionRef con = get_parent()->get_con_osd_cluster(
i->first,
get_osdmap()->get_epoch());
if (!con)
msg->map_epoch = get_osdmap()->get_epoch();
msg->pulls.swap(i->second);
msg->compute_cost(cct);
- osd->send_message_osd_cluster(msg, con);
+ get_parent()->send_message_osd_cluster(msg, con);
}
}
}
<< dendl;
if (progress.first) {
- osd->store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header);
- osd->store->getattrs(coll, recovery_info.soid, out_op->attrset);
+ store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header);
+ store->getattrs(coll, recovery_info.soid, out_op->attrset);
// Debug
bufferlist bv;
object_info_t oi(bv);
if (oi.version != recovery_info.version) {
- osd->clog.error() << get_info().pgid << " push "
- << recovery_info.soid << " v "
- << recovery_info.version
- << " failed because local copy is "
- << oi.version << "\n";
+ get_parent()->clog_error() << get_info().pgid << " push "
+ << recovery_info.soid << " v "
+ << recovery_info.version
+ << " failed because local copy is "
+ << oi.version << "\n";
return -EINVAL;
}
uint64_t available = cct->_conf->osd_recovery_max_chunk;
if (!progress.omap_complete) {
ObjectMap::ObjectMapIterator iter =
- osd->store->get_omap_iterator(coll,
- recovery_info.soid);
+ store->get_omap_iterator(coll,
+ recovery_info.soid);
for (iter->lower_bound(progress.omap_recovered_to);
iter->valid();
iter->next()) {
p != out_op->data_included.end();
++p) {
bufferlist bit;
- osd->store->read(coll, recovery_info.soid,
+ store->read(coll, recovery_info.soid,
p.get_start(), p.get_len(), bit);
if (p.get_len() != bit.length()) {
dout(10) << " extent " << p.get_start() << "~" << p.get_len()
stat->num_bytes_recovered += out_op->data.length();
}
- osd->logger->inc(l_osd_push);
- osd->logger->inc(l_osd_push_outb, out_op->data.length());
+ get_parent()->get_logger()->inc(l_osd_push);
+ get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
// send
out_op->version = recovery_info.version;
int ReplicatedBackend::send_push_op_legacy(int prio, int peer, PushOp &pop)
{
- tid_t tid = osd->get_tid();
- osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
+ tid_t tid = get_parent()->get_tid();
+ osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
MOSDSubOp *subop = new MOSDSubOp(rid, get_info().pgid, pop.soid,
false, 0, get_osdmap()->get_epoch(),
tid, pop.recovery_info.version);
subop->current_progress = pop.before_progress;
subop->recovery_progress = pop.after_progress;
- osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
+ get_parent()->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
return 0;
}
m->get_source().num(),
reply);
- log_subop_stats(osd, op, 0, l_osd_sop_pull_lat);
+ log_subop_stats(get_parent()->get_logger(), op, 0, l_osd_sop_pull_lat);
}
void ReplicatedBackend::handle_pull(int peer, PullOp &op, PushOp *reply)
{
const hobject_t &soid = op.soid;
struct stat st;
- int r = osd->store->stat(coll, soid, &st);
+ int r = store->stat(coll, soid, &st);
if (r != 0) {
- osd->clog.error() << get_info().pgid << " "
- << peer << " tried to pull " << soid
- << " but got " << cpp_strerror(-r) << "\n";
+ get_parent()->clog_error() << get_info().pgid << " "
+ << peer << " tried to pull " << soid
+ << " but got " << cpp_strerror(-r) << "\n";
prep_push_op_blank(soid, reply);
} else {
ObjectRecoveryInfo &recovery_info = op.recovery_info;
op->get_req()->get_priority());
c->to_continue.swap(to_continue);
t->register_on_complete(
- new C_QueueInWQ(
- &osd->push_wq,
+ new PG_QueueAsync(
+ get_parent(),
get_parent()->bless_gencontext(c)));
}
run_recovery_op(h, op->get_req()->get_priority());
reply->set_priority(m->get_priority());
assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
handle_push(m->get_source().num(), pop, &resp, t);
- t->register_on_complete(new C_OSD_SendMessageOnConn(
- osd, reply, m->get_connection()));
+ t->register_on_complete(new PG_SendMessageOnConn(
+ get_parent(), reply, m->get_connection()));
}
t->register_on_applied(
new ObjectStore::C_DeleteTransaction(t));