#include "messages/MPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
+#include "messages/MOSDBackoff.h"
#include "messages/MOSDMap.h"
#include "messages/MPoolOp.h"
handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
return true;
+ case CEPH_MSG_OSD_BACKOFF:
+ handle_osd_backoff(static_cast<MOSDBackoff*>(m));
+ return true;
+
case CEPH_MSG_WATCH_NOTIFY:
handle_watch_notify(static_cast<MWatchNotify*>(m));
m->put();
{
// rwlock is locked unique
+ // clear backoffs
+ session->backoffs.clear();
+ session->backoffs_by_id.clear();
+
// resend ops
map<ceph_tid_t,Op*> resend; // resend in tid order
for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
// rwlock is locked
// op->session->lock is locked
+ // backoff?
+ hobject_t hoid = op->target.get_hobj();
+ auto q = op->session->backoffs.lower_bound(hoid);
+ if (q != op->session->backoffs.begin()) {
+ --q;
+ if (cmp_bitwise(hoid, q->second.end) >= 0) {
+ ++q;
+ }
+ }
+ if (q != op->session->backoffs.end()) {
+ ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
+ << "," << q->second.end << ")" << dendl;
+ int r = cmp_bitwise(hoid, q->second.begin);
+ if (r == 0 || (r > 0 && cmp_bitwise(hoid, q->second.end) < 0)) {
+ ldout(cct, 10) << __func__ << " backoff on " << hoid << ", queuing "
+ << op << " tid " << op->tid << dendl;
+ return;
+ }
+ }
+
if (!m) {
assert(op->tid > 0);
m = _prepare_osd_op(op);
put_session(s);
}
+void Objecter::handle_osd_backoff(MOSDBackoff *m)
+{
+ ldout(cct, 10) << __func__ << " " << *m << dendl;
+ shunique_lock sul(rwlock, ceph::acquire_shared);
+ if (!initialized.read()) {
+ m->put();
+ return;
+ }
+
+ ConnectionRef con = m->get_connection();
+ OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+ if (!s || s->con != con) {
+ ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
+ m->put();
+ return;
+ }
+
+ get_session(s);
+ s->put(); // from get_priv() above
+
+ OSDSession::unique_lock sl(s->lock);
+
+ switch (m->op) {
+ case CEPH_OSD_BACKOFF_OP_BLOCK:
+ {
+ // register
+ OSDBackoff& b = s->backoffs[m->begin];
+ s->backoffs_by_id.insert(make_pair(m->id, &b));
+ b.id = m->id;
+ b.begin = m->begin;
+ b.end = m->end;
+
+ // ack
+ Message *r = new MOSDBackoff(CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
+ m->id, m->begin, m->end,
+ osdmap->get_epoch());
+ // this priority must match the MOSDOps from _prepare_osd_op
+ r->set_priority(cct->_conf->osd_client_op_priority);
+ con->send_message(r);
+ }
+ break;
+
+ case CEPH_OSD_BACKOFF_OP_UNBLOCK:
+ {
+ auto p = s->backoffs_by_id.find(m->id);
+ while (p != s->backoffs_by_id.end() &&
+ p->second->id == m->id) {
+ OSDBackoff *b = p->second;
+ if (b->begin != m->begin &&
+ b->end != m->end) {
+ lderr(cct) << __func__ << " got id " << m->id << " unblock on ["
+ << m->begin << "," << m->end << ") but backoff is ["
+ << b->begin << "," << b->end << ")" << dendl;
+ // hrmpf, unblock it anyway.
+ }
+ ldout(cct, 10) << __func__ << " unblock backoff " << b->id
+ << " [" << b->begin << "," << b->end
+ << ")" << dendl;
+ s->backoffs.erase(b->begin);
+ p = s->backoffs_by_id.erase(p);
+
+ // check for any ops to resend
+ for (auto& q : s->ops) {
+ int r = q.second->target.contained_by(m->begin, m->end);
+ ldout(cct, 20) << __func__ << " contained_by " << r << " on "
+ << q.second->target.get_hobj() << dendl;
+ if (r) {
+ _send_op(q.second);
+ }
+ }
+ }
+ }
+ break;
+
+ default:
+ ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
+ }
+
+ sul.unlock();
+ sl.unlock();
+
+ m->put();
+ put_session(s);
+}
uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
uint32_t pos)
op_target_t() = default;
+ hobject_t get_hobj() {
+ return hobject_t(target_oid,
+ target_oloc.key,
+ CEPH_NOSNAP,
+ target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
+ target_oloc.pool,
+ target_oloc.nspace);
+ }
+
+ bool contained_by(const hobject_t& begin, const hobject_t& end) {
+ hobject_t h = get_hobj();
+ int r = cmp_bitwise(h, begin);
+ return r == 0 || (r > 0 && cmp_bitwise(h, end) < 0);
+ }
+
void dump(Formatter *f) const;
};
};
// -- osd sessions --
+ struct OSDBackoff {
+ uint64_t id;
+ hobject_t begin, end;
+ };
+
struct OSDSession : public RefCountedObject {
boost::shared_mutex lock;
using lock_guard = std::lock_guard<decltype(lock)>;
map<uint64_t, LingerOp*> linger_ops;
map<ceph_tid_t,CommandOp*> command_ops;
+ // backoffs
+ map<hobject_t,OSDBackoff,hobject_t::BitwiseComparator> backoffs;
+ multimap<uint64_t,OSDBackoff*> backoffs_by_id;
+
int osd;
int incarnation;
ConnectionRef con;
}
void handle_osd_op_reply(class MOSDOpReply *m);
+ void handle_osd_backoff(class MOSDBackoff *m);
void handle_watch_notify(class MWatchNotify *m);
void handle_osd_map(class MOSDMap *m);
void wait_for_osd_map();