and redis driver and squashes the following commits.
rgw/cache: implementation of async put. the call
does not take into account throttling for now.
rgw/cache: dummy implementation of put_async in redis
driver to fix compilation error.
Signed-off-by: Pritha Srivastava <prsrivas@redhat.com>
virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) = 0;
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) = 0;
virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) = 0;
+ virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) = 0;
virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) = 0;
virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) = 0;
virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) = 0;
{
}
+int RedisDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs)
+{
+ return 0;
+}
} } // namespace rgw::cache
virtual std::unique_ptr<CacheAioRequest> get_cache_aio_request_ptr(const DoutPrefixProvider* dpp) override;
virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
+ virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
struct libaio_handler { // should this be the same as SSDDriver? -Sam
rgw::Aio* throttle = nullptr;
return -errno;
}
- efs::space_info space = efs::space(location);
+ efs::space_info space = efs::space(partition_info.location);
this->free_space = space.available;
if (attrs.size() > 0) {
return aio->get(r_obj, rgw::Aio::cache_read_op(dpp, y, this, ofs, len, key), cost, id);
}
+void SSDDriver::libaio_write_completion_cb(AsyncWriteRequest* c)
+{
+ efs::space_info space = efs::space(partition_info.location);
+ this->free_space = space.available;
+
+ insert_entry(c->dpp, c->key, 0, c->cb->aio_nbytes);
+}
+
+int SSDDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs)
+{
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, oid=" << key << ", len=" << len << dendl;
+ struct AsyncWriteRequest* wr = new struct AsyncWriteRequest(dpp);
+ int r = 0;
+ if ((r = wr->prepare_libaio_write_op(dpp, bl, len, key, partition_info.location)) < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() prepare libaio write op r=" << r << dendl;
+ return r;
+ }
+ wr->cb->aio_sigevent.sigev_notify = SIGEV_THREAD;
+ wr->cb->aio_sigevent.sigev_notify_function = SSDDriver::AsyncWriteRequest::libaio_write_cb;
+ wr->cb->aio_sigevent.sigev_notify_attributes = nullptr;
+ wr->cb->aio_sigevent.sigev_value.sival_ptr = (void*)wr;
+ wr->key = key;
+ wr->priv_data = this;
+
+ if ((r = ::aio_write(wr->cb)) != 0) {
+ ldpp_dout(dpp, 0) << "ERROR: SSDCache: " << __func__ << "() aio_write r=" << r << dendl;
+ delete wr;
+ return r;
+ }
+ return 0;
+}
+
int SSDDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key)
{
std::string location = partition_info.location + key;
return remove_entry(dpp, key);
}
+int SSDDriver::AsyncWriteRequest::prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location)
+{
+ std::string location = cache_location + key;
+ int r = 0;
+
+ ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): Write To Cache, location=" << location << dendl;
+ cb = new struct aiocb;
+ mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
+ memset(cb, 0, sizeof(struct aiocb));
+ r = fd = ::open(location.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode);
+ if (fd < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: AsyncWriteRequest::prepare_libaio_write_op: open file failed, errno=" << errno << ", location='" << location.c_str() << "'" << dendl;
+ return r;
+ }
+ if (dpp->get_cct()->_conf->rgw_d3n_l1_fadvise != POSIX_FADV_NORMAL)
+ posix_fadvise(fd, 0, 0, dpp->get_cct()->_conf->rgw_d3n_l1_fadvise);
+ cb->aio_fildes = fd;
+
+ data = malloc(len);
+ if (!data) {
+ ldpp_dout(dpp, 0) << "ERROR: AsyncWriteRequest::prepare_libaio_write_op: memory allocation failed" << dendl;
+ ::close(fd);
+ return r;
+ }
+ cb->aio_buf = data;
+ memcpy((void*)data, bl.c_str(), len);
+ cb->aio_nbytes = len;
+ return r;
+}
+
+void SSDDriver::AsyncWriteRequest::libaio_write_cb(sigval sigval)
+{
+ SSDDriver::AsyncWriteRequest* c = static_cast<SSDDriver::AsyncWriteRequest*>(sigval.sival_ptr);
+ ldpp_dout(c->dpp, 20) << "SSDCache: " << __func__ << "()" << dendl;
+ c->priv_data->libaio_write_completion_cb(c);
+}
+
int SSDDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
{
ldpp_dout(dpp, 20) << "SSDCache: " << __func__ << "(): file_path=" << file_path << dendl;
virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs) override;
virtual rgw::AioResultList get_async (const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
+ virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
virtual int append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data) = 0;
virtual int delete_data(const DoutPrefixProvider* dpp, const::std::string& key) override;
virtual int get_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& attrs) override;
template <typename ExecutionContext, typename CompletionToken>
auto get_async(const DoutPrefixProvider *dpp, ExecutionContext& ctx, const std::string& key,
off_t read_ofs, off_t read_len, CompletionToken&& token);
-
protected:
static std::unordered_map<std::string, Partition> partitions;
std::unordered_map<std::string, Entry> entries;
std::optional<Entry> get_entry(const DoutPrefixProvider* dpp, std::string key);
private:
+
// unique_ptr with custom deleter for struct aiocb
struct libaio_aiocb_deleter {
void operator()(struct aiocb* c) {
using unique_aio_cb_ptr = std::unique_ptr<struct aiocb, libaio_aiocb_deleter>;
struct AsyncReadOp {
- bufferlist result;
- unique_aio_cb_ptr aio_cb;
- using Signature = void(boost::system::error_code, bufferlist);
- using Completion = ceph::async::Completion<Signature, AsyncReadOp>;
+ bufferlist result;
+ unique_aio_cb_ptr aio_cb;
+ using Signature = void(boost::system::error_code, bufferlist);
+ using Completion = ceph::async::Completion<Signature, AsyncReadOp>;
- int init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
- static void libaio_cb_aio_dispatch(sigval sigval);
+ int init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg);
+ static void libaio_cb_aio_dispatch(sigval sigval);
- template <typename Executor1, typename CompletionHandler>
- static auto create(const Executor1& ex1, CompletionHandler&& handler);
- };
+ template <typename Executor1, typename CompletionHandler>
+ static auto create(const Executor1& ex1, CompletionHandler&& handler);
+};
+
+struct AsyncWriteRequest {
+ const DoutPrefixProvider* dpp;
+ std::string key;
+ void *data;
+ int fd;
+ struct aiocb *cb;
+ SSDDriver *priv_data;
+
+ AsyncWriteRequest(const DoutPrefixProvider* dpp) : dpp(dpp) {}
+ int prepare_libaio_write_op(const DoutPrefixProvider *dpp, bufferlist& bl, unsigned int len, std::string key, std::string cache_location);
+ static void libaio_write_cb(sigval sigval);
+
+ ~AsyncWriteRequest() {
+ ::close(fd);
+ cb->aio_buf = nullptr;
+ delete(cb);
+ }
+};
+
+void libaio_write_completion_cb(AsyncWriteRequest* c);
};