if (finished.empty()) {
finished_lock.Unlock();
} else {
- list<OpRequest*> waiting;
+ list<OpRequestRef> waiting;
waiting.splice(waiting.begin(), finished);
finished_lock.Unlock();
dout(2) << "do_waiters -- start" << dendl;
- for (list<OpRequest*>::iterator it = waiting.begin();
+ for (list<OpRequestRef>::iterator it = waiting.begin();
it != waiting.end();
it++)
dispatch_op(*it);
}
}
-void OSD::dispatch_op(OpRequest *op)
+void OSD::dispatch_op(OpRequestRef op)
{
switch (op->request->get_type()) {
default:
{
- OpRequestRef *op = new OpRequest(m, &op_tracker);
+ OpRequestRef op = op_tracker.create_request(m);
op->mark_event("waiting_for_osdmap");
// no map? starting up?
if (!osdmap) {
// =====================================================
// MAP
-void OSD::wait_for_new_map(OpRequest *op)
+void OSD::wait_for_new_map(OpRequestRef op)
{
// ask?
if (waiting_for_osdmap.empty()) {
op_wq.lock();
- list<OpRequest*> rq;
+ list<OpRequestRef> rq;
while (true) {
PG *pg = op_wq._dequeue();
if (!pg)
// thread did something very strange :/
assert(!pg->op_queue.empty());
- OpRequest *op = pg->op_queue.front();
+ OpRequestRef op = pg->op_queue.front();
pg->op_queue.pop_front();
pg->unlock();
pg->put();
}
// scan pgs with waiters
- map<pg_t, list<OpRequest*> >::iterator p = waiting_for_pg.begin();
+ map<pg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
while (p != waiting_for_pg.end()) {
pg_t pgid = p->first;
return true;
}
-bool OSD::require_osd_peer(OpRequest *op)
+bool OSD::require_osd_peer(OpRequestRef op)
{
if (!op->request->get_connection()->peer_is_osd()) {
dout(0) << "require_osd_peer received from non-osd " << op->request->get_connection()->get_peer_addr()
* require that we have same (or newer) map, and that
* the source is the pg primary.
*/
-bool OSD::require_same_or_newer_map(OpRequest *op, epoch_t epoch)
+bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch)
{
Message *m = op->request;
dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ") " << m << dendl;
/*
* holding osd_lock
*/
-void OSD::handle_pg_create(OpRequest *op)
+void OSD::handle_pg_create(OpRequestRef op)
{
MOSDPGCreate *m = (MOSDPGCreate*)op->request;
assert(m->get_header().type == MSG_OSD_PG_CREATE);
* includes pg_info_t.
* NOTE: called with opqueue active.
*/
-void OSD::handle_pg_notify(OpRequest *op)
+void OSD::handle_pg_notify(OpRequestRef op)
{
MOSDPGNotify *m = (MOSDPGNotify*)op->request;
assert(m->get_header().type == MSG_OSD_PG_NOTIFY);
op->put();
}
-void OSD::handle_pg_log(OpRequest *op)
+void OSD::handle_pg_log(OpRequestRef op)
{
MOSDPGLog *m = (MOSDPGLog*) op->request;
assert(m->get_header().type == MSG_OSD_PG_LOG);
op->put();
}
-void OSD::handle_pg_info(OpRequest *op)
+void OSD::handle_pg_info(OpRequestRef op)
{
MOSDPGInfo *m = (MOSDPGInfo *)op->request;
assert(m->get_header().type == MSG_OSD_PG_INFO);
op->put();
}
-void OSD::handle_pg_trim(OpRequest *op)
+void OSD::handle_pg_trim(OpRequestRef op)
{
MOSDPGTrim *m = (MOSDPGTrim *)op->request;
assert(m->get_header().type == MSG_OSD_PG_TRIM);
op->put();
}
-void OSD::handle_pg_scan(OpRequest *op)
+void OSD::handle_pg_scan(OpRequestRef op)
{
MOSDPGScan *m = (MOSDPGScan*)op->request;
assert(m->get_header().type == MSG_OSD_PG_SCAN);
pg->put();
}
-bool OSD::scan_is_queueable(PG *pg, OpRequest *op)
+bool OSD::scan_is_queueable(PG *pg, OpRequestRef op)
{
MOSDPGScan *m = (MOSDPGScan *)op->request;
assert(m->get_header().type == MSG_OSD_PG_SCAN);
return true;
}
-void OSD::handle_pg_backfill(OpRequest *op)
+void OSD::handle_pg_backfill(OpRequestRef op)
{
MOSDPGBackfill *m = (MOSDPGBackfill*)op->request;
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
pg->put();
}
-bool OSD::backfill_is_queueable(PG *pg, OpRequest *op)
+bool OSD::backfill_is_queueable(PG *pg, OpRequestRef op)
{
MOSDPGBackfill *m = (MOSDPGBackfill *)op->request;
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
-void OSD::handle_pg_missing(OpRequest *op)
+void OSD::handle_pg_missing(OpRequestRef op)
{
MOSDPGMissing *m = (MOSDPGMissing *)op->request;
assert(m->get_header().type == MSG_OSD_PG_MISSING);
* from primary to replica | stray
* NOTE: called with opqueue active.
*/
-void OSD::handle_pg_query(OpRequest *op)
+void OSD::handle_pg_query(OpRequestRef op)
{
assert(osd_lock.is_locked());
}
-void OSD::handle_pg_remove(OpRequest *op)
+void OSD::handle_pg_remove(OpRequestRef op)
{
MOSDPGRemove *m = (MOSDPGRemove *)op->request;
assert(m->get_header().type == MSG_OSD_PG_REMOVE);
// =========================================================
// OPS
-void OSD::reply_op_error(OpRequest *op, int err)
+void OSD::reply_op_error(OpRequestRef op, int err)
{
reply_op_error(op, err, eversion_t());
}
-void OSD::reply_op_error(OpRequest *op, int err, eversion_t v)
+void OSD::reply_op_error(OpRequestRef op, int err, eversion_t v)
{
MOSDOp *m = (MOSDOp*)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
op->put();
}
-void OSD::handle_misdirected_op(PG *pg, OpRequest *op)
+void OSD::handle_misdirected_op(PG *pg, OpRequestRef op)
{
MOSDOp *m = (MOSDOp*)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
reply_op_error(op, -ENXIO);
}
-void OSD::handle_op(OpRequest *op)
+void OSD::handle_op(OpRequestRef op)
{
MOSDOp *m = (MOSDOp*)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
return true;
}
-void OSD::handle_sub_op(OpRequest *op)
+void OSD::handle_sub_op(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
pg->put();
}
-void OSD::handle_sub_op_reply(OpRequest *op)
+void OSD::handle_sub_op_reply(OpRequestRef op)
{
MOSDSubOpReply *m = (MOSDSubOpReply*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOPREPLY);
*
* @return true if the op is queueable; false otherwise.
*/
-bool OSD::op_is_queueable(PG *pg, OpRequest *op)
+bool OSD::op_is_queueable(PG *pg, OpRequestRef op)
{
assert(pg->is_locked());
MOSDOp *m = (MOSDOp*)op->request;
/*
* discard operation, or return true. no side-effects.
*/
-bool OSD::subop_is_queueable(PG *pg, OpRequest *op)
+bool OSD::subop_is_queueable(PG *pg, OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp *)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
/*
* enqueue called with osd_lock held
*/
-void OSD::enqueue_op(PG *pg, OpRequest *op)
+void OSD::enqueue_op(PG *pg, OpRequestRef op)
{
dout(15) << *pg << " enqueue_op " << op->request << " "
<< *(op->request) << dendl;
* thread is currently chewing on so as not to violate ordering from
* the clients' perspective.
*/
-void OSD::requeue_ops(PG *pg, list<OpRequest*>& ls)
+void OSD::requeue_ops(PG *pg, list<OpRequestRef>& ls)
{
dout(15) << *pg << " requeue_ops " << ls << dendl;
assert(pg->is_locked());
assert(&ls != &pg->op_queue);
// set current queue contents aside..
- list<OpRequest*> orig_queue;
+ list<OpRequestRef> orig_queue;
orig_queue.swap(pg->op_queue);
// grab whole list at once, in case methods we call below start adding things
// back on the list reference we were passed!
- list<OpRequest*> q;
+ list<OpRequestRef> q;
q.swap(ls);
// requeue old items, now at front.
while (!q.empty()) {
- OpRequest *op = q.front();
+ OpRequestRef op = q.front();
q.pop_front();
enqueue_op(pg, op);
}
*/
void OSD::dequeue_op(PG *pg)
{
- OpRequest *op = 0;
+ OpRequestRef op;
osd_lock.Lock();
{
#include "auth/KeyRing.h"
#include "messages/MOSDRepScrub.h"
+#include "OpRequest.h"
#include <map>
#include <memory>
void create_logger();
void tick();
void _dispatch(Message *m);
- void dispatch_op(OpRequest *op);
+ void dispatch_op(OpRequestRef op);
public:
ClassHandler *class_handler;
void update_osd_stat();
// -- waiters --
- list<OpRequest*> finished;
+ list<OpRequestRef> finished;
Mutex finished_lock;
- void take_waiters(list<class OpRequest*>& ls) {
+ void take_waiters(list<OpRequestRef>& ls) {
finished_lock.Lock();
finished.splice(finished.end(), ls);
finished_lock.Unlock();
}
- void take_waiter(OpRequest *op) {
+ void take_waiter(OpRequestRef op) {
finished_lock.Lock();
finished.push_back(op);
finished_lock.Unlock();
}
- void push_waiters(list<OpRequest*>& ls) {
+ void push_waiters(list<OpRequestRef>& ls) {
assert(osd_lock.is_locked()); // currently, at least. be careful if we change this (see #743)
finished_lock.Lock();
finished.splice(finished.begin(), ls);
}
} op_wq;
- void enqueue_op(PG *pg, OpRequest *op);
- void requeue_ops(PG *pg, list<OpRequest*>& ls);
+ void enqueue_op(PG *pg, OpRequestRef op);
+ void requeue_ops(PG *pg, list<OpRequestRef>& ls);
void dequeue_op(PG *pg);
static void static_dequeueop(OSD *o, PG *pg) {
o->dequeue_op(pg);
OSDMapRef osdmap;
utime_t had_map_since;
RWLock map_lock;
- list<OpRequest*> waiting_for_osdmap;
+ list<OpRequestRef> waiting_for_osdmap;
Mutex peer_map_epoch_lock;
map<int, epoch_t> peer_map_epoch;
Session *session = 0);
void _share_map_outgoing(const entity_inst_t& inst);
- void wait_for_new_map(OpRequest *op);
+ void wait_for_new_map(OpRequestRef op);
void handle_osd_map(class MOSDMap *m);
void note_down_osd(int osd);
void note_up_osd(int osd);
// -- placement groups --
map<int, PGPool*> pool_map;
hash_map<pg_t, PG*> pg_map;
- map<pg_t, list<OpRequest*> > waiting_for_pg;
+ map<pg_t, list<OpRequestRef> > waiting_for_pg;
PGRecoveryStats pg_recovery_stats;
PGPool *_get_pool(int id);
}
}
void wake_all_pg_waiters() {
- for (map<pg_t, list<OpRequest*> >::iterator p = waiting_for_pg.begin();
+ for (map<pg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
p != waiting_for_pg.end();
p++)
take_waiters(p->second);
hash_map<pg_t, create_pg_info> creating_pgs;
bool can_create_pg(pg_t pgid);
- void handle_pg_create(OpRequest *op);
+ void handle_pg_create(OpRequestRef op);
void do_split(PG *parent, set<pg_t>& children, ObjectStore::Transaction &t, C_Contexts *tfin);
void split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t);
void repeer(PG *pg, map< int, map<pg_t,pg_query_t> >& query_map);
bool require_mon_peer(Message *m);
- bool require_osd_peer(OpRequest *op);
+ bool require_osd_peer(OpRequestRef op);
- bool require_same_or_newer_map(OpRequest *op, epoch_t e);
+ bool require_same_or_newer_map(OpRequestRef op, epoch_t e);
- void handle_pg_query(OpRequest *op);
- void handle_pg_missing(OpRequest *op);
- void handle_pg_notify(OpRequest *op);
- void handle_pg_log(OpRequest *op);
- void handle_pg_info(OpRequest *op);
- void handle_pg_trim(OpRequest *op);
+ void handle_pg_query(OpRequestRef op);
+ void handle_pg_missing(OpRequestRef op);
+ void handle_pg_notify(OpRequestRef op);
+ void handle_pg_log(OpRequestRef op);
+ void handle_pg_info(OpRequestRef op);
+ void handle_pg_trim(OpRequestRef op);
- void handle_pg_scan(OpRequest *op);
- bool scan_is_queueable(PG *pg, OpRequest *op);
+ void handle_pg_scan(OpRequestRef op);
+ bool scan_is_queueable(PG *pg, OpRequestRef op);
- void handle_pg_backfill(OpRequest *op);
- bool backfill_is_queueable(PG *pg, OpRequest *op);
+ void handle_pg_backfill(OpRequestRef op);
+ bool backfill_is_queueable(PG *pg, OpRequestRef op);
- void handle_pg_remove(OpRequest *op);
+ void handle_pg_remove(OpRequestRef op);
void queue_pg_for_deletion(PG *pg);
void _remove_pg(PG *pg);
void handle_signal(int signum);
- void reply_op_error(OpRequest *op, int r);
- void reply_op_error(OpRequest *op, int r, eversion_t v);
- void handle_misdirected_op(PG *pg, OpRequest *op);
+ void reply_op_error(OpRequestRef op, int r);
+ void reply_op_error(OpRequestRef op, int r, eversion_t v);
+ void handle_misdirected_op(PG *pg, OpRequestRef op);
void handle_rep_scrub(MOSDRepScrub *m);
void handle_scrub(class MOSDScrub *m);
void handle_osd_ping(class MOSDPing *m);
- void handle_op(OpRequest *op);
- void handle_sub_op(OpRequest *op);
- void handle_sub_op_reply(OpRequest *op);
+ void handle_op(OpRequestRef op);
+ void handle_sub_op(OpRequestRef op);
+ void handle_sub_op_reply(OpRequestRef op);
private:
/// check if we can throw out op from a disconnected client
/// check if op has sufficient caps
bool op_has_sufficient_caps(PG *pg, class MOSDOp *m);
/// check if op should be (re)queued for processing
- bool op_is_queueable(PG *pg, OpRequest *op);
+ bool op_is_queueable(PG *pg, OpRequestRef op);
/// check if subop should be (re)queued for processing
- bool subop_is_queueable(PG *pg, OpRequest *op);
+ bool subop_is_queueable(PG *pg, OpRequestRef op);
public:
void force_remount();
static const uint8_t flag_started = 1 << 3;
static const uint8_t flag_sub_op_sent = 1 << 4;
-public:
OpRequest(Message *req, OpTracker *tracker) :
request(req), xitem(this),
warn_interval_multiplier(1),
received_time = request->get_recv_stamp();
tracker->register_inflight_op(&xitem);
}
+public:
~OpRequest() {
assert(request);
request->put();
map<hobject_t, set<int> >::iterator ml = missing_loc.find(soid);
if (ml == missing_loc.end()) {
- map<hobject_t, list<class OpRequest*> >::iterator wmo =
+ map<hobject_t, list<OpRequestRef> >::iterator wmo =
waiting_for_missing_object.find(soid);
if (wmo != waiting_for_missing_object.end()) {
osd->requeue_ops(this, wmo->second);
}
}
-void PG::do_request(OpRequest *op)
+void PG::do_request(OpRequestRef op)
{
// do any pending flush
do_pending_flush();
{
assert(is_replay() && is_active());
eversion_t c = info.last_update;
- list<OpRequest*> replay;
+ list<OpRequestRef> replay;
dout(10) << "replay_queued_ops" << dendl;
state_clear(PG_STATE_REPLAY);
- for (map<eversion_t,OpRequest*>::iterator p = replay_queue.begin();
+ for (map<eversion_t,OpRequestRef>::iterator p = replay_queue.begin();
p != replay_queue.end();
p++) {
if (p->first.version != c.version+1) {
}
}
-void PG::requeue_object_waiters(map<hobject_t, list<OpRequest*> >& m)
+void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m)
{
- for (map<hobject_t, list<OpRequest*> >::iterator it = m.begin();
+ for (map<hobject_t, list<OpRequestRef> >::iterator it = m.begin();
it != m.end();
it++)
osd->requeue_ops(this, it->second);
}
-void PG::sub_op_scrub_map(OpRequest *op)
+void PG::sub_op_scrub_map(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp *)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
get_osdmap()->get_cluster_inst(replica));
}
-void PG::sub_op_scrub_reserve(OpRequest *op)
+void PG::sub_op_scrub_reserve(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
op->put();
}
-void PG::sub_op_scrub_reserve_reply(OpRequest *op)
+void PG::sub_op_scrub_reserve_reply(OpRequestRef op)
{
MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request;
assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
op->put();
}
-void PG::sub_op_scrub_unreserve(OpRequest *op)
+void PG::sub_op_scrub_unreserve(OpRequestRef op)
{
assert(op->request->get_header().type == MSG_OSD_SUBOP);
dout(7) << "sub_op_scrub_unreserve" << dendl;
op->put();
}
-void PG::sub_op_scrub_stop(OpRequest *op)
+void PG::sub_op_scrub_stop(OpRequestRef op)
{
op->mark_started();
clear_stats();
// take replay queue waiters
- list<OpRequest*> ls;
- for (map<eversion_t,OpRequest*>::iterator it = replay_queue.begin();
+ list<OpRequestRef> ls;
+ for (map<eversion_t,OpRequestRef>::iterator it = replay_queue.begin();
it != replay_queue.end();
it++)
ls.push_back(it->second);
#include "include/xlist.h"
#include "include/atomic.h"
+#include "OpRequest.h"
#include "OSDMap.h"
#include "os/ObjectStore.h"
#include "msg/Messenger.h"
class OSD;
-class OpRequest;
class MOSDOp;
class MOSDSubOp;
class MOSDSubOpReply;
}
- list<OpRequest*> op_queue; // op queue
+ list<OpRequestRef> op_queue; // op queue
bool dirty_info, dirty_log;
// pg waiters
- list<OpRequest*> waiting_for_active;
- list<OpRequest*> waiting_for_all_missing;
- map<hobject_t, list<OpRequest*> > waiting_for_missing_object,
+ list<OpRequestRef> waiting_for_active;
+ list<OpRequestRef> waiting_for_all_missing;
+ map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
waiting_for_degraded_object;
- map<eversion_t,list<OpRequest*> > waiting_for_ondisk;
- map<eversion_t,OpRequest*> replay_queue;
+ map<eversion_t,list<OpRequestRef> > waiting_for_ondisk;
+ map<eversion_t,OpRequestRef> replay_queue;
- void requeue_object_waiters(map<hobject_t, list<OpRequest*> >& m);
+ void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m);
// stats
Mutex pg_stats_lock;
bool sched_scrub();
void replica_scrub(class MOSDRepScrub *op);
- void sub_op_scrub_map(OpRequest *op);
- void sub_op_scrub_reserve(OpRequest *op);
- void sub_op_scrub_reserve_reply(OpRequest *op);
- void sub_op_scrub_unreserve(OpRequest *op);
- void sub_op_scrub_stop(OpRequest *op);
+ void sub_op_scrub_map(OpRequestRef op);
+ void sub_op_scrub_reserve(OpRequestRef op);
+ void sub_op_scrub_reserve_reply(OpRequestRef op);
+ void sub_op_scrub_unreserve(OpRequestRef op);
+ void sub_op_scrub_stop(OpRequestRef op);
// -- recovery state --
// abstract bits
- void do_request(OpRequest *op);
+ void do_request(OpRequestRef op);
- virtual void do_op(OpRequest *op) = 0;
- virtual void do_sub_op(OpRequest *op) = 0;
- virtual void do_sub_op_reply(OpRequest *op) = 0;
- virtual void do_scan(OpRequest *op) = 0;
- virtual void do_backfill(OpRequest *op) = 0;
+ virtual void do_op(OpRequestRef op) = 0;
+ virtual void do_sub_op(OpRequestRef op) = 0;
+ virtual void do_sub_op_reply(OpRequestRef op) = 0;
+ virtual void do_scan(OpRequestRef op) = 0;
+ virtual void do_backfill(OpRequestRef op) = 0;
virtual bool snap_trimmer() = 0;
virtual int do_command(vector<string>& cmd, ostream& ss,
return missing.missing.count(soid);
}
-void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequest *op)
+void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef op)
{
assert(is_missing_object(soid));
op->mark_delayed();
}
-void ReplicatedPG::wait_for_all_missing(OpRequest *op)
+void ReplicatedPG::wait_for_all_missing(OpRequestRef op)
{
waiting_for_all_missing.push_back(op);
}
return false;
}
-void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequest *op)
+void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef op)
{
assert(is_degraded_object(soid));
return false;
}
-void ReplicatedPG::do_pg_op(OpRequest *op)
+void ReplicatedPG::do_pg_op(OpRequestRef op)
{
MOSDOp *m = (MOSDOp *)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
* pg lock will be held (if multithreaded)
* osd_lock NOT held.
*/
-void ReplicatedPG::do_op(OpRequest *op)
+void ReplicatedPG::do_op(OpRequestRef op)
{
MOSDOp *m = (MOSDOp*)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
<< " lat " << latency << dendl;
}
-void ReplicatedPG::log_subop_stats(OpRequest *op, int tag_inb, int tag_lat)
+void ReplicatedPG::log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat)
{
utime_t now = ceph_clock_now(g_ceph_context);
utime_t latency = now;
-void ReplicatedPG::do_sub_op(OpRequest *op)
+void ReplicatedPG::do_sub_op(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
sub_op_modify(op);
}
-void ReplicatedPG::do_sub_op_reply(OpRequest *op)
+void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
{
MOSDSubOpReply *r = (MOSDSubOpReply *)op->request;
assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
sub_op_modify_reply(op);
}
-void ReplicatedPG::do_scan(OpRequest *op)
+void ReplicatedPG::do_scan(OpRequestRef op)
{
MOSDPGScan *m = (MOSDPGScan*)op->request;
assert(m->get_header().type == MSG_OSD_PG_SCAN);
op->put();
}
-void ReplicatedPG::do_backfill(OpRequest *op)
+void ReplicatedPG::do_backfill(OpRequestRef op)
{
MOSDPGBackfill *m = (MOSDPGBackfill*)op->request;
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
vector<OSDOp> ops;
tid_t rep_tid = osd->get_tid();
osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
- OpContext *ctx = new OpContext(NULL, reqid, ops, &obc->obs, ssc, this);
+ OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, ssc, this);
ctx->mtime = ceph_clock_now(g_ceph_context);
ctx->at_version.epoch = get_osdmap()->get_epoch();
vector<OSDOp> ops;
tid_t rep_tid = osd->get_tid();
osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
- OpContext *ctx = new OpContext(NULL, reqid, ops, &obc->obs, obc->ssc, this);
+ OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops,
+ &obc->obs, obc->ssc, this);
ctx->mtime = ceph_clock_now(g_ceph_context);
ctx->at_version.epoch = get_osdmap()->get_epoch();
// sub op modify
-void ReplicatedPG::sub_op_modify(OpRequest *op)
+void ReplicatedPG::sub_op_modify(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
}
}
-void ReplicatedPG::sub_op_modify_reply(OpRequest *op)
+void ReplicatedPG::sub_op_modify_reply(OpRequestRef op)
{
MOSDSubOpReply *r = (MOSDSubOpReply*)op->request;
assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
return new_info;
}
-void ReplicatedPG::handle_pull_response(OpRequest *op)
+void ReplicatedPG::handle_pull_response(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp *)op->request;
bufferlist data;
}
}
-void ReplicatedPG::handle_push(OpRequest *op)
+void ReplicatedPG::handle_push(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp *)op->request;
dout(10) << "handle_push "
osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
}
-void ReplicatedPG::sub_op_push_reply(OpRequest *op)
+void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
{
MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request;
assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
* process request to pull an entire object.
* NOTE: called from opqueue.
*/
-void ReplicatedPG::sub_op_pull(OpRequest *op)
+void ReplicatedPG::sub_op_pull(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
}
-void ReplicatedPG::_committed_pushed_object(OpRequest *op, epoch_t same_since, eversion_t last_complete)
+void ReplicatedPG::_committed_pushed_object(OpRequestRef op, epoch_t same_since, eversion_t last_complete)
{
lock();
if (same_since == info.history.same_interval_since) {
/** op_push
* NOTE: called from opqueue.
*/
-void ReplicatedPG::sub_op_push(OpRequest *op)
+void ReplicatedPG::sub_op_push(OpRequestRef op)
{
op->mark_started();
if (is_primary()) {
return;
}
-void ReplicatedPG::_failed_push(OpRequest *op)
+void ReplicatedPG::_failed_push(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
op->put();
}
-void ReplicatedPG::sub_op_remove(OpRequest *op)
+void ReplicatedPG::sub_op_remove(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
{
// Wake anyone waiting for this object. Now that it's been marked as lost,
// we will just return an error code.
- map<hobject_t, list<OpRequest*> >::iterator wmo =
+ map<hobject_t, list<OpRequestRef> >::iterator wmo =
waiting_for_missing_object.find(oid);
if (wmo != waiting_for_missing_object.end()) {
osd->requeue_ops(this, wmo->second);
void ReplicatedPG::apply_and_flush_repops(bool requeue)
{
- list<OpRequest*> rq;
+ list<OpRequestRef> rq;
// apply all repops
while (!repop_queue.empty()) {
if (requeue && repop->ctx->op) {
dout(10) << " requeuing " << *repop->ctx->op->request << dendl;
rq.push_back(repop->ctx->op);
- repop->ctx->op = 0;
+ repop->ctx->op = OpRequestRef();
}
remove_repop(repop);
// take object waiters
requeue_object_waiters(waiting_for_missing_object);
- for (map<hobject_t,list<OpRequest*> >::iterator p = waiting_for_degraded_object.begin();
+ for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_degraded_object.begin();
p != waiting_for_degraded_object.end();
waiting_for_degraded_object.erase(p++)) {
osd->requeue_ops(this, p->second);
dout(10) << "on_role_change" << dendl;
// take commit waiters
- for (map<eversion_t, list<OpRequest*> >::iterator p = waiting_for_ondisk.begin();
+ for (map<eversion_t, list<OpRequestRef> >::iterator p = waiting_for_ondisk.begin();
p != waiting_for_ondisk.end();
p++)
osd->requeue_ops(this, p->second);
osd->store->queue_transaction(&osr, t,
new C_OSD_AppliedRecoveredObject(this, t, obc),
- new C_OSD_CommittedPushedObject(this, NULL,
+ new C_OSD_CommittedPushedObject(this, OpRequestRef(),
info.history.same_interval_since,
info.last_complete),
new C_OSD_OndiskWriteUnlock(obc));
}
state_t state;
int num_wr;
- list<OpRequest*> waiting;
+ list<OpRequestRef> waiting;
list<Cond*> waiting_cond;
bool wake;
* Capture all object state associated with an in-progress read or write.
*/
struct OpContext {
- OpRequest *op;
+ OpRequestRef op;
osd_reqid_t reqid;
vector<OSDOp>& ops;
OpContext(const OpContext& other);
const OpContext& operator=(const OpContext& other);
- OpContext(OpRequest *_op, osd_reqid_t _reqid, vector<OSDOp>& _ops,
+ OpContext(OpRequestRef _op, osd_reqid_t _reqid, vector<OSDOp>& _ops,
ObjectState *_obs, SnapSetContext *_ssc,
ReplicatedPG *_pg) :
op(_op), reqid(_reqid), ops(_ops), obs(_obs),
bufferlist data_received,
interval_set<uint64_t> *intervals_usable,
bufferlist *data_usable);
- void handle_pull_response(OpRequest *op);
- void handle_push(OpRequest *op);
+ void handle_pull_response(OpRequestRef op);
+ void handle_push(OpRequestRef op);
int send_push(int peer,
ObjectRecoveryInfo recovery_info,
ObjectRecoveryProgress progress,
struct RepModify {
ReplicatedPG *pg;
- OpRequest *op;
+ OpRequestRef op;
OpContext *ctx;
bool applied, committed;
int ackerosd;
ObjectStore::Transaction opt, localt;
list<ObjectStore::Transaction*> tls;
- RepModify() : pg(NULL), op(NULL), ctx(NULL), applied(false), committed(false), ackerosd(-1),
+ RepModify() : pg(NULL), ctx(NULL), applied(false), committed(false), ackerosd(-1),
bytes_written(0) {}
};
};
struct C_OSD_CommittedPushedObject : public Context {
ReplicatedPG *pg;
- OpRequest *op;
+ OpRequestRef op;
epoch_t same_since;
eversion_t last_complete;
- C_OSD_CommittedPushedObject(ReplicatedPG *p, OpRequest *o, epoch_t ss, eversion_t lc) : pg(p), op(o), same_since(ss), last_complete(lc) {
+ C_OSD_CommittedPushedObject(ReplicatedPG *p, OpRequestRef o, epoch_t ss, eversion_t lc) : pg(p), op(o), same_since(ss), last_complete(lc) {
if (op)
op->get();
pg->get();
}
};
- void sub_op_remove(OpRequest *op);
+ void sub_op_remove(OpRequestRef op);
- void sub_op_modify(OpRequest *op);
+ void sub_op_modify(OpRequestRef op);
void sub_op_modify_applied(RepModify *rm);
void sub_op_modify_commit(RepModify *rm);
- void sub_op_modify_reply(OpRequest *op);
+ void sub_op_modify_reply(OpRequestRef op);
void _applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc);
- void _committed_pushed_object(OpRequest *op, epoch_t same_since, eversion_t lc);
+ void _committed_pushed_object(OpRequestRef op, epoch_t same_since, eversion_t lc);
void recover_got(hobject_t oid, eversion_t v);
- void sub_op_push(OpRequest *op);
- void _failed_push(OpRequest *op);
- void sub_op_push_reply(OpRequest *op);
- void sub_op_pull(OpRequest *op);
+ void sub_op_push(OpRequestRef op);
+ void _failed_push(OpRequestRef op);
+ void sub_op_push_reply(OpRequestRef op);
+ void sub_op_pull(OpRequestRef op);
- void log_subop_stats(OpRequest *op, int tag_inb, int tag_lat);
+ void log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat);
// -- scrub --
int do_command(vector<string>& cmd, ostream& ss, bufferlist& idata, bufferlist& odata);
- void do_op(OpRequest *op);
+ void do_op(OpRequestRef op);
bool pg_op_must_wait(MOSDOp *op);
- void do_pg_op(OpRequest *op);
- void do_sub_op(OpRequest *op);
- void do_sub_op_reply(OpRequest *op);
- void do_scan(OpRequest *op);
- void do_backfill(OpRequest *op);
+ void do_pg_op(OpRequestRef op);
+ void do_sub_op(OpRequestRef op);
+ void do_sub_op_reply(OpRequestRef op);
+ void do_scan(OpRequestRef op);
+ void do_backfill(OpRequestRef op);
bool get_obs_to_trim(snapid_t &snap_to_trim,
coll_t &col_to_trim,
vector<hobject_t> &obs_to_trim);
bool same_for_rep_modify_since(epoch_t e);
bool is_missing_object(const hobject_t& oid);
- void wait_for_missing_object(const hobject_t& oid, OpRequest *op);
- void wait_for_all_missing(OpRequest *op);
+ void wait_for_missing_object(const hobject_t& oid, OpRequestRef op);
+ void wait_for_all_missing(OpRequestRef op);
bool is_degraded_object(const hobject_t& oid);
- void wait_for_degraded_object(const hobject_t& oid, OpRequest *op);
+ void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
void mark_all_unfound_lost(int what);
eversion_t pick_newest_available(const hobject_t& oid);