finished_lock("OSD::finished_lock"),
admin_ops_hook(NULL),
historic_ops_hook(NULL),
- op_queue_len(0),
op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp, 200),
map_lock("OSD::map_lock"),
+// Queue an incoming op for its PG on the shared op work queue.
void OSD::enqueue_op(PG *pg, OpRequestRef op)
{
dout(15) << "enqueue_op " << op << " " << *(op->request) << dendl;
- pg->queue_op(op);
+ // The PGRef in the queued pair pins the PG while its op is in flight;
+ // the old path staged the op on a per-PG op_queue instead.
+ op_wq.queue(make_pair(PGRef(pg), op));
}
-bool OSD::OpWQ::_enqueue(PG *pg)
+// Add an item to the back of the prioritized queue.  Items are bucketed
+// by the request's source entity_inst_t; priority and cost are both
+// fixed at 1 here.
+void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item)
{
- pg->get();
- osd->op_queue.push_back(pg);
- osd->op_queue_len++;
- osd->logger->set(l_osd_opq, osd->op_queue_len);
- return true;
+ pqueue.enqueue(item.second->request->get_source_inst(),
+ 1, 1, item);
+ osd->logger->set(l_osd_opq, pqueue.length());
}
-PG *OSD::OpWQ::_dequeue()
+// Put an item back at the front of the queue (requeue path).
+void OSD::OpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item)
{
- if (osd->op_queue.empty())
- return NULL;
- PG *pg = osd->op_queue.front();
- osd->op_queue.pop_front();
- osd->op_queue_len--;
- osd->logger->set(l_osd_opq, osd->op_queue_len);
+ {
+ Mutex::Locker l(qlock);
+ if (pg_for_processing.count(&*(item.first))) {
+ // Ops for this PG are already staged in pg_for_processing.  Insert
+ // the requeued op at the head of that staging list and push the
+ // list's tail into the queue instead, so the per-PG op order is
+ // preserved.
+ pg_for_processing[&*(item.first)].push_front(item.second);
+ item.second = pg_for_processing[&*(item.first)].back();
+ pg_for_processing[&*(item.first)].pop_back();
+ }
+ }
+ pqueue.enqueue_front(item.second->request->get_source_inst(),
+ 1, 1, item);
+ osd->logger->set(l_osd_opq, pqueue.length());
+}
+
+// Pop the highest-priority item and stage its op on pg_for_processing;
+// a worker thread will consume it in _process() under the PG lock.
+PGRef OSD::OpWQ::_dequeue()
+{
+ assert(!pqueue.empty());
+ PGRef pg;
+ {
+ Mutex::Locker l(qlock);
+ pair<PGRef, OpRequestRef> ret = pqueue.dequeue();
+ pg = ret.first;
+ pg_for_processing[&*pg].push_back(ret.second);
+ }
+ osd->logger->set(l_osd_opq, pqueue.length());
return pg;
}
-void OSDService::queue_for_peering(PG *pg)
+// Worker-thread callback: run the oldest staged op for this PG.
+void OSD::OpWQ::_process(PGRef pg)
{
- peering_wq.queue(pg);
+ // Take the PG lock first; qlock is only held long enough to pull the
+ // op off pg_for_processing.
+ pg->lock();
+ OpRequestRef op;
+ {
+ Mutex::Locker l(qlock);
+ assert(pg_for_processing.count(&*pg));
+ assert(pg_for_processing[&*pg].size());
+ op = pg_for_processing[&*pg].front();
+ pg_for_processing[&*pg].pop_front();
+ if (!(pg_for_processing[&*pg].size()))
+ pg_for_processing.erase(&*pg);
+ }
+ osd->dequeue_op(pg, op);
+ pg->unlock();
+}
+
+/*
+ * NOTE: dequeue called in worker thread, with pg lock
+ */
+void OSD::dequeue_op(PGRef pg, OpRequestRef op)
+{
+ dout(10) << "dequeue_op " << op << " " << *(op->request)
+ << " pg " << *pg << dendl;
+ // Drop the op if the PG is being torn down; the PGRef still keeps the
+ // PG object itself alive until we return.
+ if (pg->deleting)
+ return;
+
+ op->mark_reached_pg();
+
+ pg->do_request(op);
+
+ // finish
+ dout(10) << "dequeue_op " << op << " finish" << dendl;
}
-void OSDService::queue_for_op(PG *pg)
+
+// Schedule this PG on the peering work queue.
+void OSDService::queue_for_peering(PG *pg)
{
- op_wq.queue(pg);
+ peering_wq.queue(pg);
}
void OSD::process_peering_events(const list<PG*> &pgs)
service.send_pg_temp();
}
+// (The old dequeue_op(PG*) below pulled ops from the per-PG op_queue;
+// that path is replaced by dequeue_op(PGRef, OpRequestRef) above.)
-/*
- * NOTE: dequeue called in worker thread, without osd_lock
- */
-void OSD::dequeue_op(PG *pg)
-{
- OpRequestRef op;
-
- pg->lock();
- if (pg->deleting) {
- pg->unlock();
- pg->put();
- return;
- }
-
- pg->lockq();
- assert(!pg->op_queue.empty());
- op = pg->op_queue.front();
- pg->op_queue.pop_front();
- pg->unlockq();
-
- dout(10) << "dequeue_op " << op << " " << *op->request << " pg " << *pg << dendl;
-
- op->mark_reached_pg();
-
- pg->do_request(op);
-
- // unlock and put pg
- pg->unlock();
- pg->put();
-
- //#warning foo
- //scrub_wq.queue(pg);
-
- // finish
- dout(10) << "dequeue_op " << op << " finish" << dendl;
-}
-
-
// --------------------------------
int OSD::init_op_flags(MOSDOp *op)
#include "common/shared_cache.hpp"
#include "common/simple_cache.hpp"
#include "common/sharedptr_registry.hpp"
+#include "common/PrioritizedQueue.h"
#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
Messenger *&client_messenger;
PerfCounters *&logger;
MonClient *&monc;
- ThreadPool::WorkQueue<PG> &op_wq;
+ // op_wq now carries (PGRef, OpRequestRef) pairs by value; the PGRef
+ // keeps the PG alive while its op sits in the queue.
+ ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef> &op_wq;
ThreadPool::BatchWorkQueue<PG> &peering_wq;
ThreadPool::WorkQueue<PG> &recovery_wq;
ThreadPool::WorkQueue<PG> &snap_trim_wq;
void send_pg_temp();
void queue_for_peering(PG *pg);
- void queue_for_op(PG *pg);
bool queue_for_recovery(PG *pg);
bool queue_for_snap_trim(PG *pg) {
return snap_trim_wq.queue(pg);
HistoricOpsSocketHook *historic_ops_hook;
// -- op queue --
- list<PG*> op_queue;
- int op_queue_len;
- struct OpWQ : public ThreadPool::WorkQueue<PG> {
+ struct OpWQ: public ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>,
+ PGRef > {
+ // qlock protects pg_for_processing below.
+ Mutex qlock;
+ // Ops pulled off pqueue by _dequeue() but not yet run by _process(),
+ // kept in arrival order per PG.
+ map<PG*, list<OpRequestRef> > pg_for_processing;
OSD *osd;
+ // Pending ops, prioritized per source entity_inst_t.
+ PrioritizedQueue<pair<PGRef, OpRequestRef>, entity_inst_t > pqueue;
OpWQ(OSD *o, time_t ti, ThreadPool *tp)
- : ThreadPool::WorkQueue<PG>("OSD::OpWQ", ti, ti*10, tp), osd(o) {}
-
- bool _enqueue(PG *pg);
- void _dequeue(PG *pg) {
- for (list<PG*>::iterator i = osd->op_queue.begin();
- i != osd->op_queue.end();
- ) {
- if (*i == pg) {
- osd->op_queue.erase(i++);
- pg->put();
- } else {
- ++i;
- }
+ : ThreadPool::WorkQueueVal<pair<PGRef, OpRequestRef>, PGRef >(
+ "OSD::OpWQ", ti, ti*10, tp),
+ qlock("OpWQ::qlock"),
+ osd(o) {}
+
+ void _enqueue_front(pair<PGRef, OpRequestRef> item);
+ void _enqueue(pair<PGRef, OpRequestRef> item);
+ PGRef _dequeue();
+
+ // Matches queued items belonging to a given PG.
+ struct Pred {
+ PG *pg;
+ Pred(PG *pg) : pg(pg) {}
+ bool operator()(const pair<PGRef, OpRequestRef> &op) {
+ return op.first == pg;
}
+ };
+ // Drop every queued op for this PG (takes the workqueue lock).
+ // NOTE(review): ops already staged in pg_for_processing are not purged
+ // here -- confirm that is handled elsewhere.
+ void dequeue(PG *pg) {
+ lock();
+ pqueue.remove_by_filter(Pred(pg));
+ unlock();
}
bool _empty() {
- return osd->op_queue.empty();
- }
- PG *_dequeue();
- void _process(PG *pg) {
- osd->dequeue_op(pg);
- }
- void _clear() {
- assert(osd->op_queue.empty());
+ return pqueue.empty();
}
+ void _process(PGRef pg);
} op_wq;
void enqueue_op(PG *pg, OpRequestRef op);
- void dequeue_op(PG *pg);
- static void static_dequeueop(OSD *o, PG *pg) {
- o->dequeue_op(pg);
- };
+ // Called from OpWQ::_process() with the PG lock held.
+ void dequeue_op(PGRef pg, OpRequestRef op);
// -- peering queue --
struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
const hobject_t& ioid) :
osd(o), osdmap_ref(curmap), pool(_pool),
_lock("PG::_lock"),
- _qlock("PG::_qlock"),
ref(0), deleting(false), dirty_info(false), dirty_log(false),
info(p), coll(p), log_oid(loid), biginfo_oid(ioid),
recovery_item(this), scrub_item(this), scrub_finalize_item(this), snap_trim_item(this), stat_queue_item(this),
void PG::requeue_ops(list<OpRequestRef> &ls)
{
dout(15) << " requeue_ops " << ls << dendl;
- lockq();
- assert(&ls != &op_queue);
- size_t requeue_size = ls.size();
- op_queue.splice(op_queue.begin(), ls, ls.begin(), ls.end());
- for (size_t i = 0; i < requeue_size; ++i) osd->queue_for_op(this);
- unlockq();
+ // Iterate in reverse: pushing each op onto the FRONT of the work queue
+ // one at a time makes them come back out in their original order.
+ for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
+ i != ls.rend();
+ ++i) {
+ osd->op_wq.queue_front(make_pair(PGRef(this), *i));
+ }
+ ls.clear();
}
return false;
}
+// (PG::queue_op() removed: ops are queued via OSD::enqueue_op()/op_wq.)
-void PG::queue_op(OpRequestRef op)
-{
- _qlock.Lock();
- op_queue.push_back(op);
- osd->queue_for_op(this);
- _qlock.Unlock();
-}
-
void PG::take_waiters()
{
dout(10) << "take_waiters" << dendl;
* put_unlock() when done with the current pointer (_most common_).
*/
Mutex _lock;
- Mutex _qlock;
Cond _cond;
atomic_t ref;
bool deleting; // true while RemoveWQ should be chewing on us
void lock(bool no_lockdep = false);
- void lockq(bool no_lockdep = false) {
- _qlock.Lock(no_lockdep);
- }
void unlock() {
//generic_dout(0) << this << " " << info.pgid << " unlock" << dendl;
+ // Dirty info/log must have been dealt with before the PG lock is
+ // released.
assert(!dirty_info);
assert(!dirty_log);
_lock.Unlock();
}
- void unlockq() {
- _qlock.Unlock();
- }
/* During handle_osd_map, the osd holds a write lock to the osdmap.
* *_with_map_lock_held assume that the map_lock is already held */
}
- list<OpRequestRef> op_queue; // op queue
-
bool dirty_info, dirty_log;
public:
bool can_discard_request(OpRequestRef op);
bool must_delay_request(OpRequestRef op);
- void queue_op(OpRequestRef op);
bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
bool old_peering_evt(CephPeeringEvtRef evt) {
void intrusive_ptr_add_ref(PG *pg);
void intrusive_ptr_release(PG *pg);
+// Reference-counted PG handle; keeps the PG alive while (PGRef, op)
+// pairs referencing it sit in the op work queue.
+typedef boost::intrusive_ptr<PG> PGRef;
+
#endif