} else {
dout(10) << " discarding waiting ops for " << pgid << dendl;
while (!p->second.empty()) {
- p->second.front()->put();
p->second.pop_front();
}
waiting_for_pg.erase(p++);
if (!op->request->get_connection()->peer_is_osd()) {
dout(0) << "require_osd_peer received from non-osd " << op->request->get_connection()->get_peer_addr()
<< " " << *op->request << dendl;
- op->put();
return false;
}
return true;
if (epoch < up_epoch) {
dout(7) << "from pre-up epoch " << epoch << " < " << up_epoch << dendl;
- op->put();
return false;
}
cluster_messenger->mark_down_on_empty(con);
cluster_messenger->mark_disposable(con);
- op->put();
return false;
}
}
// ok, we have at least as new a map as they do. are we (re)booting?
if (!is_active()) {
dout(7) << "still in boot state, dropping message " << *m << dendl;
- op->put();
return false;
}
if (!require_mon_peer(op->request)) {
// we have to hack around require_mon_peer's interface limits
op->request = NULL;
- op->put();
return;
}
do_infos(info_map);
maybe_update_heartbeat_peers();
-
- op->put();
}
do_infos(info_map);
maybe_update_heartbeat_peers();
-
- op->put();
}
void OSD::handle_pg_log(OpRequestRef op)
PG *pg = get_or_create_pg(m->info, m->get_epoch(),
from, created, false, &t, &fin);
if (!pg) {
- op->put();
return;
}
assert(!tr);
maybe_update_heartbeat_peers();
-
- op->put();
}
void OSD::handle_pg_info(OpRequestRef op)
do_infos(info_map);
maybe_update_heartbeat_peers();
-
- op->put();
}
void OSD::handle_pg_trim(OpRequestRef op)
if (m->epoch < pg->info.history.same_interval_since) {
dout(10) << *pg << " got old trim to " << m->trim_to << ", ignoring" << dendl;
pg->unlock();
- goto out;
+ return;
}
assert(pg);
}
pg->unlock();
}
-
- out:
- op->put();
}
void OSD::handle_pg_scan(OpRequestRef op)
PG *pg;
if (!_have_pg(m->pgid)) {
- op->put();
return;
}
if (m->query_epoch < pg->info.history.same_interval_since) {
dout(10) << *pg << " got old scan, ignoring" << dendl;
- op->put();
return false;
}
PG *pg;
if (!_have_pg(m->pgid)) {
- op->put();
return;
}
if (m->query_epoch < pg->info.history.same_interval_since) {
dout(10) << *pg << " got old backfill, ignoring" << dendl;
- op->put();
return false;
}
do_queries(query_map);
maybe_update_heartbeat_peers();
-
- op->put();
#endif
}
}
do_notifies(notify_list, m->get_epoch());
-
- op->put();
}
}
pg->unlock();
}
- op->put();
}
if (m->get_source().is_osd())
msgr = cluster_messenger;
msgr->send_message(reply, m->get_connection());
- op->put();
}
void OSD::handle_misdirected_op(PG *pg, OpRequestRef op)
if (pg) {
if (m->get_map_epoch() < pg->info.history.same_primary_since) {
dout(7) << *pg << " changed after " << m->get_map_epoch() << ", dropping" << dendl;
- op->put();
return;
} else {
dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl;
MOSDOp *m = (MOSDOp*)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
if (op_is_discardable(m)) {
- op->put();
return;
}
// okay, we aren't valid now; check send epoch
if (m->get_map_epoch() >= superblock.oldest_map) {
dout(7) << "don't have sender's osdmap; assuming it was valid and that client will resend" << dendl;
- op->put();
return;
}
OSDMapRef send_map = get_map(m->get_map_epoch());
if (send_map->get_pg_role(m->get_pg(), whoami) >= 0) {
dout(7) << "dropping request; client will resend when they get new map" << dendl;
- op->put();
} else {
dout(7) << "we are invalid target" << dendl;
handle_misdirected_op(NULL, op);
dout(10) << "handle_sub_op " << *m << " epoch " << m->map_epoch << dendl;
if (m->map_epoch < up_epoch) {
dout(3) << "replica op from before up" << dendl;
- op->put();
return;
}
PG *pg = _have_pg(pgid) ? _lookup_lock_pg(pgid) : NULL;
if (!pg) {
- op->put();
return;
}
pg->get();
assert(m->get_header().type == MSG_OSD_SUBOPREPLY);
if (m->get_map_epoch() < up_epoch) {
dout(3) << "replica op reply from before up" << dendl;
- op->put();
return;
}
PG *pg = _have_pg(pgid) ? _lookup_lock_pg(pgid) : NULL;
if (!pg) {
- op->put();
return;
}
pg->get();
}
if (op_is_discardable(m)) {
- op->put();
return false;
}
dout(10) << "handle_sub_op pg changed " << pg->info.history
<< " after " << m->map_epoch
<< ", dropping" << dendl;
- op->put();
return false;
}
switch (op->request->get_type()) {
case CEPH_MSG_OSD_OP:
- if (osd->op_is_discardable((MOSDOp*)op->request))
- op->put();
- else
+ if (!osd->op_is_discardable((MOSDOp*)op->request))
do_op(op); // do it now
break;
if (m->map_epoch < info.history.same_interval_since) {
dout(10) << "sub_op_scrub discarding old sub_op from "
<< m->map_epoch << " < " << info.history.same_interval_since << dendl;
- op->put();
return;
}
assert(last_update_applied == info.last_update);
osd->scrub_finalize_wq.queue(this);
}
-
- op->put();
}
/*
if (scrub_reserved) {
dout(10) << "Ignoring reserve request: Already reserved" << dendl;
- op->put();
return;
}
MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
::encode(scrub_reserved, reply->get_data());
osd->cluster_messenger->send_message(reply, m->get_connection());
-
- op->put();
}
void PG::sub_op_scrub_reserve_reply(OpRequestRef op)
if (!scrub_reserved) {
dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
- op->put();
return;
}
}
sched_scrub();
}
-
- op->put();
}
void PG::sub_op_scrub_unreserve(OpRequestRef op)
op->mark_started();
clear_scrub_reserved();
-
- op->put();
}
void PG::sub_op_scrub_stop(OpRequestRef op)
MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
osd->cluster_messenger->send_message(reply, m->get_connection());
-
- op->put();
}
void PG::clear_scrub_reserved()
reply->set_data(outdata);
reply->set_result(result);
osd->client_messenger->send_message(reply, m->get_connection());
- op->put();
delete filter;
}
} else {
dout(10) << "no src oid specified for multi op " << osd_op << dendl;
osd->reply_op_error(op, -EINVAL);
- op->put();
}
put_object_contexts(src_obc);
put_object_context(obc);
ctx->reply = NULL;
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
osd->client_messenger->send_message(reply, m->get_connection());
- op->put();
delete ctx;
put_object_context(obc);
put_object_contexts(src_obc);
}
break;
}
-
- op->put();
}
void ReplicatedPG::do_backfill(OpRequestRef op)
}
break;
}
-
- op->put();
}
/* Returns head of snap_trimq as snap_to_trim and the relevant objects as
unlock();
if (done) {
delete rm->ctx;
- rm->op->put();
delete rm;
put();
}
unlock();
if (done) {
delete rm->ctx;
- rm->op->put();
delete rm;
put();
}
fromosd,
r->get_last_complete_ondisk());
}
-
- op->put();
}
}
}
}
- op->put();
}
void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
}
log_subop_stats(op, 0, l_osd_sop_pull_lat);
- op->put();
}
} else {
handle_push(op);
}
- op->put();
return;
}
finish_recovery_op(soid); // close out this attempt,
pull_from_peer[from].erase(soid);
pulling.erase(soid);
-
- op->put();
}
void ReplicatedPG::sub_op_remove(OpRequestRef op)
remove_object_with_snap_hardlinks(*t, m->poid);
int r = osd->store->queue_transaction(&osr, t);
assert(r == 0);
-
- op->put();
}