#include "PG.h"
#include "ReplicatedPG.h"
#include "OSD.h"
+#include "OpRequest.h"
#include "PGLS.h"
#include "common/errno.h"
return missing.missing.count(soid);
}
-void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, Message *m)
+void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequest *op)
{
assert(is_missing_object(soid));
dout(7) << "missing " << soid << " v " << v << ", pulling." << dendl;
pull(soid, v);
}
- waiting_for_missing_object[soid].push_back(m);
+ waiting_for_missing_object[soid].push_back(op);
}
-void ReplicatedPG::wait_for_all_missing(Message *m)
+void ReplicatedPG::wait_for_all_missing(OpRequest *op)
{
- waiting_for_all_missing.push_back(m);
+ waiting_for_all_missing.push_back(op);
}
bool ReplicatedPG::is_degraded_object(const hobject_t& soid)
return false;
}
-void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, Message *m)
+void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequest *op)
{
assert(is_degraded_object(soid));
}
recover_object_replicas(soid, v);
}
- waiting_for_degraded_object[soid].push_back(m);
+ waiting_for_degraded_object[soid].push_back(op);
}
bool PGLSParentFilter::filter(bufferlist& xattr_data, bufferlist& outdata)
return false;
}
-void ReplicatedPG::do_pg_op(MOSDOp *op)
+void ReplicatedPG::do_pg_op(OpRequest *op)
{
- dout(10) << "do_pg_op " << *op << dendl;
+ MOSDOp *m = (MOSDOp *)op->request;
+ assert(m->get_header().type == CEPH_MSG_OSD_OP);
+ dout(10) << "do_pg_op " << *m << dendl;
bufferlist outdata;
int result = 0;
PGLSFilter *filter = NULL;
bufferlist filter_out;
- snapid_t snapid = op->get_snapid();
+ snapid_t snapid = m->get_snapid();
- for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
+ for (vector<OSDOp>::iterator p = m->ops.begin(); p != m->ops.end(); p++) {
bufferlist::iterator bp = p->indata.begin();
switch (p->op.op) {
case CEPH_OSD_OP_PGLS_FILTER:
::decode(mname, bp);
}
catch (const buffer::error& e) {
- dout(0) << "unable to decode PGLS_FILTER description in " << *op << dendl;
+ dout(0) << "unable to decode PGLS_FILTER description in " << *m << dendl;
result = -EINVAL;
break;
}
// fall through
case CEPH_OSD_OP_PGLS:
- if (op->get_pg() != info.pgid) {
- dout(10) << " pgls pg=" << op->get_pg() << " != " << info.pgid << dendl;
+ if (m->get_pg() != info.pgid) {
+ dout(10) << " pgls pg=" << m->get_pg() << " != " << info.pgid << dendl;
result = 0; // hmm?
} else {
- dout(10) << " pgls pg=" << op->get_pg() << " count " << p->op.pgls.count << dendl;
+ dout(10) << " pgls pg=" << m->get_pg() << " count " << p->op.pgls.count << dendl;
// read into a buffer
vector<hobject_t> sentries;
PGLSResponse response;
::decode(response.handle, bp);
}
catch (const buffer::error& e) {
- dout(0) << "unable to decode PGLS handle in " << *op << dendl;
+ dout(0) << "unable to decode PGLS handle in " << *m << dendl;
result = -EINVAL;
break;
}
}
// reply
- MOSDOpReply *reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(),
+ MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(),
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
reply->set_data(outdata);
reply->set_result(result);
- osd->client_messenger->send_message(reply, op->get_connection());
+ osd->client_messenger->send_message(reply, m->get_connection());
op->put();
delete filter;
}
* pg lock will be held (if multithreaded)
* osd_lock NOT held.
*/
-void ReplicatedPG::do_op(MOSDOp *op)
+void ReplicatedPG::do_op(OpRequest *op)
{
- if ((op->get_rmw_flags() & CEPH_OSD_FLAG_PGOP)) {
- if (pg_op_must_wait(op)) {
+ MOSDOp *m = (MOSDOp*)op->request;
+ assert(m->get_header().type == CEPH_MSG_OSD_OP);
+ if ((m->get_rmw_flags() & CEPH_OSD_FLAG_PGOP)) {
+ if (pg_op_must_wait(m)) {
wait_for_all_missing(op);
return;
}
return do_pg_op(op);
}
- dout(10) << "do_op " << *op << (op->may_write() ? " may_write" : "") << dendl;
+ dout(10) << "do_op " << *m << (m->may_write() ? " may_write" : "") << dendl;
- if (finalizing_scrub && op->may_write()) {
+ if (finalizing_scrub && m->may_write()) {
dout(20) << __func__ << ": waiting for scrub" << dendl;
waiting_for_active.push_back(op);
return;
}
// missing object?
- hobject_t head(op->get_oid(), op->get_object_locator().key,
- CEPH_NOSNAP, op->get_pg().ps());
+ hobject_t head(m->get_oid(), m->get_object_locator().key,
+ CEPH_NOSNAP, m->get_pg().ps());
if (is_missing_object(head)) {
wait_for_missing_object(head, op);
return;
}
// degraded object?
- if (op->may_write() && is_degraded_object(head)) {
+ if (m->may_write() && is_degraded_object(head)) {
wait_for_degraded_object(head, op);
return;
}
// missing snapdir?
- hobject_t snapdir(op->get_oid(), op->get_object_locator().key,
- CEPH_SNAPDIR, op->get_pg().ps());
+ hobject_t snapdir(m->get_oid(), m->get_object_locator().key,
+ CEPH_SNAPDIR, m->get_pg().ps());
if (is_missing_object(snapdir)) {
wait_for_missing_object(snapdir, op);
return;
}
// degraded object?
- if (op->may_write() && is_degraded_object(snapdir)) {
+ if (m->may_write() && is_degraded_object(snapdir)) {
wait_for_degraded_object(snapdir, op);
return;
}
- entity_inst_t client = op->get_source_inst();
+ entity_inst_t client = m->get_source_inst();
ObjectContext *obc;
- bool can_create = op->may_write();
+ bool can_create = m->may_write();
snapid_t snapid;
- int r = find_object_context(hobject_t(op->get_oid(),
- op->get_object_locator().key,
- op->get_snapid(), op->get_pg().ps()),
- op->get_object_locator(),
+ int r = find_object_context(hobject_t(m->get_oid(),
+ m->get_object_locator().key,
+ m->get_snapid(), m->get_pg().ps()),
+ m->get_object_locator(),
&obc, can_create, &snapid);
if (r) {
if (r == -EAGAIN) {
// If we're not the primary of this OSD, and we have
// CEPH_OSD_FLAG_LOCALIZE_READS set, we just return -EAGAIN. Otherwise,
// we have to wait for the object.
- if (is_primary() || (!(op->get_rmw_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) {
+ if (is_primary() || (!(m->get_rmw_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) {
// missing the specific snap we need; requeue and wait.
assert(!can_create); // only happens on a read
- hobject_t soid(op->get_oid(), op->get_object_locator().key,
- snapid, op->get_pg().ps());
+ hobject_t soid(m->get_oid(), m->get_object_locator().key,
+ snapid, m->get_pg().ps());
wait_for_missing_object(soid, op);
return;
}
}
// make sure locator is consistent
- if (op->get_object_locator() != obc->obs.oi.oloc) {
- dout(10) << " provided locator " << op->get_object_locator()
+ if (m->get_object_locator() != obc->obs.oi.oloc) {
+ dout(10) << " provided locator " << m->get_object_locator()
<< " != object's " << obc->obs.oi.oloc
<< " on " << obc->obs.oi.soid << dendl;
- osd->clog.warn() << "bad locator " << op->get_object_locator()
+ osd->clog.warn() << "bad locator " << m->get_object_locator()
<< " on object " << obc->obs.oi.oloc
- << " loc " << op->get_object_locator()
- << " op " << *op << "\n";
+ << " loc " << m->get_object_locator()
+ << " op " << *m << "\n";
}
- if ((op->may_read()) && (obc->obs.oi.lost)) {
+ if ((m->may_read()) && (obc->obs.oi.lost)) {
// This object is lost. Reading from it returns an error.
dout(20) << __func__ << ": object " << obc->obs.oi.soid
<< " is lost" << dendl;
bool ok;
dout(10) << "do_op mode is " << mode << dendl;
assert(!mode.wake); // we should never have woken waiters here.
- if ((op->may_read() && op->may_write()) ||
- (op->get_flags() & CEPH_OSD_FLAG_RWORDERED))
+ if ((m->may_read() && m->may_write()) ||
+ (m->get_flags() & CEPH_OSD_FLAG_RWORDERED))
ok = mode.try_rmw(client);
- else if (op->may_write())
+ else if (m->may_write())
ok = mode.try_write(client);
- else if (op->may_read())
+ else if (m->may_read())
ok = mode.try_read(client);
else
assert(0);
return;
}
- if (!op->may_write() && !obc->obs.exists) {
+ if (!m->may_write() && !obc->obs.exists) {
osd->reply_op_error(op, -ENOENT);
put_object_context(obc);
return;
// src_oids
map<hobject_t,ObjectContext*> src_obc;
- for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
+ for (vector<OSDOp>::iterator p = m->ops.begin(); p != m->ops.end(); p++) {
OSDOp& osd_op = *p;
if (!ceph_osd_op_type_multi(osd_op.op.op))
continue;
if (osd_op.soid.oid.name.length()) {
object_locator_t src_oloc;
- get_src_oloc(op->get_oid(), op->get_object_locator(), src_oloc);
- hobject_t src_oid(osd_op.soid, src_oloc.key, op->get_pg().ps());
+ get_src_oloc(m->get_oid(), m->get_object_locator(), src_oloc);
+ hobject_t src_oid(osd_op.soid, src_oloc.key, m->get_pg().ps());
if (!src_obc.count(src_oid)) {
ObjectContext *sobc;
snapid_t ssnapid;
int r = find_object_context(src_oid, src_oloc, &sobc, false, &ssnapid);
if (r == -EAGAIN) {
// missing the specific snap we need; requeue and wait.
- hobject_t wait_oid(osd_op.soid.oid, src_oloc.key, ssnapid, op->get_pg().ps());
+ hobject_t wait_oid(osd_op.soid.oid, src_oloc.key, ssnapid, m->get_pg().ps());
wait_for_missing_object(wait_oid, op);
} else if (r) {
osd->reply_op_error(op, r);
sobc->obs.oi.oloc.key != obc->obs.oi.soid.oid.name &&
sobc->obs.oi.soid.oid.name != obc->obs.oi.oloc.key) {
dout(1) << " src_oid " << osd_op.soid << " oloc " << sobc->obs.oi.oloc << " != "
- << op->get_oid() << " oloc " << obc->obs.oi.oloc << dendl;
+ << m->get_oid() << " oloc " << obc->obs.oi.oloc << dendl;
osd->reply_op_error(op, -EINVAL);
} else if (is_degraded_object(sobc->obs.oi.soid) ||
(before_backfill && sobc->obs.oi.soid > backfill_target_info->last_backfill)) {
}
const hobject_t& soid = obc->obs.oi.soid;
- OpContext *ctx = new OpContext(op, op->get_reqid(), op->ops,
+ OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops,
&obc->obs, obc->ssc,
this);
ctx->obc = obc;
ctx->src_obc = src_obc;
- if (op->may_write()) {
+ if (m->may_write()) {
// snap
if (pool->info.is_pool_snaps_mode()) {
// use pool's snapc
ctx->snapc = pool->snapc;
} else {
// client specified snapc
- ctx->snapc.seq = op->get_snap_seq();
- ctx->snapc.snaps = op->get_snaps();
+ ctx->snapc.seq = m->get_snap_seq();
+ ctx->snapc.snaps = m->get_snaps();
}
- if ((op->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) &&
+ if ((m->get_flags() & CEPH_OSD_FLAG_ORDERSNAP) &&
ctx->snapc.seq < obc->ssc->snapset.seq) {
dout(10) << " ORDERSNAP flag set and snapc seq " << ctx->snapc.seq
<< " < snapset seq " << obc->ssc->snapset.seq
assert(ctx->at_version > info.last_update);
assert(ctx->at_version > log.head);
- ctx->mtime = op->get_mtime();
+ ctx->mtime = m->get_mtime();
dout(10) << "do_op " << soid << " " << ctx->ops
<< " ov " << obc->obs.oi.version << " av " << ctx->at_version
uint64_t old_size = obc->obs.oi.size;
eversion_t old_version = obc->obs.oi.version;
- if (op->may_read()) {
+ if (m->may_read()) {
dout(10) << " taking ondisk_read_lock" << dendl;
obc->ondisk_read_lock();
}
int result = prepare_transaction(ctx);
- if (op->may_read()) {
+ if (m->may_read()) {
dout(10) << " dropping ondisk_read_lock" << dendl;
obc->ondisk_read_unlock();
}
delete ctx;
put_object_context(obc);
put_object_contexts(src_obc);
+ op->put();
return;
}
// prepare the reply
- ctx->reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(), 0);
+ ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
ctx->reply->claim_op_out_data(ctx->ops);
ctx->reply->get_header().data_off = ctx->data_off;
ctx->reply->set_result(result);
reply->set_version(info.last_update);
ctx->reply = NULL;
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
- osd->client_messenger->send_message(reply, op->get_connection());
+ osd->client_messenger->send_message(reply, m->get_connection());
op->put();
delete ctx;
put_object_context(obc);
return;
}
- assert(op->may_write());
+ assert(m->may_write());
// trim log?
calc_trim_to();
void ReplicatedPG::log_op_stats(OpContext *ctx)
{
- MOSDOp *op = (MOSDOp*)ctx->op;
+ MOSDOp *m = (MOSDOp*)ctx->op->request;
utime_t now = ceph_clock_now(g_ceph_context);
utime_t latency = now;
- latency -= ctx->op->get_recv_stamp();
+ latency -= ctx->op->request->get_recv_stamp();
utime_t rlatency;
if (ctx->readable_stamp != utime_t()) {
rlatency = ctx->readable_stamp;
- rlatency -= ctx->op->get_recv_stamp();
+ rlatency -= ctx->op->request->get_recv_stamp();
}
uint64_t inb = ctx->bytes_written;
osd->logger->inc(l_osd_op_inb, inb);
osd->logger->finc(l_osd_op_lat, latency);
- if (op->may_read() && op->may_write()) {
+ if (m->may_read() && m->may_write()) {
osd->logger->inc(l_osd_op_rw);
osd->logger->inc(l_osd_op_rw_inb, inb);
osd->logger->inc(l_osd_op_rw_outb, outb);
osd->logger->finc(l_osd_op_rw_rlat, rlatency);
osd->logger->finc(l_osd_op_rw_lat, latency);
- } else if (op->may_read()) {
+ } else if (m->may_read()) {
osd->logger->inc(l_osd_op_r);
osd->logger->inc(l_osd_op_r_outb, outb);
osd->logger->finc(l_osd_op_r_lat, latency);
- } else if (op->may_write()) {
+ } else if (m->may_write()) {
osd->logger->inc(l_osd_op_w);
osd->logger->inc(l_osd_op_w_inb, inb);
osd->logger->finc(l_osd_op_w_rlat, rlatency);
} else
assert(0);
- dout(15) << "log_op_stats " << *op
+ dout(15) << "log_op_stats " << *m
<< " inb " << inb
<< " outb " << outb
<< " rlat " << rlatency
<< " lat " << latency << dendl;
}
-void ReplicatedPG::log_subop_stats(MOSDSubOp *op, int tag_inb, int tag_lat)
+void ReplicatedPG::log_subop_stats(OpRequest *op, int tag_inb, int tag_lat)
{
utime_t now = ceph_clock_now(g_ceph_context);
utime_t latency = now;
- latency -= op->get_recv_stamp();
+ latency -= op->request->get_recv_stamp();
- uint64_t inb = op->get_data().length();
+ uint64_t inb = op->request->get_data().length();
osd->logger->inc(l_osd_sop);
osd->logger->inc(tag_inb, inb);
osd->logger->finc(tag_lat, latency);
- dout(15) << "log_subop_stats " << *op << " inb " << inb << " latency " << latency << dendl;
+ dout(15) << "log_subop_stats " << *op->request << " inb " << inb << " latency " << latency << dendl;
}
-void ReplicatedPG::do_sub_op(MOSDSubOp *op)
+void ReplicatedPG::do_sub_op(OpRequest *op)
{
- dout(15) << "do_sub_op " << *op << dendl;
+ MOSDSubOp *m = (MOSDSubOp*)op->request;
+ assert(m->get_header().type == MSG_OSD_SUBOP);
+ dout(15) << "do_sub_op " << *op->request << dendl;
- if (op->ops.size() >= 1) {
- OSDOp& first = op->ops[0];
+ if (m->ops.size() >= 1) {
+ OSDOp& first = m->ops[0];
switch (first.op.op) {
case CEPH_OSD_OP_PULL:
sub_op_pull(op);
sub_op_modify(op);
}
-void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r)
+void ReplicatedPG::do_sub_op_reply(OpRequest *op)
{
+ MOSDSubOpReply *r = (MOSDSubOpReply *)op->request;
+ assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
if (r->ops.size() >= 1) {
OSDOp& first = r->ops[0];
switch (first.op.op) {
case CEPH_OSD_OP_PUSH:
// continue peer recovery
- sub_op_push_reply(r);
+ sub_op_push_reply(op);
return;
case CEPH_OSD_OP_SCRUB_RESERVE:
- sub_op_scrub_reserve_reply(r);
+ sub_op_scrub_reserve_reply(op);
return;
}
}
- sub_op_modify_reply(r);
+ sub_op_modify_reply(op);
}
-void ReplicatedPG::do_scan(MOSDPGScan *m)
+void ReplicatedPG::do_scan(OpRequest *op)
{
+ MOSDPGScan *m = (MOSDPGScan*)op->request;
+ assert(m->get_header().type == MSG_OSD_PG_SCAN);
dout(10) << "do_scan " << *m << dendl;
switch (m->op) {
break;
}
- m->put();
+ op->put();
}
-void ReplicatedPG::do_backfill(MOSDPGBackfill *m)
+void ReplicatedPG::do_backfill(OpRequest *op)
{
+ MOSDPGBackfill *m = (MOSDPGBackfill*)op->request;
+ assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
dout(10) << "do_backfill " << *m << dendl;
switch (m->op) {
break;
}
- m->put();
+ op->put();
}
/* Returns head of snap_trimq as snap_to_trim and the relevant objects as
ObjectContext *src_obc = 0;
if (ceph_osd_op_type_multi(op.op)) {
object_locator_t src_oloc;
- get_src_oloc(soid.oid, ((MOSDOp *)ctx->op)->get_object_locator(), src_oloc);
+ get_src_oloc(soid.oid, ((MOSDOp *)ctx->op->request)->get_object_locator(), src_oloc);
hobject_t src_oid(osd_op.soid, src_oloc.key, soid.hash);
src_obc = ctx->src_obc[src_oid];
dout(10) << " src_oid " << src_oid << " obc " << src_obc << dendl;
case CEPH_OSD_OP_NOTIFY_ACK:
{
osd->watch_lock.Lock();
- entity_name_t source = ctx->op->get_source();
+ entity_name_t source = ctx->op->request->get_source();
map<entity_name_t, watch_info_t>::iterator oi_iter = oi.watchers.find(source);
Watch::Notification *notif = osd->watch->get_notif(op.watch.cookie);
if (oi_iter != oi.watchers.end() && notif) {
{
if (ctx->watch_connect || ctx->watch_disconnect ||
!ctx->notifies.empty() || !ctx->notify_acks.empty()) {
- OSD::Session *session = (OSD::Session *)ctx->op->get_connection()->get_priv();
+ OSD::Session *session = (OSD::Session *)ctx->op->request->get_connection()->get_priv();
ObjectContext *obc = ctx->obc;
object_info_t& oi = ctx->new_obs.oi;
hobject_t& soid = oi.soid;
// discard my reference to the buffer
if (repop->ctx->op)
- repop->ctx->op->clear_data();
+ repop->ctx->op->request->clear_data();
repop->applying = false;
repop->applied = true;
void ReplicatedPG::eval_repop(RepGather *repop)
{
- MOSDOp *op = (MOSDOp *)repop->ctx->op;
+ MOSDOp *m = (MOSDOp *)repop->ctx->op->request;
- if (op)
+ if (m)
dout(10) << "eval_repop " << *repop
- << " wants=" << (op->wants_ack() ? "a":"") << (op->wants_ondisk() ? "d":"")
+ << " wants=" << (m->wants_ack() ? "a":"") << (m->wants_ondisk() ? "d":"")
<< dendl;
else
dout(10) << "eval_repop " << *repop << " (no op)" << dendl;
mode.is_rmw_mode()))
apply_repop(repop);
- if (op) {
+ if (m) {
// an 'ondisk' reply implies 'ack'. so, prefer to send just one
// ondisk instead of ack followed by ondisk.
log_op_stats(repop->ctx);
- if (op->wants_ondisk() && !repop->sent_disk) {
+ if (m->wants_ondisk() && !repop->sent_disk) {
// send commit.
MOSDOpReply *reply = repop->ctx->reply;
if (reply)
repop->ctx->reply = NULL;
else
- reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(), 0);
+ reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
dout(10) << " sending commit on " << *repop << " " << reply << dendl;
- assert(entity_name_t::TYPE_OSD != op->get_connection()->peer_type);
- osd->client_messenger->send_message(reply, op->get_connection());
+ assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
+ osd->client_messenger->send_message(reply, m->get_connection());
repop->sent_disk = true;
}
}
// applied?
if (repop->waitfor_ack.empty()) {
- if (op->wants_ack() && !repop->sent_ack && !repop->sent_disk) {
+ if (m->wants_ack() && !repop->sent_ack && !repop->sent_disk) {
// send ack
MOSDOpReply *reply = repop->ctx->reply;
if (reply)
repop->ctx->reply = NULL;
else
- reply = new MOSDOpReply(op, 0, get_osdmap()->get_epoch(), 0);
+ reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
reply->add_flags(CEPH_OSD_FLAG_ACK);
dout(10) << " sending ack on " << *repop << " " << reply << dendl;
- assert(entity_name_t::TYPE_OSD != op->get_connection()->peer_type);
- osd->client_messenger->send_message(reply, op->get_connection());
+ assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
+ osd->client_messenger->send_message(reply, m->get_connection());
repop->sent_ack = true;
}
{
OpContext *ctx = repop->ctx;
const hobject_t& soid = ctx->obs->oi.soid;
- MOSDOp *op = (MOSDOp *)ctx->op;
+ MOSDOp *m = (MOSDOp *)ctx->op->request;
dout(7) << "issue_repop rep_tid " << repop->rep_tid
<< " o " << soid
get_osdmap()->get_epoch(),
repop->rep_tid, repop->ctx->at_version);
- if (op && op->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC) {
+ if (m && m->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC) {
// replicate original op for parallel execution on replica
assert(0 == "broken implementation, do not use");
wr->oloc = repop->ctx->obs->oi.oloc;
wr->old_version = old_version;
wr->snapset = repop->obc->ssc->snapset;
wr->snapc = repop->ctx->snapc;
- wr->set_data(repop->ctx->op->get_data()); // _copy_ bufferlist
+ wr->set_data(repop->ctx->op->request->get_data()); // _copy_ bufferlist
} else {
// ship resulting transaction, log entries, and pg_stats
if (peer == backfill_target && soid >= backfill_pos) {
tid_t rep_tid)
{
if (ctx->op)
- dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op << dendl;
+ dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->request << dendl;
else
dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl;
void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
int fromosd, eversion_t peer_lcod)
{
- MOSDOp *op = (MOSDOp *)repop->ctx->op;
+ MOSDOp *m = (MOSDOp *)repop->ctx->op->request;
- if (op)
- dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *op
+ if (m)
+ dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *m
<< " result " << result
<< " ack_type " << ack_type
<< " from osd." << fromosd
// sub op modify
-void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
+void ReplicatedPG::sub_op_modify(OpRequest *op)
{
- const hobject_t& soid = op->poid;
+ MOSDSubOp *m = (MOSDSubOp*)op->request;
+ assert(m->get_header().type == MSG_OSD_SUBOP);
+
+ const hobject_t& soid = m->poid;
const char *opname;
- if (op->noop)
+ if (m->noop)
opname = "no-op";
- else if (op->ops.size())
- opname = ceph_osd_op_name(op->ops[0].op.op);
+ else if (m->ops.size())
+ opname = ceph_osd_op_name(m->ops[0].op.op);
else
opname = "trans";
dout(10) << "sub_op_modify " << opname
<< " " << soid
- << " v " << op->version
- << (op->noop ? " NOOP" : "")
- << (op->logbl.length() ? " (transaction)" : " (parallel exec")
- << " " << op->logbl.length()
+ << " v " << m->version
+ << (m->noop ? " NOOP" : "")
+ << (m->logbl.length() ? " (transaction)" : " (parallel exec")
+ << " " << m->logbl.length()
<< dendl;
// sanity checks
- assert(op->map_epoch >= info.history.same_interval_since);
+ assert(m->map_epoch >= info.history.same_interval_since);
assert(is_active());
assert(is_replica());
rm->ackerosd = ackerosd;
rm->last_complete = info.last_complete;
- if (!op->noop) {
- if (op->logbl.length()) {
+ if (!m->noop) {
+ if (m->logbl.length()) {
// shipped transaction and log entries
vector<Log::Entry> log;
- bufferlist::iterator p = op->get_data().begin();
+ bufferlist::iterator p = m->get_data().begin();
::decode(rm->opt, p);
- p = op->logbl.begin();
+ p = m->logbl.begin();
::decode(log, p);
- info.stats = op->pg_stats;
+ info.stats = m->pg_stats;
update_snap_collections(log, rm->localt);
- append_log(log, op->pg_trim_to, rm->localt);
+ append_log(log, m->pg_trim_to, rm->localt);
rm->tls.push_back(&rm->localt);
rm->tls.push_back(&rm->opt);
// TODO: this is severely broken because we don't know whether this object is really lost or
// not. We just always assume that it's not right now.
// Also, we're taking the address of a variable on the stack.
- object_info_t oi(soid, op->oloc);
+ object_info_t oi(soid, m->oloc);
oi.lost = false; // I guess?
- oi.version = op->old_version;
- oi.size = op->old_size;
- ObjectState obs(oi, op->old_exists);
- SnapSetContext ssc(op->poid.oid);
+ oi.version = m->old_version;
+ oi.size = m->old_size;
+ ObjectState obs(oi, m->old_exists);
+ SnapSetContext ssc(m->poid.oid);
- rm->ctx = new OpContext(op, op->reqid, op->ops, &obs, &ssc, this);
+ rm->ctx = new OpContext(op, m->reqid, m->ops, &obs, &ssc, this);
- rm->ctx->mtime = op->mtime;
- rm->ctx->at_version = op->version;
- rm->ctx->snapc = op->snapc;
+ rm->ctx->mtime = m->mtime;
+ rm->ctx->at_version = m->version;
+ rm->ctx->snapc = m->snapc;
- ssc.snapset = op->snapset;
+ ssc.snapset = m->snapset;
rm->ctx->obc->ssc = &ssc;
prepare_transaction(rm->ctx);
- append_log(rm->ctx->log, op->pg_trim_to, rm->ctx->local_t);
+ append_log(rm->ctx->log, m->pg_trim_to, rm->ctx->local_t);
rm->tls.push_back(&rm->ctx->op_t);
rm->tls.push_back(&rm->ctx->local_t);
} else {
// just trim the log
- if (op->pg_trim_to != eversion_t()) {
- trim(rm->localt, op->pg_trim_to);
+ if (m->pg_trim_to != eversion_t()) {
+ trim(rm->localt, m->pg_trim_to);
rm->tls.push_back(&rm->localt);
}
}
void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
{
lock();
- dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op << dendl;
+ dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl;
+ MOSDSubOp *m = (MOSDSubOp*)rm->op->request;
+ assert(m->get_header().type == MSG_OSD_SUBOP);
if (!rm->committed) {
// send ack to acker only if we haven't sent a commit already
- MOSDSubOpReply *ack = new MOSDSubOpReply(rm->op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd));
}
rm->applied = true;
bool done = rm->applied && rm->committed;
- assert(info.last_update >= rm->op->version);
- assert(last_update_applied < rm->op->version);
- last_update_applied = rm->op->version;
+ assert(info.last_update >= m->version);
+ assert(last_update_applied < m->version);
+ last_update_applied = m->version;
if (finalizing_scrub) {
assert(active_rep_scrub);
assert(info.last_update <= active_rep_scrub->scrub_to);
lock();
// send commit.
- dout(10) << "sub_op_modify_commit on op " << *rm->op
+ dout(10) << "sub_op_modify_commit on op " << *rm->op->request
<< ", sending commit to osd." << rm->ackerosd
<< dendl;
if (get_osdmap()->is_up(rm->ackerosd)) {
last_complete_ondisk = rm->last_complete;
- MOSDSubOpReply *commit = new MOSDSubOpReply(rm->op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+ MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
commit->set_last_complete_ondisk(rm->last_complete);
commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd));
}
}
-void ReplicatedPG::sub_op_modify_reply(MOSDSubOpReply *r)
+void ReplicatedPG::sub_op_modify_reply(OpRequest *op)
{
+ MOSDSubOpReply *r = (MOSDSubOpReply*)op->request;
+ assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
// must be replication.
tid_t rep_tid = r->get_tid();
int fromosd = r->get_source().num();
r->get_last_complete_ondisk());
}
- r->put();
+ op->put();
}
osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
}
-void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
+void ReplicatedPG::sub_op_push_reply(OpRequest *op)
{
+ MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request;
+ assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
int peer = reply->get_source().num();
}
}
}
- reply->put();
+ op->put();
}
void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
* process request to pull an entire object.
* NOTE: called from opqueue.
*/
-void ReplicatedPG::sub_op_pull(MOSDSubOp *op)
+void ReplicatedPG::sub_op_pull(OpRequest *op)
{
- const hobject_t soid = op->poid;
+ MOSDSubOp *m = (MOSDSubOp*)op->request;
+ assert(m->get_header().type == MSG_OSD_SUBOP);
- dout(7) << "op_pull " << soid << " v " << op->version
- << " from " << op->get_source()
+ const hobject_t soid = m->poid;
+
+ dout(7) << "op_pull " << soid << " v " << m->version
+ << " from " << m->get_source()
<< dendl;
assert(!is_primary()); // we should be a replica or stray.
struct stat st;
int r = osd->store->stat(coll, soid, &st);
if (r != 0) {
- osd->clog.error() << info.pgid << " " << op->get_source() << " tried to pull " << soid
+ osd->clog.error() << info.pgid << " " << m->get_source() << " tried to pull " << soid
<< " but got " << cpp_strerror(-r) << "\n";
- send_push_op_blank(soid, op->get_source().num());
+ send_push_op_blank(soid, m->get_source().num());
} else {
uint64_t size = st.st_size;
bool complete = false;
- if (!op->data_subset.empty() && op->data_subset.range_end() >= size)
+ if (!m->data_subset.empty() && m->data_subset.range_end() >= size)
complete = true;
// complete==true implies we are definitely complete.
// complete==false means nothing. we don't know because the primary may
// not be pulling the entire object.
- r = send_push_op(soid, op->version, op->get_source().num(), size, op->first, complete,
- op->data_subset, op->clone_subsets);
+ r = send_push_op(soid, m->version, m->get_source().num(), size, m->first, complete,
+ m->data_subset, m->clone_subsets);
if (r < 0)
- send_push_op_blank(soid, op->get_source().num());
+ send_push_op_blank(soid, m->get_source().num());
}
log_subop_stats(op, 0, l_osd_sop_pull_lat);
}
-void ReplicatedPG::_committed_pushed_object(MOSDSubOp *op, epoch_t same_since, eversion_t last_complete)
+void ReplicatedPG::_committed_pushed_object(OpRequest *op, epoch_t same_since, eversion_t last_complete)
{
lock();
if (same_since == info.history.same_interval_since) {
/** op_push
* NOTE: called from opqueue.
*/
-void ReplicatedPG::sub_op_push(MOSDSubOp *op)
+void ReplicatedPG::sub_op_push(OpRequest *op)
{
- const hobject_t& soid = op->poid;
- eversion_t v = op->version;
- OSDOp& push = op->ops[0];
+ MOSDSubOp *m = (MOSDSubOp*)op->request;
+ assert(m->get_header().type == MSG_OSD_SUBOP);
+
+ const hobject_t& soid = m->poid;
+ eversion_t v = m->version;
+ OSDOp& push = m->ops[0];
dout(7) << "op_push "
<< soid
<< " v " << v
- << " " << op->oloc
+ << " " << m->oloc
<< " len " << push.op.extent.length
- << " data_subset " << op->data_subset
- << " clone_subsets " << op->clone_subsets
- << " data len " << op->get_data().length()
+ << " data_subset " << m->data_subset
+ << " clone_subsets " << m->clone_subsets
+ << " data len " << m->get_data().length()
<< dendl;
if (v == eversion_t()) {
map<hobject_t, interval_set<uint64_t> > clone_subsets;
bufferlist data;
- op->claim_data(data);
+ m->claim_data(data);
// we need these later, and they get clobbered by t.setattrs()
bufferlist oibl;
- if (op->attrset.count(OI_ATTR))
- oibl.push_back(op->attrset[OI_ATTR]);
+ if (m->attrset.count(OI_ATTR))
+ oibl.push_back(m->attrset[OI_ATTR]);
bufferlist ssbl;
- if (op->attrset.count(SS_ATTR))
- ssbl.push_back(op->attrset[SS_ATTR]);
+ if (m->attrset.count(SS_ATTR))
+ ssbl.push_back(m->attrset[SS_ATTR]);
// determine data/clone subsets
- data_subset = op->data_subset;
+ data_subset = m->data_subset;
if (data_subset.empty() && push.op.extent.length && push.op.extent.length == data.length())
data_subset.insert(0, push.op.extent.length);
- clone_subsets = op->clone_subsets;
+ clone_subsets = m->clone_subsets;
pull_info_t *pi = 0;
- bool first = op->first;
- bool complete = op->complete;
+ bool first = m->first;
+ bool complete = m->complete;
// op->complete == true means we reached the end of the object (file size)
// op->complete == false means nothing; we may not have asked for the whole thing.
// did we learn object size?
if (pi->need_size) {
- dout(10) << " learned object size is " << op->old_size << dendl;
- pi->data_subset.erase(op->old_size, (uint64_t)-1 - op->old_size);
+ dout(10) << " learned object size is " << m->old_size << dendl;
+ pi->data_subset.erase(m->old_size, (uint64_t)-1 - m->old_size);
pi->need_size = false;
}
complete = pi->data_subset.range_end() == data_subset.range_end();
}
- if (op->complete && !complete) {
+ if (m->complete && !complete) {
dout(0) << " uh oh, we reached EOF on peer before we got everything we wanted" << dendl;
_failed_push(op);
return;
} else {
// head|unversioned. for now, primary will _only_ pull data copies of the head (no cloning)
- assert(op->clone_subsets.empty());
+ assert(m->clone_subsets.empty());
}
}
dout(15) << " data_subset " << data_subset
if (data_subset.empty())
t->touch(coll, soid);
- t->setattrs(coll, soid, op->attrset);
+ t->setattrs(coll, soid, m->attrset);
if (soid.snap && soid.snap < CEPH_NOSNAP &&
- op->attrset.count(OI_ATTR)) {
+ m->attrset.count(OI_ATTR)) {
bufferlist bl;
- bl.push_back(op->attrset[OI_ATTR]);
+ bl.push_back(m->attrset[OI_ATTR]);
object_info_t oi(bl);
if (oi.snaps.size()) {
coll_t lc = make_snap_collection(*t, oi.snaps[0]);
// track ObjectContext
if (is_primary()) {
dout(10) << " setting up obc for " << soid << dendl;
- ObjectContext *obc = get_object_context(soid, op->oloc, true);
+ ObjectContext *obc = get_object_context(soid, m->oloc, true);
assert(obc->registered);
obc->ondisk_write_lock();
} else {
// ack if i'm a replica and being pushed to.
- MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
- assert(entity_name_t::TYPE_OSD == op->get_connection()->peer_type);
- osd->cluster_messenger->send_message(reply, op->get_connection());
+ MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
+ osd->cluster_messenger->send_message(reply, m->get_connection());
}
if (complete) {
op->put(); // at the end... soid is a ref to op->soid!
}
-void ReplicatedPG::_failed_push(MOSDSubOp *op)
+void ReplicatedPG::_failed_push(OpRequest *op)
{
- const hobject_t& soid = op->poid;
- int from = op->get_source().num();
+ MOSDSubOp *m = (MOSDSubOp*)op->request;
+ assert(m->get_header().type == MSG_OSD_SUBOP);
+ const hobject_t& soid = m->poid;
+ int from = m->get_source().num();
map<hobject_t,set<int> >::iterator p = missing_loc.find(soid);
if (p != missing_loc.end()) {
dout(0) << "_failed_push " << soid << " from osd." << from
op->put();
}
-void ReplicatedPG::sub_op_remove(MOSDSubOp *op)
+void ReplicatedPG::sub_op_remove(OpRequest *op)
{
- dout(7) << "sub_op_remove " << op->poid << dendl;
+ MOSDSubOp *m = (MOSDSubOp*)op->request;
+ assert(m->get_header().type == MSG_OSD_SUBOP);
+ dout(7) << "sub_op_remove " << m->poid << dendl;
ObjectStore::Transaction *t = new ObjectStore::Transaction;
- remove_object_with_snap_hardlinks(*t, op->poid);
+ remove_object_with_snap_hardlinks(*t, m->poid);
int r = osd->store->queue_transaction(&osr, t);
assert(r == 0);
{
// Wake anyone waiting for this object. Now that it's been marked as lost,
// we will just return an error code.
- map<hobject_t, list<class Message*> >::iterator wmo =
+ map<hobject_t, list<OpRequest*> >::iterator wmo =
waiting_for_missing_object.find(oid);
if (wmo != waiting_for_missing_object.end()) {
osd->requeue_ops(this, wmo->second);
void ReplicatedPG::apply_and_flush_repops(bool requeue)
{
- list<Message*> rq;
+ list<OpRequest*> rq;
// apply all repops
while (!repop_queue.empty()) {
repop->aborted = true;
if (requeue && repop->ctx->op) {
- dout(10) << " requeuing " << *repop->ctx->op << dendl;
+ dout(10) << " requeuing " << *repop->ctx->op->request << dendl;
rq.push_back(repop->ctx->op);
repop->ctx->op = 0;
}
// take object waiters
requeue_object_waiters(waiting_for_missing_object);
- for (map<hobject_t,list<Message*> >::iterator p = waiting_for_degraded_object.begin();
+ for (map<hobject_t,list<OpRequest*> >::iterator p = waiting_for_degraded_object.begin();
p != waiting_for_degraded_object.end();
waiting_for_degraded_object.erase(p++)) {
osd->requeue_ops(this, p->second);
dout(10) << "on_role_change" << dendl;
// take commit waiters
- for (map<eversion_t, list<Message*> >::iterator p = waiting_for_ondisk.begin();
+ for (map<eversion_t, list<OpRequest*> >::iterator p = waiting_for_ondisk.begin();
p != waiting_for_ondisk.end();
p++)
osd->requeue_ops(this, p->second);