From: J. Eric Ivancich Date: Wed, 11 May 2022 00:02:05 +0000 (-0400) Subject: rgw: initial integration of Arrow Flight code into RGW X-Git-Tag: v18.1.0~508^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=34f38970de7fdf8e1e4b24cfbfa595e15ec2b95f;p=ceph.git rgw: initial integration of Arrow Flight code into RGW Add base files rgw_arrow.h and rgw_arrow.cc. Additionally handle initialization and shutdown of a Flight Server in radosgw. Integration requires WITH_RADOSGW_ARROW_FLIGHT to be defined. Signed-off-by: J. Eric Ivancich --- diff --git a/src/common/subsys.h b/src/common/subsys.h index a6f4b8500d3..3e558b44092 100644 --- a/src/common/subsys.h +++ b/src/common/subsys.h @@ -63,6 +63,7 @@ SUBSYS(rgw_sync, 1, 5) SUBSYS(rgw_datacache, 1, 5) SUBSYS(rgw_access, 1, 5) SUBSYS(rgw_dbstore, 1, 5) +SUBSYS(rgw_flight, 1, 5) SUBSYS(javaclient, 1, 5) SUBSYS(asok, 1, 5) SUBSYS(throttle, 1, 1) diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 25404632bb6..22c7ded4fb8 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -219,6 +219,12 @@ endif() if(WITH_JAEGER) list(APPEND librgw_common_srcs rgw_tracer.cc) endif() +if(WITH_RADOSGW_ARROW_FLIGHT) + # NOTE: eventually don't want this in common but just in radosgw daemon + # list(APPEND radosgw_srcs rgw_flight.cc rgw_flight_frontend.cc) + list(APPEND librgw_common_srcs rgw_flight.cc rgw_flight_frontend.cc) +endif(WITH_RADOSGW_ARROW_FLIGHT) + add_library(rgw_common STATIC ${librgw_common_srcs}) target_compile_definitions(rgw_common @@ -255,6 +261,7 @@ target_link_libraries(rgw_common ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${ARROW_LIBRARIES} + ${ARROW_FLIGHT_LIBRARIES} ${ALLOC_LIBS} PUBLIC ${LUA_LIBRARIES} @@ -383,6 +390,7 @@ target_link_libraries(rgw_a common_utf8 global ${CRYPTO_LIBS} ${ARROW_LIBRARIES} + ${ARROW_FLIGHT_LIBRARIES} OATH::OATH PUBLIC rgw_common @@ -409,6 +417,14 @@ set(radosgw_srcs add_executable(radosgw ${radosgw_srcs}) +if(WITH_RADOSGW_ARROW_FLIGHT) + # target_compile_definitions(radosgw PUBLIC WITH_ARROW_FLIGHT) + target_compile_definitions(rgw_common PUBLIC WITH_ARROW_FLIGHT) + target_include_directories(rgw_common + PUBLIC "${CMAKE_SOURCE_DIR}/src/arrow/cpp/src") + # target_include_directories(radosgw PUBLIC Arrow::Arrow) +endif(WITH_RADOSGW_ARROW_FLIGHT) + target_compile_definitions(radosgw PUBLIC "-DCLS_CLIENT_HIDE_IOCTX") target_include_directories(radosgw PUBLIC "${CMAKE_SOURCE_DIR}/src/dmclock/support/src" @@ -429,6 +445,13 @@ set(radosgw_admin_srcs rgw_admin.cc rgw_sync_checkpoint.cc rgw_orphan.cc) + +# this is unsatisfying and hopefully temporary; ARROW should not be +# part of radosgw_admin +if(WITH_RADOSGW_ARROW_FLIGHT) + list(APPEND radosgw_admin_srcs rgw_flight.cc) +endif(WITH_RADOSGW_ARROW_FLIGHT) + add_executable(radosgw-admin ${radosgw_admin_srcs}) target_link_libraries(radosgw-admin ${rgw_libs} librados cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client @@ -437,6 +460,13 @@ target_link_libraries(radosgw-admin ${rgw_libs} librados global ${LIB_RESOLV} OATH::OATH ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES}) + +# this is unsatisfying and hopefully temporary; ARROW should not be +# part of radosgw_admin +if(WITH_RADOSGW_ARROW_FLIGHT) + target_link_libraries(radosgw-admin ${ARROW_LIBRARIES} ${ARROW_FLIGHT_LIBRARIES}) +endif(WITH_RADOSGW_ARROW_FLIGHT) + install(TARGETS radosgw-admin DESTINATION bin) set(radosgw_es_srcs diff --git a/src/rgw/rgw_appmain.cc b/src/rgw/rgw_appmain.cc index 69672fa807d..77e29d55954 100644 --- a/src/rgw/rgw_appmain.cc +++ b/src/rgw/rgw_appmain.cc @@ -62,6 +62,9 @@ #ifdef WITH_RADOSGW_KAFKA_ENDPOINT #include "rgw_kafka.h" #endif +#ifdef WITH_ARROW_FLIGHT +#include "rgw_flight_frontend.h" +#endif #include "rgw_asio_frontend.h" #include "rgw_dmclock_scheduler_ctx.h" #include "rgw_lua.h" @@ -433,6 +436,16 @@ int rgw::AppMain::init_frontends2(RGWLib* rgwlib) rgwlib->set_fe(static_cast(fe)); } } + else if (framework == "arrow_flight") { +#ifdef WITH_ARROW_FLIGHT + int port; + config->get_val("port", 8077, &port); + fe = new rgw::flight::FlightFrontend(cct, config, store, port); +#else + derr << "WARNING: arrow_flight frontend requested, but not included in build; skipping" << dendl; + continue; +#endif + } service_map_meta["frontend_type#" + stringify(fe_count)] = framework; service_map_meta["frontend_config#" + stringify(fe_count)] = config->get_config(); diff --git a/src/rgw/rgw_flight.cc b/src/rgw/rgw_flight.cc new file mode 100644 index 00000000000..be0a17c8f32 --- /dev/null +++ b/src/rgw/rgw_flight.cc @@ -0,0 +1,131 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include +#include +#include + +#include "arrow/type.h" +#include "arrow/flight/server.h" + +#include "common/dout.h" +#include "rgw_op.h" + +#include "rgw_flight.h" +#include "rgw_flight_frontend.h" + + +#define dout_subsys ceph_subsys_rgw_flight + +#define INFO_F(dp) ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": " +#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": " +#define WARN_F(dp) ldpp_dout(&dp, 0) << "WARNING: " << __func__ << ": " +#define ERROR_F(dp) ldpp_dout(&dp, 0) << "ERROR: " << __func__ << ": " + +#define INFO INFO_F(dp) +#define STATUS STATUS_F(dp) +#define WARN WARN_F(dp) +#define ERROR ERROR_F(dp) + + +namespace rgw::flight { + + std::atomic next_flight_key = 0; + + FlightData::FlightData(const req_state* state) : + key(next_flight_key++), + expires(coarse_real_clock::now() + lifespan) + { +#if 0 + bucket = new rgw::sal::Bucket(*state->bucket); +#endif + } + + FlightStore::~FlightStore() { + // empty + } + + MemoryFlightStore::~MemoryFlightStore() { + // empty + } + + FlightKey MemoryFlightStore::add_flight(FlightData&& flight) { + FlightKey key = flight.key; + + auto p = map.insert( {key, flight} ); + ceph_assertf(p.second, + "unable to add FlightData to MemoryFlightStore"); // temporary until error handling + + return key; + } + // int MemoryFlightStore::add_flight(const FlightKey& key) { return 0; } + int MemoryFlightStore::get_flight(const FlightKey& key) { return 0; } + int MemoryFlightStore::remove_flight(const FlightKey& key) { return 0; } + int MemoryFlightStore::expire_flights() { return 0; } + + FlightServer::FlightServer(boost::intrusive_ptr& _cct, + rgw::sal::Store* _store, + FlightStore* _flight_store) : + cct(_cct), + dp(cct.get(), dout_subsys, "rgw arrow_flight: "), + store(_store), + flight_store(_flight_store) + { + INFO << "FlightServer constructed" << dendl; + } + + FlightServer::~FlightServer() + { + INFO << "FlightServer destructed" << dendl; + } + + +class RGWFlightListing : public flt::FlightListing { +public: + + RGWFlightListing() { +#if 0 + const int64_t total_records = 2; + const int64_t total_bytes = 2048; + const std::vector endpoints; + const auto descriptor = flt::FlightDescriptor::Command("fake-cmd"); + arw::FieldVector fields; + const arw::Schema schema(fields, nullptr); + auto info1 = flt::FlightInfo::Make(schema, descriptor, endpoints, total_records, total_bytes); +#endif + } + + arw::Status Next(std::unique_ptr* info) { + *info = nullptr; + return arw::Status::OK(); + } +}; + + + arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context, + const flt::Criteria* criteria, + std::unique_ptr* listings) { + *listings = std::make_unique(); + return arw::Status::OK(); + } + + static FlightServer* fs; + + void set_flight_server(FlightServer* _server) { + fs = _server; + } + + FlightServer* get_flight_server() { + return fs; + } + + FlightStore* get_flight_store() { + return fs ? fs->get_flight_store() : nullptr; + } + + FlightKey propose_flight(const req_state* request) { + FlightKey key = get_flight_store()->add_flight(FlightData(request)); + return key; + } + +} // namespace rgw::flight diff --git a/src/rgw/rgw_flight.h b/src/rgw/rgw_flight.h new file mode 100644 index 00000000000..1f4d3721376 --- /dev/null +++ b/src/rgw/rgw_flight.h @@ -0,0 +1,101 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#pragma once + +#include +#include +#include + +#include "include/common_fwd.h" +#include "common/ceph_context.h" +#include "common/Thread.h" +#include "common/ceph_time.h" +#include "rgw_frontend.h" +#include "arrow/type.h" +#include "arrow/flight/server.h" + +#include "rgw_flight_frontend.h" + +namespace arw = arrow; +namespace flt = arrow::flight; + +struct req_state; + +namespace rgw::flight { + + static const coarse_real_clock::duration lifespan = std::chrono::hours(1); + + struct FlightData { + FlightKey key; + coarse_real_clock::time_point expires; + + rgw::sal::Bucket* bucket; +#if 0 + rgw::sal::Object object; + rgw::sal::User user; +#endif + + + FlightData(const req_state* state); + }; + + // stores flights that have been created and helps expire them + class FlightStore { + public: + virtual ~FlightStore(); + virtual FlightKey add_flight(FlightData&& flight) = 0; + virtual int get_flight(const FlightKey& key) = 0; + virtual int remove_flight(const FlightKey& key) = 0; + virtual int expire_flights() = 0; + }; + + class MemoryFlightStore : public FlightStore { + std::map map; + + public: + + virtual ~MemoryFlightStore(); + FlightKey add_flight(FlightData&& flight) override; + int get_flight(const FlightKey& key) override; + int remove_flight(const FlightKey& key) override; + int expire_flights() override; + }; + + class FlightServer : public flt::FlightServerBase { + + using Data1 = std::vector>; + + boost::intrusive_ptr cct; + const DoutPrefix dp; + rgw::sal::Store* store; + FlightStore* flight_store; + + std::map data; + + public: + + static constexpr int default_port = 8077; + + FlightServer(boost::intrusive_ptr& _cct, + rgw::sal::Store* _store, + FlightStore* _flight_store); + ~FlightServer() override; + + FlightStore* get_flight_store() { + return flight_store; + } + + arw::Status ListFlights(const flt::ServerCallContext& context, + const flt::Criteria* criteria, + std::unique_ptr* listings) override; + }; // class FlightServer + + // GLOBAL + + void set_flight_server(FlightServer* _server); + FlightServer* get_flight_server(); + FlightStore* get_flight_store(); + FlightKey propose_flight(const req_state* request); + +} // namespace rgw::flight diff --git a/src/rgw/rgw_flight_frontend.cc b/src/rgw/rgw_flight_frontend.cc new file mode 100644 index 00000000000..1751e2515d3 --- /dev/null +++ b/src/rgw/rgw_flight_frontend.cc @@ -0,0 +1,103 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + + +#include "arrow/type.h" +#include "arrow/flight/server.h" + +#include "rgw_flight_frontend.h" +#include "rgw_flight.h" + +#define dout_subsys ceph_subsys_rgw_flight + +namespace rgw::flight { + + FlightFrontend::FlightFrontend(boost::intrusive_ptr& _cct, + RGWFrontendConfig* _config, + rgw::sal::Store* _store, + int _port) : + cct(_cct), + dp(cct.get(), dout_subsys, "rgw arrow_flight: "), + config(_config), + port(_port) + { + FlightStore* flight_store = new MemoryFlightStore(); + flight_server = new FlightServer(_cct, _store, flight_store); + } + + FlightFrontend::~FlightFrontend() { + delete flight_server; + } + + int FlightFrontend::init() { + if (port <= 0) { + port = FlightServer::default_port; + } + const std::string url = + std::string("grpc+tcp://localhost:") + std::to_string(port); + flt::Location location; + arw::Status s = flt::Location::Parse(url, &location); + if (!s.ok()) { + return -EINVAL; + } + + flt::FlightServerOptions options(location); + options.verify_client = false; + s = flight_server->Init(options); + if (!s.ok()) { + return -EINVAL; + } + + dout(20) << "STATUS: " << __func__ << + ": FlightServer inited; will use port " << port << dendl; + return 0; + } + + int FlightFrontend::run() { + try { + flight_thread = make_named_thread(server_thread_name, + &FlightServer::Serve, + flight_server); + set_flight_server(flight_server); + + dout(20) << "INFO: " << __func__ << + ": FlightServer thread started, id=" << flight_thread.get_id() << + ", joinable=" << flight_thread.joinable() << dendl; + return 0; + } catch (std::system_error& e) { + derr << "ERROR: " << __func__ << + ": FlightServer thread failed to start" << dendl; + return -ENOSPC; + } + } + + void FlightFrontend::stop() { + set_flight_server(nullptr); + flight_server->Shutdown(); + flight_server->Wait(); + dout(20) << "INFO: " << __func__ << ": FlightServer shut down" << dendl; + } + + void FlightFrontend::join() { + flight_thread.join(); + dout(20) << "INFO: " << __func__ << ": FlightServer thread joined" << dendl; + } + + void FlightFrontend::pause_for_new_config() { + // ignore since config changes won't alter flight_server + } + + void FlightFrontend::unpause_with_new_config(rgw::sal::Store* store, + rgw_auth_registry_ptr_t auth_registry) { + // ignore since config changes won't alter flight_server + } + + int FlightGetObj_Filter::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) { + // do work here + dout(0) << "ERIC: " << __func__ << ": flight handling data from offset " << bl_ofs << " of size " << bl_len << dendl; + + // chain upwards + return RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len); + } + +} // namespace rgw::flight diff --git a/src/rgw/rgw_flight_frontend.h b/src/rgw/rgw_flight_frontend.h new file mode 100644 index 00000000000..c74878a884b --- /dev/null +++ b/src/rgw/rgw_flight_frontend.h @@ -0,0 +1,69 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#pragma once + +#include "include/common_fwd.h" +#include "common/Thread.h" +#include "rgw_frontend.h" +#include "rgw_op.h" + + +namespace rgw::flight { + + using FlightKey = uint32_t; + + class FlightServer; + + class FlightFrontend : public RGWFrontend { + + static constexpr std::string_view server_thread_name = + "Arrow Flight Server thread"; + + boost::intrusive_ptr& cct; + const DoutPrefix dp; + FlightServer* flight_server; // pointer so header file doesn't need to pull in too much + std::thread flight_thread; + RGWFrontendConfig* config; + int port; + + public: + + // port <= 0 -> let server decide; typically 8077 + FlightFrontend(boost::intrusive_ptr& cct, + RGWFrontendConfig* config, + rgw::sal::Store* store, + int port = -1); + ~FlightFrontend() override; + int init() override; + int run() override; + void stop() override; + void join() override; + + void pause_for_new_config() override; + void unpause_with_new_config(rgw::sal::Store* store, + rgw_auth_registry_ptr_t auth_registry) override; + + }; // class FlightFrontend + + class FlightGetObj_Filter : public RGWGetObj_Filter { + + FlightKey key; + + public: + + FlightGetObj_Filter(const FlightKey& _key, RGWGetObj_Filter* next) : + RGWGetObj_Filter(next), + key(_key) + { + // empty + } + + int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override; +#if 0 + // this would allow the range to be modified if necessary; + int fixup_range(off_t& ofs, off_t& end) override; +#endif + }; + +} // namespace rgw::flight diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index c847bfe2957..9c238827445 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -67,6 +67,11 @@ #include "compressor/Compressor.h" +#ifdef WITH_ARROW_FLIGHT +#include "rgw_flight.h" +#include "rgw_flight_frontend.h" +#endif + #ifdef WITH_LTTNG #define TRACEPOINT_DEFINE #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE @@ -2150,6 +2155,9 @@ void RGWGetObj::execute(optional_yield y) RGWGetObj_CB cb(this); RGWGetObj_Filter* filter = (RGWGetObj_Filter *)&cb; boost::optional decompress; +#ifdef WITH_ARROW_FLIGHT + boost::optional flight_filter; +#endif std::unique_ptr decrypt; std::unique_ptr run_lua; map::iterator attr_iter; @@ -2228,6 +2236,16 @@ void RGWGetObj::execute(optional_yield y) goto done_err; } +#ifdef WITH_ARROW_FLIGHT + if (ofs == 0) { + rgw::flight::FlightKey key = rgw::flight::propose_flight(s); + ldpp_dout(this, 0) << "ERIC: added arrow flight with key=" << key << dendl; + + flight_filter.emplace(key, filter); + filter = &*flight_filter; + } +#endif + op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info); if (op_ret < 0) { ldpp_dout(this, 0) << "ERROR: failed to decode compression info, cannot decompress" << dendl;