OpRequestRef op;
{
Mutex::Locker l(qlock);
- assert(pg_for_processing.count(&*pg));
+ if (!pg_for_processing.count(&*pg)) {
+ pg->unlock();
+ return;
+ }
assert(pg_for_processing[&*pg].size());
op = pg_for_processing[&*pg].front();
pg_for_processing[&*pg].pop_front();
pg->unlock();
}
+
+// Drain every op queued for *pg from the OSD's op work queue into
+// *dequeued.  Thin forwarder exposed on OSDService so PG code (e.g.
+// PG::split_ops) can drain its queue without a direct OSD reference.
+void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
+{
+  osd->op_wq.dequeue(pg, dequeued);
+}
+
/*
* NOTE: dequeue called in worker thread, with pg lock
*/
ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq;
ClassHandler *&class_handler;
+ void dequeue_pg(PG *pg, list<OpRequestRef> *dequeued);
+
// -- superblock --
Mutex publish_lock, pre_publish_lock;
OSDSuperblock superblock;
return op.first == pg;
}
};
-  void dequeue(PG *pg) {
+  // Remove every op queued for this PG.  If 'dequeued' is non-NULL the
+  // removed ops are handed back through it in dispatch order: ops a
+  // worker thread had already claimed (pg_for_processing) come first,
+  // followed by ops still sitting in the priority queue.
+  void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
     lock();
-    pqueue.remove_by_filter(Pred(pg));
+    if (!dequeued) {
+      // Caller does not want the ops back; drop them from both places.
+      pqueue.remove_by_filter(Pred(pg));
+      pg_for_processing.erase(pg);
+    } else {
+      // Pull this PG's entries out of the priority queue ...
+      list<pair<PGRef, OpRequestRef> > _dequeued;
+      pqueue.remove_by_filter(Pred(pg), &_dequeued);
+      for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
+	   i != _dequeued.end();
+	   ++i) {
+	dequeued->push_back(i->second);
+      }
+      // ... then splice any already-claimed ops onto the front, since
+      // those were queued (and must be processed) before the rest.
+      if (pg_for_processing.count(pg)) {
+	dequeued->splice(
+	  dequeued->begin(),
+	  pg_for_processing[pg]);
+	pg_for_processing.erase(pg);
+      }
+    }
     unlock();
   }
bool _empty() {
index();
}
+// Move the requests in *from that belong to the child PG after a split
+// (those for which PG::split_request(op, match, bits) is true) onto the
+// back of *to, preserving their relative order.  Non-matching requests
+// stay in *from.
+static void split_list(
+  list<OpRequestRef> *from,
+  list<OpRequestRef> *to,
+  unsigned match,
+  unsigned bits)
+{
+  for (list<OpRequestRef>::iterator i = from->begin();
+       i != from->end();
+       ) {
+    if (PG::split_request(*i, match, bits)) {
+      to->push_back(*i);
+      // erase(i++): advance before erasing; list::erase only
+      // invalidates the erased iterator.
+      from->erase(i++);
+    } else {
+      ++i;
+    }
+  }
+}
+
+// Same idea as split_list, but for the version-keyed replay queue:
+// move entries whose request hashes into the child PG from *from into
+// *to, keyed by the same eversion_t.
+static void split_replay_queue(
+  map<eversion_t, OpRequestRef> *from,
+  map<eversion_t, OpRequestRef> *to,
+  unsigned match,
+  unsigned bits)
+{
+  for (map<eversion_t, OpRequestRef>::iterator i = from->begin();
+       i != from->end();
+       ) {
+    if (PG::split_request(i->second, match, bits)) {
+      to->insert(*i);
+      // erase(i++): map::erase invalidates only the erased iterator.
+      from->erase(i++);
+    } else {
+      ++i;
+    }
+  }
+}
+
+// Hand off to 'child' any queued ops that hash into the child PG after
+// a split of 'split_bits' bits.  The asserts pin the precondition that
+// at split time only the replay queue and the op work queue /
+// waiting_for_active list may legally hold ops; every other waiting
+// list must already be empty.
+void PG::split_ops(PG *child, unsigned split_bits) {
+  unsigned match = child->info.pgid.m_seed;
+  assert(waiting_for_map.empty());
+  assert(waiting_for_all_missing.empty());
+  assert(waiting_for_missing_object.empty());
+  assert(waiting_for_degraded_object.empty());
+  assert(waiting_for_ack.empty());
+  assert(waiting_for_ondisk.empty());
+  split_replay_queue(&replay_queue, &(child->replay_queue), match, split_bits);
+
+  // Drain our ops out of the OSD's op_wq into waiting_for_active, then
+  // migrate the matching ones to the child.
+  osd->dequeue_pg(this, &waiting_for_active);
+  split_list(&waiting_for_active, &(child->waiting_for_active), match, split_bits);
+}
+
void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
{
child->osdmap_ref = osdmap_ref;
// History
child->past_intervals = past_intervals;
+
+ split_ops(child, split_bits);
+ _split_into(child_pgid, child, split_bits);
}
void PG::defer_recovery()
return true;
}
+// Decide whether a queued request should move to the child PG created
+// by a split: true iff the low 'bits' bits of the request's placement
+// seed equal 'match'.  Only client ops (CEPH_MSG_OSD_OP) are eligible
+// to move; sub-ops, sub-op replies, scans and backfills stay with the
+// parent.
+bool PG::split_request(OpRequestRef op, unsigned match, unsigned bits)
+{
+  // Use an unsigned literal: (~0) << bits left-shifts a negative signed
+  // int, which is undefined behavior.  Assumes bits < 32 (always true
+  // for valid pg_num split bit counts).
+  unsigned mask = ~((~0u)<<bits);
+  switch (op->request->get_type()) {
+  case CEPH_MSG_OSD_OP:
+    return (static_cast<MOSDOp*>(op->request)->get_pg().m_seed & mask) == match;
+  case MSG_OSD_SUBOP:
+    return false;
+  case MSG_OSD_SUBOPREPLY:
+    return false;
+  case MSG_OSD_PG_SCAN:
+    return false;
+  case MSG_OSD_PG_BACKFILL:
+    return false;
+  }
+  return false;
+}
+
bool PG::must_delay_request(OpRequestRef op)
{
switch (op->request->get_type()) {
waiting_for_degraded_object;
map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
map<eversion_t,OpRequestRef> replay_queue;
+ void split_ops(PG *child, unsigned split_bits);
void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m);
void requeue_ops(list<OpRequestRef> &l);
void finish_recovery_op(const hobject_t& soid, bool dequeue=false);
void split_into(pg_t child_pgid, PG *child, unsigned split_bits);
+ virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits) = 0;
loff_t get_log_write_pos() {
return 0;
bool must_delay_request(OpRequestRef op);
+ static bool split_request(OpRequestRef op, unsigned match, unsigned bits);
+
bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
bool old_peering_evt(CephPeeringEvtRef evt) {
return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested());
unlock();
}
+// ReplicatedPG-specific split hook.  Nothing to migrate at this layer:
+// the generic op migration happens in PG::split_ops(), and by split
+// time there must be no in-flight repops (asserted here).
+void ReplicatedPG::_split_into(pg_t child_pgid, PG *child, unsigned split_bits)
+{
+  assert(repop_queue.empty());
+}
/*
* pg status change notification
virtual void _scrub_finish();
object_stat_collection_t scrub_cstat;
+ virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits);
void apply_and_flush_repops(bool requeue);
void calc_trim_to();