namespace rgw { namespace cache {
-class RedisDriver;
-
-class RedisCacheAioRequest: public CacheAioRequest {
- public:
- RedisCacheAioRequest(RedisDriver* cache_driver) : cache_driver(cache_driver) {}
- virtual ~RedisCacheAioRequest() = default;
- virtual void cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
- virtual void cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
- private:
- RedisDriver* cache_driver;
-};
-
class RedisDriver : public CacheDriver {
public:
RedisDriver(Partition& _partition_info) : partition_info(_partition_info),
virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name) override;
virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val) override;
- virtual std::unique_ptr<CacheAioRequest> get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override;
virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
return 0;
}
-std::unique_ptr<CacheAioRequest> SSDDriver::get_cache_aio_request_ptr(const DoutPrefixProvider* dpp)
+template <typename Executor1, typename CompletionHandler>
+auto SSDDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& handler)
{
- return std::make_unique<SSDCacheAioRequest>(this);
+ auto p = Completion::create(ex1, std::move(handler));
+ return p;
+}
+
+template <typename ExecutionContext, typename CompletionToken>
+auto SSDDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
+ off_t read_ofs, off_t read_len, CompletionToken&& token)
+{
+ std::string location = partition_info.location + key;
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;
+
+ using Op = AsyncReadOp;
+ using Signature = typename Op::Signature;
+ boost::asio::async_completion<CompletionToken, Signature> init(token);
+ auto p = Op::create(ctx.get_executor(), init.completion_handler);
+ auto& op = p->user_data;
+
+ int ret = op.init(dpp, cct, location, read_ofs, read_len, p.get());
+ if(0 == ret) {
+ ret = ::aio_read(op.aio_cb.get());
+ }
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl;
+ if(ret < 0) {
+ auto ec = boost::system::error_code{-ret, boost::system::system_category()};
+ ceph::async::post(std::move(p), ec, bufferlist{});
+ } else {
+ (void)p.release();
+ }
+ return init.result.get();
+}
+
+rgw::Aio::OpFunc SSDDriver::ssd_cache_read_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
+ off_t read_ofs, off_t read_len, const std::string& key) {
+ return [this, dpp, y, read_ofs, read_len, key] (Aio* aio, AioResult& r) mutable {
+ ceph_assert(y);
+ ldpp_dout(dpp, 20) << "SSDCache: cache_read_op(): Read From Cache, oid=" << r.obj.oid << dendl;
+
+ using namespace boost::asio;
+ spawn::yield_context yield = y.get_yield_context();
+ async_completion<spawn::yield_context, void()> init(yield);
+ auto ex = get_associated_executor(init.completion_handler);
+
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
+ this->get_async(dpp, y.get_io_context(), key, read_ofs, read_len, bind_executor(ex, SSDDriver::libaio_handler{aio, r}));
+ };
}
rgw::AioResultList SSDDriver::get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id)
{
rgw_raw_obj r_obj;
r_obj.oid = key;
- return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id);
+ return aio->get(r_obj, ssd_cache_read_op(dpp, y, this, ofs, len, key), cost, id);
}
void SSDDriver::libaio_write_completion_cb(AsyncWriteRequest* c)
void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval)
{
SSDDriver::AsyncWriteRequest* c = static_cast<SSDDriver::AsyncWriteRequest*>(sigval.sival_ptr);
- ldpp_dout(c->dpp, 20) << "SSDCache: " << __func__ << "()" << dendl;
c->priv_data->libaio_write_completion_cb(c);
}
ceph::async::dispatch(std::move(p), ec, std::move(op.result));
}
-template <typename Executor1, typename CompletionHandler>
-auto SSDDriver::AsyncReadOp::create(const Executor1& ex1, CompletionHandler&& handler)
-{
- auto p = Completion::create(ex1, std::move(handler));
- return p;
-}
-
-template <typename ExecutionContext, typename CompletionToken>
-auto SSDDriver::get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
- off_t read_ofs, off_t read_len, CompletionToken&& token)
-{
- std::string location = partition_info.location + key;
- ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): location=" << location << dendl;
-
- using Op = AsyncReadOp;
- using Signature = typename Op::Signature;
- boost::asio::async_completion<CompletionToken, Signature> init(token);
- auto p = Op::create(ctx.get_executor(), init.completion_handler);
- auto& op = p->user_data;
-
- int ret = op.init(dpp, cct, location, read_ofs, read_len, p.get());
- if(0 == ret) {
- ret = ::aio_read(op.aio_cb.get());
- }
- ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): ::aio_read(), ret=" << ret << dendl;
- if(ret < 0) {
- auto ec = boost::system::error_code{-ret, boost::system::system_category()};
- ceph::async::post(std::move(p), ec, bufferlist{});
- } else {
- (void)p.release();
- }
- return init.result.get();
-}
-
int SSDDriver::update_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs)
{
std::string location = partition_info.location + key;
return removexattr(location.c_str(), attr_name.c_str());
}
-
-void SSDCacheAioRequest::cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
-{
- using namespace boost::asio;
- spawn::yield_context yield = y.get_yield_context();
- async_completion<spawn::yield_context, void()> init(yield);
- auto ex = get_associated_executor(init.completion_handler);
-
- ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): key=" << key << dendl;
- cache_driver->get_async(dpp, y.get_io_context(), key, ofs, len, bind_executor(ex, SSDDriver::libaio_handler{aio, r}));
-}
-
-void SSDCacheAioRequest::cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r)
-{
- //TODO - implement cache_aio_write
-}
-
} } // namespace rgw::cache
#include "rgw_common.h"
#include "rgw_cache_driver.h"
-namespace rgw { namespace cache { //cal stands for Cache Abstraction Layer
-
-class SSDDriver;
-
-class SSDCacheAioRequest: public CacheAioRequest {
-public:
- SSDCacheAioRequest(SSDDriver* cache_driver) : cache_driver(cache_driver) {}
- virtual ~SSDCacheAioRequest() = default;
- virtual void cache_aio_read(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, off_t ofs, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
- virtual void cache_aio_write(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, bufferlist& bl, uint64_t len, rgw::Aio* aio, rgw::AioResult& r) override;
-private:
- SSDDriver* cache_driver;
-};
+namespace rgw { namespace cache {
class SSDDriver : public CacheDriver {
public:
static std::optional<Partition> get_partition_info(const DoutPrefixProvider* dpp, const std::string& name, const std::string& type);
static std::vector<Partition> list_partitions(const DoutPrefixProvider* dpp);
- virtual std::unique_ptr<CacheAioRequest> get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override;
-
struct libaio_handler {
rgw::Aio* throttle = nullptr;
rgw::AioResult& r;
throttle->put(r);
}
};
- template <typename ExecutionContext, typename CompletionToken>
- auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
- off_t read_ofs, off_t read_len, CompletionToken&& token);
+
protected:
inline static std::unordered_map<std::string, Partition> partitions;
std::unordered_map<std::string, Entry> entries;
}
};
+template <typename ExecutionContext, typename CompletionToken>
+ auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
+ off_t read_ofs, off_t read_len, CompletionToken&& token);
+
+rgw::Aio::OpFunc ssd_cache_read_op(const DoutPrefixProvider *dpp, optional_yield y, rgw::cache::CacheDriver* cache_driver,
+ off_t read_ofs, off_t read_len, const std::string& key);
+
using unique_aio_cb_ptr = std::unique_ptr<struct aiocb, libaio_aiocb_deleter>;
struct AsyncReadOp {