return true;
}
- if (op->may_write() || write_ordered || !hit_set) {
+ if (!hit_set) {
promote_object(obc, missing_oid, oloc, op);
return true;
+ } else if (op->may_write() || op->may_cache()) {
+ do_proxy_write(op, missing_oid);
+ } else {
+ if (can_proxy_read)
+ do_proxy_read(op);
+ else
+ promote_op = op; // for non-proxy case promote_object needs this
}
- if (can_proxy_read)
- do_proxy_read(op);
- else
- promote_op = op; // for non-proxy case promote_object needs this
-
// Avoid duplicate promotion
if (obc.get() && obc->is_blocked()) {
if (!can_proxy_read) {
complete_read_ctx(r, ctx);
}
-void ReplicatedPG::kick_proxy_read_blocked(hobject_t& soid)
+void ReplicatedPG::kick_proxy_ops_blocked(hobject_t& soid)
{
map<hobject_t, list<OpRequestRef> >::iterator p = in_progress_proxy_ops.find(soid);
if (p == in_progress_proxy_ops.end())
}
}
-void ReplicatedPG::cancel_proxy_read_ops(bool requeue)
+void ReplicatedPG::cancel_proxy_ops(bool requeue)
{
dout(10) << __func__ << dendl;
+
+ // cancel proxy reads
map<ceph_tid_t, ProxyReadOpRef>::iterator p = proxyread_ops.begin();
while (p != proxyread_ops.end()) {
cancel_proxy_read((p++)->second);
}
+ // cancel proxy writes
+ map<ceph_tid_t, ProxyWriteOpRef>::iterator q = proxywrite_ops.begin();
+ while (q != proxywrite_ops.end()) {
+ cancel_proxy_write((q++)->second);
+ }
+
if (requeue) {
map<hobject_t, list<OpRequestRef> >::iterator p =
in_progress_proxy_ops.begin();
tid(0), pwop(pw)
{}
void finish(int r) {
+ if (pwop->canceled)
+ return;
pg->lock();
+ if (pwop->canceled) {
+ pg->unlock();
+ return;
+ }
if (last_peering_reset == pg->get_last_peering_reset()) {
pg->finish_proxy_write(oid, tid, r);
}
map<hobject_t, list<OpRequestRef> >::iterator q = in_progress_proxy_ops.find(oid);
if (q == in_progress_proxy_ops.end()) {
dout(10) << __func__ << " no in_progress_proxy_ops found" << dendl;
+ delete pwop->ctx;
+ pwop->ctx = NULL;
return;
}
list<OpRequestRef>& in_progress_op = q->second;
pwop->ctx = NULL;
}
+void ReplicatedPG::cancel_proxy_write(ProxyWriteOpRef pwop)
+{
+ dout(10) << __func__ << " " << pwop->soid << dendl;
+ pwop->canceled = true;
+
+ // cancel objecter op, if we can
+ if (pwop->objecter_tid) {
+ osd->objecter->op_cancel(pwop->objecter_tid, -ECANCELED);
+ delete pwop->ctx;
+ pwop->ctx = NULL;
+ proxywrite_ops.erase(pwop->objecter_tid);
+ pwop->objecter_tid = 0;
+ }
+}
+
class PromoteCallback: public ReplicatedPG::CopyCallback {
ObjectContextRef obc;
ReplicatedPG *pg;
copy_ops.erase(cobc->obs.oi.soid);
cobc->stop_block();
- // cancel and requeue proxy reads on this object
+ // cancel and requeue proxy ops on this object
if (!r) {
- kick_proxy_read_blocked(cobc->obs.oi.soid);
+ kick_proxy_ops_blocked(cobc->obs.oi.soid);
for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
it != proxyread_ops.end(); ++it) {
if (it->second->soid == cobc->obs.oi.soid) {
cancel_proxy_read(it->second);
}
}
+ for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
+ it != proxywrite_ops.end(); ++it) {
+ if (it->second->soid == cobc->obs.oi.soid) {
+ cancel_proxy_write(it->second);
+ }
+ }
}
kick_object_context_blocked(cobc);
unreg_next_scrub();
cancel_copy_ops(false);
cancel_flush_ops(false);
- cancel_proxy_read_ops(false);
+ cancel_proxy_ops(false);
apply_and_flush_repops(false);
pgbackend->on_change();
cancel_copy_ops(is_primary());
cancel_flush_ops(is_primary());
- cancel_proxy_read_ops(is_primary());
+ cancel_proxy_ops(is_primary());
// requeue object waiters
if (is_primary()) {
bool sent_disk;
bool sent_ack;
utime_t mtime;
+ bool canceled;
ProxyWriteOp(OpRequestRef _op, hobject_t oid, vector<OSDOp>& _ops)
: ctx(NULL), op(_op), soid(oid),
objecter_tid(0), ops(_ops),
user_version(0), sent_disk(false),
- sent_ack(false) { }
+ sent_ack(false), canceled(false) { }
};
typedef boost::shared_ptr<ProxyWriteOp> ProxyWriteOpRef;
int get_pgls_filter(bufferlist::iterator& iter, PGLSFilter **pfilter);
map<hobject_t, list<OpRequestRef> > in_progress_proxy_ops;
+ void kick_proxy_ops_blocked(hobject_t& soid);
+ void cancel_proxy_ops(bool requeue);
// -- proxyread --
map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;
void do_proxy_read(OpRequestRef op);
void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
- void kick_proxy_read_blocked(hobject_t& soid);
void cancel_proxy_read(ProxyReadOpRef prdop);
- void cancel_proxy_read_ops(bool requeue);
friend struct C_ProxyRead;
void do_proxy_write(OpRequestRef op, const hobject_t& missing_oid);
void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
+ void cancel_proxy_write(ProxyWriteOpRef pwop);
friend struct C_ProxyWrite_Apply;
friend struct C_ProxyWrite_Commit;