// read or error?
if (ctx->op_t->empty() || result < 0) {
+ // finish side-effects
+ if (result == 0)
+ do_osd_op_effects(ctx, m->get_connection());
+
if (ctx->pending_async_reads.empty()) {
complete_read_ctx(result, ctx);
} else {
in_progress_async_reads.push_back(make_pair(op, ctx));
ctx->start_async_reads(this);
}
+
return;
}
<< entity << dendl;
oi.watchers.erase(oi_iter);
t->nop(); // update oi on disk
- ctx->watch_disconnects.push_back(w);
+ ctx->watch_disconnects.push_back(
+ OpContext::watch_disconnect_t(cookie, entity, false));
} else {
dout(10) << " can't remove: no watch by " << entity << dendl;
}
}
}
-void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
+void ReplicatedPG::do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn)
{
- ConnectionRef conn(ctx->op->get_req()->get_connection());
+ entity_name_t entity = ctx->reqid.name;
+ dout(15) << "do_osd_op_effects " << entity << " con " << conn.get() << dendl;
+
+ // disconnects first
+ for (list<OpContext::watch_disconnect_t>::iterator i =
+ ctx->watch_disconnects.begin();
+ i != ctx->watch_disconnects.end();
+ ++i) {
+ pair<uint64_t, entity_name_t> watcher(i->cookie, i->name);
+ if (ctx->obc->watchers.count(watcher)) {
+ WatchRef watch = ctx->obc->watchers[watcher];
+ dout(10) << "do_osd_op_effects disconnect watcher " << watcher << dendl;
+ ctx->obc->watchers.erase(watcher);
+ watch->remove(i->send_disconnect);
+ } else {
+ dout(10) << "do_osd_op_effects disconnect failed to find watcher "
+ << watcher << dendl;
+ }
+ }
+
+ if (!conn)
+ return;
boost::intrusive_ptr<OSD::Session> session((OSD::Session *)conn->get_priv());
if (!session.get())
return;
session->put(); // get_priv() takes a ref, and so does the intrusive_ptr
- entity_name_t entity = ctx->reqid.name;
-
- dout(15) << "do_osd_op_effects on session " << session.get() << dendl;
for (list<pair<watch_info_t,bool> >::iterator i = ctx->watch_connects.begin();
i != ctx->watch_connects.end();
watch->connect(conn, i->second);
}
- for (list<watch_info_t>::iterator i = ctx->watch_disconnects.begin();
- i != ctx->watch_disconnects.end();
- ++i) {
- pair<uint64_t, entity_name_t> watcher(i->cookie, entity);
- dout(15) << "do_osd_op_effects applying watch disconnect on session "
- << session.get() << " and watcher " << watcher << dendl;
- if (ctx->obc->watchers.count(watcher)) {
- WatchRef watch = ctx->obc->watchers[watcher];
- dout(10) << "do_osd_op_effects applying disconnect found watcher "
- << watcher << dendl;
- ctx->obc->watchers.erase(watcher);
- watch->remove();
- } else {
- dout(10) << "do_osd_op_effects failed to find watcher "
- << watcher << dendl;
- }
- }
-
for (list<notify_info_t>::iterator p = ctx->notifies.begin();
p != ctx->notifies.end();
++p) {
dout(10) << "do_osd_op_effects, notify " << *p << dendl;
+ ConnectionRef conn(ctx->op->get_req()->get_connection());
NotifyRef notif(
Notify::makeNotifyRef(
conn,
if (result < 0)
return result;
- // finish side-effects
- if (result == 0)
- do_osd_op_effects(ctx);
-
// read-op? done?
if (ctx->op_t->empty() && !ctx->modify) {
unstable_stats.add(ctx->delta_stats);
if (repop->all_applied && repop->all_committed) {
repop->rep_done = true;
+ do_osd_op_effects(
+ repop->ctx,
+ repop->ctx->op ? repop->ctx->op->get_req()->get_connection() :
+ ConnectionRef());
+
calc_min_last_complete_ondisk();
// kick snap_trimmer if necessary
return;
}
- watch->send_disconnect();
-
- obc->watchers.erase(make_pair(watch->get_cookie(), watch->get_entity()));
- obc->obs.oi.watchers.erase(make_pair(watch->get_cookie(), watch->get_entity()));
- watch->remove();
-
vector<OSDOp> ops;
ceph_tid_t rep_tid = osd->get_tid();
osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
ctx->mtime = ceph_clock_now(cct);
ctx->at_version = get_next_version();
- entity_inst_t nobody;
+ object_info_t& oi = ctx->new_obs.oi;
+ oi.watchers.erase(make_pair(watch->get_cookie(),
+ watch->get_entity()));
- RepGather *repop = new_repop(ctx, obc, rep_tid);
+ watch_info_t w(watch->get_cookie(),
+ cct->_conf->osd_client_watch_timeout, // fixme someday
+ watch->get_peer_addr());
+ ctx->watch_disconnects.push_back(
+ OpContext::watch_disconnect_t(watch->get_cookie(), watch->get_entity(), true));
- PGBackend::PGTransaction *t = ctx->op_t;
+ entity_inst_t nobody;
+ RepGather *repop = new_repop(ctx, obc, rep_tid);
+ PGBackend::PGTransaction *t = ctx->op_t;
ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, obc->obs.oi.soid,
ctx->at_version,
- obc->obs.oi.version,
+ oi.version,
0,
osd_reqid_t(), ctx->mtime));
- obc->obs.oi.prior_version = repop->obc->obs.oi.version;
- obc->obs.oi.version = ctx->at_version;
+ oi.prior_version = repop->obc->obs.oi.version;
+ oi.version = ctx->at_version;
bufferlist bl;
- ::encode(obc->obs.oi, bl);
+ ::encode(oi, bl);
setattr_maybe_cache(obc, repop->ctx, t, OI_ATTR, bl);
if (pool.info.require_rollback()) {
// side effects
list<pair<watch_info_t,bool> > watch_connects; ///< new watch + will_ping flag
- list<watch_info_t> watch_disconnects;
+ struct watch_disconnect_t {
+ uint64_t cookie;
+ entity_name_t name;
+ bool send_disconnect;
+ watch_disconnect_t(uint64_t c, entity_name_t n, bool sd)
+ : cookie(c), name(n), send_disconnect(sd) {}
+ };
+ list<watch_disconnect_t> watch_disconnects; ///< old watch + send_discon
list<notify_info_t> notifies;
struct NotifyAck {
boost::optional<uint64_t> watch_cookie;
int do_tmapup(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op);
int do_tmapup_slow(OpContext *ctx, bufferlist::iterator& bp, OSDOp& osd_op, bufferlist& bl);
- void do_osd_op_effects(OpContext *ctx);
+ void do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn);
private:
hobject_t earliest_backfill() const;
bool check_src_targ(const hobject_t& soid, const hobject_t& toid) const;
return discarded;
}
-void Watch::remove()
+void Watch::remove(bool send_disconnect)
{
dout(10) << "remove" << dendl;
+ if (send_disconnect && conn) {
+ bufferlist empty;
+ MWatchNotify *reply(new MWatchNotify(cookie, 0, 0,
+ CEPH_WATCH_EVENT_DISCONNECT, empty));
+ conn->send_message(reply);
+ }
for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
i != in_progress_notifies.end();
++i) {
conn->send_message(notify_msg);
}
-void Watch::send_disconnect()
-{
- if (!conn)
- return;
- bufferlist empty;
- MWatchNotify *reply(new MWatchNotify(cookie, 0, 0,
- CEPH_WATCH_EVENT_DISCONNECT, empty));
- conn->send_message(reply);
-}
-
void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl)
{
dout(10) << "notify_ack" << dendl;