osd->op_wq.queue(make_pair(PGRef(this), op));
}
-void PG::do_request(
- OpRequestRef op,
- ThreadPool::TPHandle &handle)
-{
- // do any pending flush
- do_pending_flush();
-
- if (!op_has_sufficient_caps(op)) {
- osd->reply_op_error(op, -EPERM);
- return;
- }
- assert(!op_must_wait_for_map(get_osdmap(), op));
- if (can_discard_request(op)) {
- return;
- }
- if (!flushed) {
- dout(20) << " !flushed, waiting for active on " << op << dendl;
- waiting_for_active.push_back(op);
- return;
- }
-
- switch (op->request->get_type()) {
- case CEPH_MSG_OSD_OP:
- if (is_replay() || !is_active()) {
- dout(20) << " replay, waiting for active on " << op << dendl;
- waiting_for_active.push_back(op);
- return;
- }
- do_op(op); // do it now
- break;
-
- case MSG_OSD_SUBOP:
- do_sub_op(op);
- break;
-
- case MSG_OSD_SUBOPREPLY:
- do_sub_op_reply(op);
- break;
-
- case MSG_OSD_PG_SCAN:
- do_scan(op, handle);
- break;
-
- case MSG_OSD_PG_BACKFILL:
- do_backfill(op);
- break;
-
- case MSG_OSD_PG_PUSH:
- if (!is_active()) {
- waiting_for_active.push_back(op);
- op->mark_delayed("waiting for active");
- return;
- }
- do_push(op);
- break;
-
- case MSG_OSD_PG_PULL:
- do_pull(op);
- break;
-
- case MSG_OSD_PG_PUSH_REPLY:
- do_push_reply(op);
- break;
-
- default:
- assert(0 == "bad message type in do_request");
- }
-}
-
-
void PG::replay_queued_ops()
{
assert(is_replay() && is_active());
// abstract bits
- void do_request(
+ virtual void do_request(
OpRequestRef op,
ThreadPool::TPHandle &handle
- );
+ ) = 0;
virtual void do_op(OpRequestRef op) = 0;
virtual void do_sub_op(OpRequestRef op) = 0;
ThreadPool::TPHandle &handle
) = 0;
virtual void do_backfill(OpRequestRef op) = 0;
- virtual void do_push(OpRequestRef op) = 0;
- virtual void do_pull(OpRequestRef op) = 0;
- virtual void do_push_reply(OpRequestRef op) = 0;
virtual void snap_trimmer() = 0;
virtual int do_command(cmdmap_t cmdmap, ostream& ss,
// TODOXXX: needs to be active possibly
sub_op_push(op);
return true;
+ default:
+ break;
}
}
break;
}
- case MSG_OSD_SUBOPREPLY:
+ case MSG_OSD_SUBOPREPLY: {
MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request);
if (r->ops.size() >= 1) {
OSDOp &first = r->ops[0];
}
break;
}
+
+ default:
+ break;
+ }
return false;
}
src_oloc.key = oid.name;
}
+void ReplicatedPG::do_request(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle)
+{
+ // do any pending flush
+ do_pending_flush();
+
+ if (!op_has_sufficient_caps(op)) {
+ osd->reply_op_error(op, -EPERM);
+ return;
+ }
+ assert(!op_must_wait_for_map(get_osdmap(), op));
+ if (can_discard_request(op)) {
+ return;
+ }
+ if (!flushed) {
+ dout(20) << " !flushed, waiting for active on " << op << dendl;
+ waiting_for_active.push_back(op);
+ return;
+ }
+
+ if (pgbackend->handle_message(op))
+ return;
+
+ switch (op->request->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ if (is_replay() || !is_active()) {
+ dout(20) << " replay, waiting for active on " << op << dendl;
+ waiting_for_active.push_back(op);
+ return;
+ }
+ do_op(op); // do it now
+ break;
+
+ case MSG_OSD_SUBOP:
+ do_sub_op(op);
+ break;
+
+ case MSG_OSD_SUBOPREPLY:
+ do_sub_op_reply(op);
+ break;
+
+ case MSG_OSD_PG_SCAN:
+ do_scan(op, handle);
+ break;
+
+ case MSG_OSD_PG_BACKFILL:
+ do_backfill(op);
+ break;
+
+ default:
+ assert(0 == "bad message type in do_request");
+ }
+}
+
+
/** do_op - do an op
* pg lock will be held (if multithreaded)
* osd_lock NOT held.
OSDOp *first = NULL;
if (m->ops.size() >= 1) {
first = &m->ops[0];
- switch (first->op.op) {
- case CEPH_OSD_OP_PULL:
- sub_op_pull(op);
- return;
- }
}
if (!is_active()) {
if (first) {
switch (first->op.op) {
- case CEPH_OSD_OP_PUSH:
- sub_op_push(op);
- return;
case CEPH_OSD_OP_DELETE:
sub_op_remove(op);
return;
if (r->ops.size() >= 1) {
OSDOp& first = r->ops[0];
switch (first.op.op) {
- case CEPH_OSD_OP_PUSH:
- // continue peer recovery
- sub_op_push_reply(op);
- return;
-
case CEPH_OSD_OP_SCRUB_RESERVE:
sub_op_scrub_reserve_reply(op);
return;
int do_command(cmdmap_t cmdmap, ostream& ss, bufferlist& idata,
bufferlist& odata);
+ void do_request(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle);
void do_op(OpRequestRef op);
bool pg_op_must_wait(MOSDOp *op);
void do_pg_op(OpRequestRef op);