return partitions.erase(key);
}
-/*
-uint64_t RedisDriver::get_free_space(const DoutPrefixProvider* dpp)
-{
- int result = -1;
-
- if (!client.is_connected())
- find_client(dpp);
-
- try {
- client.info([&result](cpp_redis::reply &reply) {
- if (!reply.is_null()) {
- int usedMem = -1;
- int maxMem = -1;
-
- std::istringstream iss(reply.as_string());
- std::string line;
- while (std::getline(iss, line)) {
- size_t pos = line.find_first_of(":");
- if (pos != std::string::npos) {
- if (line.substr(0, pos) == "used_memory") {
- usedMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
- } else if (line.substr(0, line.find_first_of(":")) == "maxmemory") {
- maxMem = std::stoi(line.substr(pos + 1, line.length() - pos - 2));
- }
- }
- }
-
- if (usedMem > -1 && maxMem > -1)
- result = maxMem - usedMem;
- }
- });
-
- client.sync_commit(std::chrono::milliseconds(1000));
- } catch(std::exception &e) {
- return -1;
- }
-
- return result;
-}
-*/
-
std::optional<Partition> RedisDriver::get_partition_info(const DoutPrefixProvider* dpp, const std::string& name, const std::string& type)
{
std::string key = name + type;
return partitions_v;
}
-/* Currently an attribute but will also be part of the Entry metadata once consistency is guaranteed -Sam
-int RedisDriver::update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight)
-{
- auto iter = entries.find(key);
-
- if (iter != entries.end()) {
- iter->second.localWeight = localWeight;
- return 0;
- }
-
- return -1;
-}
-*/
-
int RedisDriver::initialize(CephContext* cct, const DoutPrefixProvider* dpp)
{
this->cct = cct;
{
std::string entry = partition_info.location + key;
- /* Every set will be treated as new */ // or maybe, if key exists, simply return? -Sam
+ /* Every set will be treated as new */
try {
boost::system::error_code ec;
response<std::string> resp;
return -1;
}
- return 0; // why is offset necessarily 0? -Sam
+ this->free_space -= bl.length();
+ return 0;
}
int RedisDriver::get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y)
int RedisDriver::del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y)
{
std::string entry = partition_info.location + key;
+ response<int> resp;
try {
boost::system::error_code ec;
- response<int> resp;
request req;
- req.push("DEL", entry);
+ req.push("HEXISTS", entry, "data");
redis_exec(conn, ec, req, resp, y);
if (ec)
return -1;
-
- return std::get<0>(resp).value() - 1;
} catch(std::exception &e) {
return -1;
}
+
+ if (std::get<0>(resp).value()) {
+ response<std::string> data;
+ response<int> ret;
+
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("HGET", entry, "data");
+
+ redis_exec(conn, ec, req, data, y);
+
+ if (ec)
+ return -1;
+ } catch(std::exception &e) {
+ return -1;
+ }
+
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("DEL", entry);
+
+ redis_exec(conn, ec, req, ret, y);
+
+ if (!std::get<0>(ret).value() || ec)
+ return -1;
+ } catch(std::exception &e) {
+ return -1;
+ }
+
+ this->free_space += std::get<0>(data).value().length();
+ }
+
+ return 0;
}
int RedisDriver::append_data(const DoutPrefixProvider* dpp, const::std::string& key, bufferlist& bl_data, optional_yield y)
return -1;
}
+ this->free_space -= bl_data.length();
return 0;
}
int RedisDriver::delete_data(const DoutPrefixProvider* dpp, const::std::string& key, optional_yield y)
{
std::string entry = partition_info.location + key;
- response<int> value;
response<int> resp;
try {
}
if (std::get<0>(resp).value()) {
+ response<std::string> data;
+ response<int> ret;
+
+ try {
+ boost::system::error_code ec;
+ request req;
+ req.push("HGET", entry, "data");
+
+ redis_exec(conn, ec, req, data, y);
+
+ if (ec) {
+ return -1;
+ }
+ } catch(std::exception &e) {
+ return -1;
+ }
+
try {
boost::system::error_code ec;
request req;
req.push("HDEL", entry, "data");
- redis_exec(conn, ec, req, value, y);
+ redis_exec(conn, ec, req, ret, y);
- if (!std::get<0>(value).value() || ec) {
+ if (!std::get<0>(ret).value() || ec) {
return -1;
}
} catch(std::exception &e) {
return -1;
}
+
+ this->free_space += std::get<0>(data).value().length();
}
return 0;
redis_exec(conn, ec, req, resp, y);
- if (ec)
+ if (std::get<0>(resp).value().empty() || ec)
return -1;
for (auto const& it : std::get<0>(resp).value()) {
return aio->get(r_obj, redis_read_op(y, conn, ofs, len, entry), cost, id);
}
-#if 0
-int RedisDriver::AsyncReadOp::init(const DoutPrefixProvider *dpp, CephContext* cct, const std::string& file_path, off_t read_ofs, off_t read_len, void* arg)
+int RedisDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) {
+ // TODO: implement
+ return -1;
+}
+
+void RedisDriver::shutdown()
{
// call cancel() on the connection's executor
boost::asio::dispatch(conn->get_executor(), [c = conn] { c->cancel(); });
}
-#endif
-
-int RedisDriver::put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs)
-{
- return 0;
-}
} } // namespace rgw::cache
remove_partition_info(partition_info);
}
- //int update_local_weight(const DoutPrefixProvider* dpp, std::string key, int localWeight); // may need to exist for base class -Sam
-
/* Partition */
virtual Partition get_current_partition_info(const DoutPrefixProvider* dpp) override { return partition_info; }
virtual uint64_t get_free_space(const DoutPrefixProvider* dpp) override { return free_space; } // how to get this from redis server? -Sam
virtual int initialize(CephContext* cct, const DoutPrefixProvider* dpp) override;
virtual int put(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs, optional_yield y) override;
+ virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
virtual int get(const DoutPrefixProvider* dpp, const std::string& key, off_t offset, uint64_t len, bufferlist& bl, rgw::sal::Attrs& attrs, optional_yield y) override;
virtual rgw::AioResultList get_async(const DoutPrefixProvider* dpp, optional_yield y, rgw::Aio* aio, const std::string& key, off_t ofs, uint64_t len, uint64_t cost, uint64_t id) override;
virtual int del(const DoutPrefixProvider* dpp, const std::string& key, optional_yield y) override;
virtual int delete_attrs(const DoutPrefixProvider* dpp, const std::string& key, rgw::sal::Attrs& del_attrs, optional_yield y) override;
virtual int set_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, const std::string& attr_val, optional_yield y) override;
virtual std::string get_attr(const DoutPrefixProvider* dpp, const std::string& key, const std::string& attr_name, optional_yield y) override;
+ void shutdown();
- virtual int put_async(const DoutPrefixProvider* dpp, const std::string& key, bufferlist& bl, uint64_t len, rgw::sal::Attrs& attrs) override;
struct redis_response {
boost::redis::response<std::string> resp;
};
- void shutdown();
struct redis_aio_handler {
rgw::Aio* throttle = nullptr;
std::shared_ptr<redis_response> s;
/* Read Callback */
- void operator()(boost::system::error_code ec, long unsigned int size) const {
+ void operator()(boost::system::error_code ec, auto) const {
r.result = -ec.value();
r.data.append(std::get<0>(s->resp).value().c_str());
throttle->put(r);