MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
_op->get_nonconst_req());
parent->maybe_preempt_replica_scrub(op->op.soid);
- handle_sub_write(op->op.from, _op, op->op, _op->pg_trace);
+ handle_sub_write(op->op.from, _op, op->op, _op->pg_trace, *get_parent()->get_eclistener());
return true;
}
case MSG_OSD_EC_WRITE_REPLY: {
pg_shard_t from,
OpRequestRef msg,
ECSubWrite &op,
- const ZTracer::Trace &trace)
+ const ZTracer::Trace &trace,
+ ECListener&)
{
if (msg) {
msg->mark_event("sub_op_started");
* the pull on the affected objects and pushes from in-memory buffers
* on any now complete unaffected objects.
*/
+#ifndef WITH_SEASTAR
get_parent()->schedule_recovery_work(
get_parent()->bless_unlocked_gencontext(
new FinishReadOp(*this, op.tid)),
1);
+#else
+ // TODO
+ ceph_abort_msg("not yet implemented");
+#endif
}
}
#include <boost/intrusive/set.hpp>
#include <boost/intrusive/list.hpp>
-#include "OSD.h"
-#include "PGBackend.h"
#include "erasure-code/ErasureCodeInterface.h"
#include "ECUtil.h"
+#if WITH_SEASTAR
+#include "ExtentCache.h"
+#include "crimson/osd/object_context.h"
+#include "os/Transaction.h"
+#include "osd/OSDMap.h"
+#include "osd/osd_op_util.h"
+
+struct ECTransaction {
+ struct WritePlan {
+ bool invalidates_cache = false; // Yes, both are possible
+ std::map<hobject_t,extent_set> to_read;
+ std::map<hobject_t,extent_set> will_write; // superset of to_read
+
+ std::map<hobject_t,ECUtil::HashInfoRef> hash_infos;
+ };
+};
+
+typedef void* OpRequestRef;
+typedef crimson::osd::ObjectContextRef ObjectContextRef;
+#else
+#include "common/WorkQueue.h"
+#endif
+
#include "ECTransaction.h"
#include "ExtentCache.h"
//forward declaration
struct ECSubWrite;
-struct ECSubWriteReply;
-struct ECSubRead;
-struct ECSubReadReply;
+struct PGLog;
// ECListener -- an interface decoupling the pipelines from
// particular implementation of ECBackend (crimson vs cassical).
virtual void cancel_pull(
const hobject_t &soid) = 0;
// XXX
+#ifndef WITH_SEASTAR
+ virtual GenContext<ThreadPool::TPHandle&> *bless_unlocked_gencontext(
+ GenContext<ThreadPool::TPHandle&> *c) = 0;
+
virtual void schedule_recovery_work(
GenContext<ThreadPool::TPHandle&> *c,
uint64_t cost) = 0;
+#endif
virtual epoch_t get_interval_start_epoch() const = 0;
virtual const std::set<pg_shard_t> &get_acting_shards() const = 0;
virtual void apply_stats(
const hobject_t &soid,
const object_stat_sum_t &delta_stats) = 0;
+
+ // new batch
+ virtual bool is_missing_object(const hobject_t& oid) const = 0;
+ virtual void add_local_next_event(const pg_log_entry_t& e) = 0;
+ virtual void log_operation(
+ std::vector<pg_log_entry_t>&& logv,
+ const std::optional<pg_hit_set_history_t> &hset_history,
+ const eversion_t &trim_to,
+ const eversion_t &roll_forward_to,
+ const eversion_t &min_last_complete_ondisk,
+ bool transaction_applied,
+ ObjectStore::Transaction &t,
+ bool async = false) = 0;
+ virtual void op_applied(
+ const eversion_t &applied_version) = 0;
};
struct ECCommon {
pg_shard_t from,
OpRequestRef msg,
ECSubWrite &op,
- const ZTracer::Trace &trace
+ const ZTracer::Trace &trace,
+ ECListener& eclistener
) = 0;
virtual void objects_read_and_reconstruct(
bool want_attrs)
: to_read(to_read), need(need), want_attrs(want_attrs) {}
};
- friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
+ friend std::ostream &operator<<(std::ostream &lhs, const read_request_t &rhs);
struct ReadOp;
/**
* Low level async read mechanism
std::map<shard_id_t, pg_shard_t> &shards,
bool for_recovery);
- friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
+ friend std::ostream &operator<<(std::ostream &lhs, const ReadOp &rhs);
friend struct FinishReadOp;
void get_want_to_read_shards(std::set<int> *want_to_read) const;
pg_t pgid,
const ECUtil::stripe_info_t &sinfo,
std::map<hobject_t,extent_map> *written,
- std::map<shard_id_t, ObjectStore::Transaction> *transactions,
+ std::map<shard_id_t, ceph::os::Transaction> *transactions,
DoutPrefixProvider *dpp,
const ceph_release_t require_osd_release = ceph_release_t::unknown) = 0;
};
using OpRef = std::unique_ptr<Op>;
using op_list = boost::intrusive::list<Op>;
- friend ostream &operator<<(ostream &lhs, const Op &rhs);
+ friend std::ostream &operator<<(std::ostream &lhs, const Op &rhs);
ExtentCache cache;
std::map<ceph_tid_t, OpRef> tid_to_op_map; /// Owns Op structure
void clear() {
pipeline_state = CACHE_VALID;
}
- friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
+ friend std::ostream &operator<<(std::ostream &lhs, const pipeline_state_t &rhs);
} pipeline_state;
op_list waiting_state; /// writes waiting on pipe_state
ECSubWrite &op,
const ZTracer::Trace &trace
) {
- ec_backend.handle_sub_write(from, std::move(msg), op, trace);
+ ec_backend.handle_sub_write(from, std::move(msg), op, trace, *get_parent());
}
// end of iface