SUBSYS(rgw_datacache, 1, 5)
SUBSYS(rgw_access, 1, 5)
SUBSYS(rgw_dbstore, 1, 5)
+SUBSYS(rgw_flight, 1, 5)
SUBSYS(javaclient, 1, 5)
SUBSYS(asok, 1, 5)
SUBSYS(throttle, 1, 1)
if(WITH_JAEGER)
list(APPEND librgw_common_srcs rgw_tracer.cc)
endif()
+if(WITH_RADOSGW_ARROW_FLIGHT)
+ # NOTE: eventually don't want this in common but just in radosgw daemon
+ # list(APPEND radosgw_srcs rgw_flight.cc rgw_flight_frontend.cc)
+ list(APPEND librgw_common_srcs rgw_flight.cc rgw_flight_frontend.cc)
+endif(WITH_RADOSGW_ARROW_FLIGHT)
+
add_library(rgw_common STATIC ${librgw_common_srcs})
target_compile_definitions(rgw_common
${CURL_LIBRARIES}
${EXPAT_LIBRARIES}
${ARROW_LIBRARIES}
+ ${ARROW_FLIGHT_LIBRARIES}
${ALLOC_LIBS}
PUBLIC
${LUA_LIBRARIES}
common_utf8 global
${CRYPTO_LIBS}
${ARROW_LIBRARIES}
+ ${ARROW_FLIGHT_LIBRARIES}
OATH::OATH
PUBLIC
rgw_common
add_executable(radosgw ${radosgw_srcs})
+if(WITH_RADOSGW_ARROW_FLIGHT)
+ # target_compile_definitions(radosgw PUBLIC WITH_ARROW_FLIGHT)
+ target_compile_definitions(rgw_common PUBLIC WITH_ARROW_FLIGHT)
+ target_include_directories(rgw_common
+ PUBLIC "${CMAKE_SOURCE_DIR}/src/arrow/cpp/src")
+ # target_include_directories(radosgw PUBLIC Arrow::Arrow)
+endif(WITH_RADOSGW_ARROW_FLIGHT)
+
target_compile_definitions(radosgw PUBLIC "-DCLS_CLIENT_HIDE_IOCTX")
target_include_directories(radosgw
PUBLIC "${CMAKE_SOURCE_DIR}/src/dmclock/support/src"
rgw_admin.cc
rgw_sync_checkpoint.cc
rgw_orphan.cc)
+
+# this is unsatisfying and hopefully temporary; ARROW should not be
+# part of radosgw_admin
+if(WITH_RADOSGW_ARROW_FLIGHT)
+ list(APPEND radosgw_admin_srcs rgw_flight.cc)
+endif(WITH_RADOSGW_ARROW_FLIGHT)
+
add_executable(radosgw-admin ${radosgw_admin_srcs})
target_link_libraries(radosgw-admin ${rgw_libs} librados
cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
global ${LIB_RESOLV}
OATH::OATH
${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES})
+
+# this is unsatisfying and hopefully temporary; ARROW should not be
+# part of radosgw_admin
+if(WITH_RADOSGW_ARROW_FLIGHT)
+ target_link_libraries(radosgw-admin ${ARROW_LIBRARIES} ${ARROW_FLIGHT_LIBRARIES})
+endif(WITH_RADOSGW_ARROW_FLIGHT)
+
install(TARGETS radosgw-admin DESTINATION bin)
set(radosgw_es_srcs
#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
#include "rgw_kafka.h"
#endif
+#ifdef WITH_ARROW_FLIGHT
+#include "rgw_flight_frontend.h"
+#endif
#include "rgw_asio_frontend.h"
#include "rgw_dmclock_scheduler_ctx.h"
#include "rgw_lua.h"
rgwlib->set_fe(static_cast<RGWLibFrontend*>(fe));
}
}
+ else if (framework == "arrow_flight") {
+#ifdef WITH_ARROW_FLIGHT
+ int port;
+ config->get_val("port", 8077, &port);
+ fe = new rgw::flight::FlightFrontend(cct, config, store, port);
+#else
+ derr << "WARNING: arrow_flight frontend requested, but not included in build; skipping" << dendl;
+ continue;
+#endif
+ }
service_map_meta["frontend_type#" + stringify(fe_count)] = framework;
service_map_meta["frontend_config#" + stringify(fe_count)] = config->get_config();
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <iostream>
+#include <mutex>
+#include <map>
+
+#include "arrow/type.h"
+#include "arrow/flight/server.h"
+
+#include "common/dout.h"
+#include "rgw_op.h"
+
+#include "rgw_flight.h"
+#include "rgw_flight_frontend.h"
+
+
+#define dout_subsys ceph_subsys_rgw_flight
+
+#define INFO_F(dp) ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": "
+#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": "
+#define WARN_F(dp) ldpp_dout(&dp, 0) << "WARNING: " << __func__ << ": "
+#define ERROR_F(dp) ldpp_dout(&dp, 0) << "ERROR: " << __func__ << ": "
+
+#define INFO INFO_F(dp)
+#define STATUS STATUS_F(dp)
+#define WARN WARN_F(dp)
+#define ERROR ERROR_F(dp)
+
+
+namespace rgw::flight {
+
+ std::atomic<FlightKey> next_flight_key = 0;
+
+ FlightData::FlightData(const req_state* state) :
+ key(next_flight_key++),
+ expires(coarse_real_clock::now() + lifespan)
+ {
+#if 0
+ bucket = new rgw::sal::Bucket(*state->bucket);
+#endif
+ }
+
+ FlightStore::~FlightStore() {
+ // empty
+ }
+
+ MemoryFlightStore::~MemoryFlightStore() {
+ // empty
+ }
+
+ FlightKey MemoryFlightStore::add_flight(FlightData&& flight) {
+ FlightKey key = flight.key;
+
+ auto p = map.insert( {key, flight} );
+ ceph_assertf(p.second,
+ "unable to add FlightData to MemoryFlightStore"); // temporary until error handling
+
+ return key;
+ }
+ // int MemoryFlightStore::add_flight(const FlightKey& key) { return 0; }
+ int MemoryFlightStore::get_flight(const FlightKey& key) { return 0; }
+ int MemoryFlightStore::remove_flight(const FlightKey& key) { return 0; }
+ int MemoryFlightStore::expire_flights() { return 0; }
+
+ FlightServer::FlightServer(boost::intrusive_ptr<ceph::common::CephContext>& _cct,
+ rgw::sal::Store* _store,
+ FlightStore* _flight_store) :
+ cct(_cct),
+ dp(cct.get(), dout_subsys, "rgw arrow_flight: "),
+ store(_store),
+ flight_store(_flight_store)
+ {
+ INFO << "FlightServer constructed" << dendl;
+ }
+
+ FlightServer::~FlightServer()
+ {
+ INFO << "FlightServer destructed" << dendl;
+ }
+
+
+class RGWFlightListing : public flt::FlightListing {
+public:
+
+ RGWFlightListing() {
+#if 0
+ const int64_t total_records = 2;
+ const int64_t total_bytes = 2048;
+ const std::vector<flt::FlightEndpoint> endpoints;
+ const auto descriptor = flt::FlightDescriptor::Command("fake-cmd");
+ arw::FieldVector fields;
+ const arw::Schema schema(fields, nullptr);
+ auto info1 = flt::FlightInfo::Make(schema, descriptor, endpoints, total_records, total_bytes);
+#endif
+ }
+
+ arw::Status Next(std::unique_ptr<flt::FlightInfo>* info) {
+ *info = nullptr;
+ return arw::Status::OK();
+ }
+};
+
+
+ arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
+ const flt::Criteria* criteria,
+ std::unique_ptr<flt::FlightListing>* listings) {
+ *listings = std::make_unique<RGWFlightListing>();
+ return arw::Status::OK();
+ }
+
+ static FlightServer* fs;
+
+ void set_flight_server(FlightServer* _server) {
+ fs = _server;
+ }
+
+ FlightServer* get_flight_server() {
+ return fs;
+ }
+
+ FlightStore* get_flight_store() {
+ return fs ? fs->get_flight_store() : nullptr;
+ }
+
+ FlightKey propose_flight(const req_state* request) {
+ FlightKey key = get_flight_store()->add_flight(FlightData(request));
+ return key;
+ }
+
+} // namespace rgw::flight
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <map>
+#include <mutex>
+#include <atomic>
+
+#include "include/common_fwd.h"
+#include "common/ceph_context.h"
+#include "common/Thread.h"
+#include "common/ceph_time.h"
+#include "rgw_frontend.h"
+#include "arrow/type.h"
+#include "arrow/flight/server.h"
+
+#include "rgw_flight_frontend.h"
+
+namespace arw = arrow;
+namespace flt = arrow::flight;
+
+struct req_state;
+
+namespace rgw::flight {
+
+ static const coarse_real_clock::duration lifespan = std::chrono::hours(1);
+
+ struct FlightData {
+ FlightKey key;
+ coarse_real_clock::time_point expires;
+
+ rgw::sal::Bucket* bucket;
+#if 0
+ rgw::sal::Object object;
+ rgw::sal::User user;
+#endif
+
+
+ FlightData(const req_state* state);
+ };
+
+ // stores flights that have been created and helps expire them
+ class FlightStore {
+ public:
+ virtual ~FlightStore();
+ virtual FlightKey add_flight(FlightData&& flight) = 0;
+ virtual int get_flight(const FlightKey& key) = 0;
+ virtual int remove_flight(const FlightKey& key) = 0;
+ virtual int expire_flights() = 0;
+ };
+
+ class MemoryFlightStore : public FlightStore {
+ std::map<FlightKey, FlightData> map;
+
+ public:
+
+ virtual ~MemoryFlightStore();
+ FlightKey add_flight(FlightData&& flight) override;
+ int get_flight(const FlightKey& key) override;
+ int remove_flight(const FlightKey& key) override;
+ int expire_flights() override;
+ };
+
+ class FlightServer : public flt::FlightServerBase {
+
+ using Data1 = std::vector<std::shared_ptr<arw::RecordBatch>>;
+
+ boost::intrusive_ptr<ceph::common::CephContext> cct;
+ const DoutPrefix dp;
+ rgw::sal::Store* store;
+ FlightStore* flight_store;
+
+ std::map<std::string, Data1> data;
+
+ public:
+
+ static constexpr int default_port = 8077;
+
+ FlightServer(boost::intrusive_ptr<ceph::common::CephContext>& _cct,
+ rgw::sal::Store* _store,
+ FlightStore* _flight_store);
+ ~FlightServer() override;
+
+ FlightStore* get_flight_store() {
+ return flight_store;
+ }
+
+ arw::Status ListFlights(const flt::ServerCallContext& context,
+ const flt::Criteria* criteria,
+ std::unique_ptr<flt::FlightListing>* listings) override;
+ }; // class FlightServer
+
+ // GLOBAL
+
+ void set_flight_server(FlightServer* _server);
+ FlightServer* get_flight_server();
+ FlightStore* get_flight_store();
+ FlightKey propose_flight(const req_state* request);
+
+} // namespace rgw::flight
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+
+#include "arrow/type.h"
+#include "arrow/flight/server.h"
+
+#include "rgw_flight_frontend.h"
+#include "rgw_flight.h"
+
+#define dout_subsys ceph_subsys_rgw_flight
+
+namespace rgw::flight {
+
+ FlightFrontend::FlightFrontend(boost::intrusive_ptr<ceph::common::CephContext>& _cct,
+ RGWFrontendConfig* _config,
+ rgw::sal::Store* _store,
+ int _port) :
+ cct(_cct),
+ dp(cct.get(), dout_subsys, "rgw arrow_flight: "),
+ config(_config),
+ port(_port)
+ {
+ FlightStore* flight_store = new MemoryFlightStore();
+ flight_server = new FlightServer(_cct, _store, flight_store);
+ }
+
+ FlightFrontend::~FlightFrontend() {
+ delete flight_server;
+ }
+
+ int FlightFrontend::init() {
+ if (port <= 0) {
+ port = FlightServer::default_port;
+ }
+ const std::string url =
+ std::string("grpc+tcp://localhost:") + std::to_string(port);
+ flt::Location location;
+ arw::Status s = flt::Location::Parse(url, &location);
+ if (!s.ok()) {
+ return -EINVAL;
+ }
+
+ flt::FlightServerOptions options(location);
+ options.verify_client = false;
+ s = flight_server->Init(options);
+ if (!s.ok()) {
+ return -EINVAL;
+ }
+
+ dout(20) << "STATUS: " << __func__ <<
+ ": FlightServer inited; will use port " << port << dendl;
+ return 0;
+ }
+
+ int FlightFrontend::run() {
+ try {
+ flight_thread = make_named_thread(server_thread_name,
+ &FlightServer::Serve,
+ flight_server);
+ set_flight_server(flight_server);
+
+ dout(20) << "INFO: " << __func__ <<
+ ": FlightServer thread started, id=" << flight_thread.get_id() <<
+ ", joinable=" << flight_thread.joinable() << dendl;
+ return 0;
+ } catch (std::system_error& e) {
+ derr << "ERROR: " << __func__ <<
+ ": FlightServer thread failed to start" << dendl;
+ return -ENOSPC;
+ }
+ }
+
+ void FlightFrontend::stop() {
+ set_flight_server(nullptr);
+ flight_server->Shutdown();
+ flight_server->Wait();
+ dout(20) << "INFO: " << __func__ << ": FlightServer shut down" << dendl;
+ }
+
+ void FlightFrontend::join() {
+ flight_thread.join();
+ dout(20) << "INFO: " << __func__ << ": FlightServer thread joined" << dendl;
+ }
+
+ void FlightFrontend::pause_for_new_config() {
+ // ignore since config changes won't alter flight_server
+ }
+
+ void FlightFrontend::unpause_with_new_config(rgw::sal::Store* store,
+ rgw_auth_registry_ptr_t auth_registry) {
+ // ignore since config changes won't alter flight_server
+ }
+
+ int FlightGetObj_Filter::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) {
+ // do work here
+ dout(0) << "ERIC: " << __func__ << ": flight handling data from offset " << bl_ofs << " of size " << bl_len << dendl;
+
+ // chain upwards
+ return RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len);
+ }
+
+} // namespace rgw::flight
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include "include/common_fwd.h"
+#include "common/Thread.h"
+#include "rgw_frontend.h"
+#include "rgw_op.h"
+
+
+namespace rgw::flight {
+
+ using FlightKey = uint32_t;
+
+ class FlightServer;
+
+ class FlightFrontend : public RGWFrontend {
+
+ static constexpr std::string_view server_thread_name =
+ "Arrow Flight Server thread";
+
+ boost::intrusive_ptr<ceph::common::CephContext>& cct;
+ const DoutPrefix dp;
+ FlightServer* flight_server; // pointer so header file doesn't need to pull in too much
+ std::thread flight_thread;
+ RGWFrontendConfig* config;
+ int port;
+
+ public:
+
+ // port <= 0 -> let server decide; typically 8077
+ FlightFrontend(boost::intrusive_ptr<ceph::common::CephContext>& cct,
+ RGWFrontendConfig* config,
+ rgw::sal::Store* store,
+ int port = -1);
+ ~FlightFrontend() override;
+ int init() override;
+ int run() override;
+ void stop() override;
+ void join() override;
+
+ void pause_for_new_config() override;
+ void unpause_with_new_config(rgw::sal::Store* store,
+ rgw_auth_registry_ptr_t auth_registry) override;
+
+ }; // class FlightFrontend
+
+ class FlightGetObj_Filter : public RGWGetObj_Filter {
+
+ FlightKey key;
+
+ public:
+
+ FlightGetObj_Filter(const FlightKey& _key, RGWGetObj_Filter* next) :
+ RGWGetObj_Filter(next),
+ key(_key)
+ {
+ // empty
+ }
+
+ int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
+#if 0
+ // this would allow the range to be modified if necessary;
+ int fixup_range(off_t& ofs, off_t& end) override;
+#endif
+ };
+
+} // namespace rgw::flight
#include "compressor/Compressor.h"
+#ifdef WITH_ARROW_FLIGHT
+#include "rgw_flight.h"
+#include "rgw_flight_frontend.h"
+#endif
+
#ifdef WITH_LTTNG
#define TRACEPOINT_DEFINE
#define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
RGWGetObj_CB cb(this);
RGWGetObj_Filter* filter = (RGWGetObj_Filter *)&cb;
boost::optional<RGWGetObj_Decompress> decompress;
+#ifdef WITH_ARROW_FLIGHT
+ boost::optional<rgw::flight::FlightGetObj_Filter> flight_filter;
+#endif
std::unique_ptr<RGWGetObj_Filter> decrypt;
std::unique_ptr<RGWGetObj_Filter> run_lua;
map<string, bufferlist>::iterator attr_iter;
goto done_err;
}
+#ifdef WITH_ARROW_FLIGHT
+ if (ofs == 0) {
+ rgw::flight::FlightKey key = rgw::flight::propose_flight(s);
+ ldpp_dout(this, 0) << "ERIC: added arrow flight with key=" << key << dendl;
+
+ flight_filter.emplace(key, filter);
+ filter = &*flight_filter;
+ }
+#endif
+
op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info);
if (op_ret < 0) {
ldpp_dout(this, 0) << "ERROR: failed to decode compression info, cannot decompress" << dendl;