git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: initial integration of Arrow Flight code into RGW
author: J. Eric Ivancich <ivancich@redhat.com>
Wed, 11 May 2022 00:02:05 +0000 (20:02 -0400)
committer: J. Eric Ivancich <ivancich@redhat.com>
Thu, 15 Dec 2022 17:49:40 +0000 (12:49 -0500)
Add base files rgw_flight.h and rgw_flight.cc, together with the frontend
files rgw_flight_frontend.h and rgw_flight_frontend.cc. Additionally handle
initialization and shutdown of a Flight Server in radosgw.

Integration requires the CMake option WITH_RADOSGW_ARROW_FLIGHT to be enabled;
the build then defines WITH_ARROW_FLIGHT for the C++ sources.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
src/common/subsys.h
src/rgw/CMakeLists.txt
src/rgw/rgw_appmain.cc
src/rgw/rgw_flight.cc [new file with mode: 0644]
src/rgw/rgw_flight.h [new file with mode: 0644]
src/rgw/rgw_flight_frontend.cc [new file with mode: 0644]
src/rgw/rgw_flight_frontend.h [new file with mode: 0644]
src/rgw/rgw_op.cc

index a6f4b8500d35bfe253491a90fb06fe2cc103ec39..3e558b44092519e446b2bcb370397c2ce3aecb0e 100644 (file)
@@ -63,6 +63,7 @@ SUBSYS(rgw_sync, 1, 5)
 SUBSYS(rgw_datacache, 1, 5)
 SUBSYS(rgw_access, 1, 5)
 SUBSYS(rgw_dbstore, 1, 5)
+SUBSYS(rgw_flight, 1, 5)
 SUBSYS(javaclient, 1, 5)
 SUBSYS(asok, 1, 5)
 SUBSYS(throttle, 1, 1)
index 25404632bb6221072f683c11ffa097b94ea1f61a..22c7ded4fb8c1c82b7cecb117897ff5cf63085c9 100644 (file)
@@ -219,6 +219,12 @@ endif()
 if(WITH_JAEGER)
   list(APPEND librgw_common_srcs rgw_tracer.cc)
 endif()
+if(WITH_RADOSGW_ARROW_FLIGHT)
+  # NOTE: eventually don't want this in common but just in radosgw daemon
+  # list(APPEND radosgw_srcs rgw_flight.cc rgw_flight_frontend.cc)
+  list(APPEND librgw_common_srcs rgw_flight.cc rgw_flight_frontend.cc)
+endif(WITH_RADOSGW_ARROW_FLIGHT)
+
 
 add_library(rgw_common STATIC ${librgw_common_srcs})
 target_compile_definitions(rgw_common
@@ -255,6 +261,7 @@ target_link_libraries(rgw_common
     ${CURL_LIBRARIES}
     ${EXPAT_LIBRARIES}
     ${ARROW_LIBRARIES}
+    ${ARROW_FLIGHT_LIBRARIES}
     ${ALLOC_LIBS}
   PUBLIC
     ${LUA_LIBRARIES}
@@ -383,6 +390,7 @@ target_link_libraries(rgw_a
     common_utf8 global
     ${CRYPTO_LIBS}
     ${ARROW_LIBRARIES}
+    ${ARROW_FLIGHT_LIBRARIES}
     OATH::OATH
   PUBLIC
     rgw_common
@@ -409,6 +417,14 @@ set(radosgw_srcs
 
 add_executable(radosgw ${radosgw_srcs})
 
+if(WITH_RADOSGW_ARROW_FLIGHT)
+  # target_compile_definitions(radosgw PUBLIC WITH_ARROW_FLIGHT)
+  target_compile_definitions(rgw_common PUBLIC WITH_ARROW_FLIGHT)
+  target_include_directories(rgw_common
+    PUBLIC "${CMAKE_SOURCE_DIR}/src/arrow/cpp/src")
+  # target_include_directories(radosgw PUBLIC Arrow::Arrow)
+endif(WITH_RADOSGW_ARROW_FLIGHT)
+
 target_compile_definitions(radosgw PUBLIC "-DCLS_CLIENT_HIDE_IOCTX")
 target_include_directories(radosgw
   PUBLIC "${CMAKE_SOURCE_DIR}/src/dmclock/support/src"
@@ -429,6 +445,13 @@ set(radosgw_admin_srcs
   rgw_admin.cc
   rgw_sync_checkpoint.cc
   rgw_orphan.cc)
+
+# this is unsatisfying and hopefully temporary; ARROW should not be
+# part of radosgw_admin
+if(WITH_RADOSGW_ARROW_FLIGHT)
+  list(APPEND radosgw_admin_srcs rgw_flight.cc)
+endif(WITH_RADOSGW_ARROW_FLIGHT)
+
 add_executable(radosgw-admin ${radosgw_admin_srcs})
 target_link_libraries(radosgw-admin ${rgw_libs} librados
   cls_rgw_client cls_otp_client cls_lock_client cls_refcount_client
@@ -437,6 +460,13 @@ target_link_libraries(radosgw-admin ${rgw_libs} librados
   global ${LIB_RESOLV}
   OATH::OATH
   ${CURL_LIBRARIES} ${EXPAT_LIBRARIES} ${BLKID_LIBRARIES})
+
+# this is unsatisfying and hopefully temporary; ARROW should not be
+# part of radosgw_admin
+if(WITH_RADOSGW_ARROW_FLIGHT)
+  target_link_libraries(radosgw-admin ${ARROW_LIBRARIES} ${ARROW_FLIGHT_LIBRARIES})
+endif(WITH_RADOSGW_ARROW_FLIGHT)
+
 install(TARGETS radosgw-admin DESTINATION bin)
 
 set(radosgw_es_srcs
index 69672fa807d52b35e835b4201dc3e19143360507..77e29d5595458719f81806273d0b0d9ba5c4a0b1 100644 (file)
@@ -62,6 +62,9 @@
 #ifdef WITH_RADOSGW_KAFKA_ENDPOINT
 #include "rgw_kafka.h"
 #endif
+#ifdef WITH_ARROW_FLIGHT
+#include "rgw_flight_frontend.h"
+#endif
 #include "rgw_asio_frontend.h"
 #include "rgw_dmclock_scheduler_ctx.h"
 #include "rgw_lua.h"
@@ -433,6 +436,16 @@ int rgw::AppMain::init_frontends2(RGWLib* rgwlib)
         rgwlib->set_fe(static_cast<RGWLibFrontend*>(fe));
       }
     }
+    else if (framework == "arrow_flight") {
+#ifdef WITH_ARROW_FLIGHT
+      int port;
+      config->get_val("port", 8077, &port);
+      fe = new rgw::flight::FlightFrontend(cct, config, store, port);
+#else
+      derr << "WARNING: arrow_flight frontend requested, but not included in build; skipping" << dendl;
+      continue;
+#endif
+    }
 
     service_map_meta["frontend_type#" + stringify(fe_count)] = framework;
     service_map_meta["frontend_config#" + stringify(fe_count)] = config->get_config();
diff --git a/src/rgw/rgw_flight.cc b/src/rgw/rgw_flight.cc
new file mode 100644 (file)
index 0000000..be0a17c
--- /dev/null
@@ -0,0 +1,131 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include <iostream>
+#include <map>
+#include <mutex>
+#include <utility>
+
+#include "arrow/type.h"
+#include "arrow/flight/server.h"
+
+#include "common/dout.h"
+#include "rgw_op.h"
+
+#include "rgw_flight.h"
+#include "rgw_flight_frontend.h"
+
+
+#define dout_subsys ceph_subsys_rgw_flight
+
+// Logging shorthands. Each *_F(dp) macro opens an ldpp_dout stream on the
+// given prefix object and starts the message with a severity tag followed by
+// the name of the calling function. INFO logs at level 20, STATUS at level
+// 10, WARNING and ERROR at level 0.
+#define INFO_F(dp)   ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": "
+#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": "
+#define WARN_F(dp)   ldpp_dout(&dp,  0) << "WARNING: " << __func__ << ": "
+#define ERROR_F(dp)  ldpp_dout(&dp,  0) << "ERROR: " << __func__ << ": "
+
+// Variants for code that has a prefix object literally named `dp` in scope.
+#define INFO   INFO_F(dp)
+#define STATUS STATUS_F(dp)
+#define WARN   WARN_F(dp)
+#define ERROR  ERROR_F(dp)
+
+
+namespace rgw::flight {
+
+  // Counter from which every new FlightData takes its key; the FlightData
+  // constructor post-increments it.
+  std::atomic<FlightKey> next_flight_key = 0;
+
+  // Builds the record for a new flight: takes the next key from
+  // next_flight_key and sets the expiry to the current coarse real time plus
+  // `lifespan`. `state` is not read while the bucket capture below remains
+  // disabled.
+  FlightData::FlightData(const req_state* state) :
+    key(next_flight_key++),
+    expires(coarse_real_clock::now() + lifespan),
+    bucket(nullptr) // nothing captured yet; never leave the raw pointer indeterminate
+  {
+#if 0
+    bucket = new rgw::sal::Bucket(*state->bucket);
+#endif
+  }
+
+  // Out-of-line destructor for the abstract store interface; there is
+  // nothing to release.
+  FlightStore::~FlightStore() = default;
+
+  // Out-of-line destructor; the map member cleans itself up.
+  MemoryFlightStore::~MemoryFlightStore() = default;
+
+  // Stores the flight under its own key and returns that key. If the key is
+  // already present the insertion is refused and the assertion below fires.
+  // NOTE(review): `map` is accessed without a lock here -- confirm that
+  // callers serialize access.
+  FlightKey MemoryFlightStore::add_flight(FlightData&& flight) {
+    const FlightKey key = flight.key; // read before `flight` is moved from
+
+    // `flight` is a named rvalue reference and therefore an lvalue; without
+    // std::move the stored element would be copy-constructed
+    auto p = map.emplace(key, std::move(flight));
+    ceph_assertf(p.second,
+                "unable to add FlightData to MemoryFlightStore"); // temporary until error handling
+
+    return key;
+  }
+  // int MemoryFlightStore::add_flight(const FlightKey& key) { return 0; }
+  // Placeholder: no lookup is performed; `key` is ignored and 0 is returned.
+  int MemoryFlightStore::get_flight(const FlightKey& key) {
+    return 0;
+  }
+  // Placeholder: nothing is removed; `key` is ignored and 0 is returned.
+  int MemoryFlightStore::remove_flight(const FlightKey& key) {
+    return 0;
+  }
+  // Placeholder: no entries are expired; always returns 0.
+  int MemoryFlightStore::expire_flights() {
+    return 0;
+  }
+
+  // Constructs the server from an existing CephContext, SAL store and flight
+  // store. The constructor only records the handles it is given and builds
+  // the log prefix used by the INFO/STATUS/WARN/ERROR macros; it does not
+  // start serving.
+  FlightServer::FlightServer(boost::intrusive_ptr<ceph::common::CephContext>& _cct,
+                            rgw::sal::Store* _store,
+                            FlightStore* _flight_store) :
+    cct(_cct),
+    dp(cct.get(), dout_subsys, "rgw arrow_flight: "),
+    store(_store),
+    flight_store(_flight_store)
+  {
+    INFO << "FlightServer constructed" << dendl;
+  }
+
+  // Only logs the destruction; neither `store` nor `flight_store` is deleted
+  // here.
+  FlightServer::~FlightServer()
+  {
+    INFO << "FlightServer destructed" << dendl;
+  }
+
+
+class RGWFlightListing : public flt::FlightListing {
+public:
+
+  RGWFlightListing() {
+#if 0
+    const int64_t total_records = 2;
+    const int64_t total_bytes = 2048;
+    const std::vector<flt::FlightEndpoint> endpoints;
+    const auto descriptor = flt::FlightDescriptor::Command("fake-cmd");
+    arw::FieldVector fields;
+    const arw::Schema schema(fields, nullptr);
+    auto info1 = flt::FlightInfo::Make(schema, descriptor, endpoints, total_records, total_bytes);
+#endif
+  }
+
+  arw::Status Next(std::unique_ptr<flt::FlightInfo>* info) {
+    *info = nullptr;
+    return arw::Status::OK();
+  }
+};
+
+
+  arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
+                                       const flt::Criteria* criteria,
+                                       std::unique_ptr<flt::FlightListing>* listings) {
+    *listings = std::make_unique<RGWFlightListing>();
+    return arw::Status::OK();
+  }
+
+  // File-local pointer to the registered FlightServer. It has static storage
+  // duration and no initializer, so it starts out null; it is written only
+  // by set_flight_server().
+  static FlightServer* fs;
+
+  // Registers `_server` as the current FlightServer; passing nullptr clears
+  // the registration.
+  void set_flight_server(FlightServer* _server) {
+    fs = _server;
+  }
+
+  // Returns the registered FlightServer, or nullptr when none is registered.
+  FlightServer* get_flight_server() {
+    return fs;
+  }
+
+  // Returns the flight store of the registered FlightServer, or nullptr when
+  // no server is registered.
+  FlightStore* get_flight_store() {
+    if (!fs) {
+      return nullptr;
+    }
+    return fs->get_flight_store();
+  }
+
+  // Creates a FlightData for `request` and records it in the flight store of
+  // the registered server. Returns the key assigned to the flight.
+  //
+  // When no server is registered, get_flight_store() returns nullptr. The
+  // key is then still allocated and returned, but nothing is stored; the
+  // store pointer must not be dereferenced in that case.
+  FlightKey propose_flight(const req_state* request) {
+    FlightData flight(request);
+    const FlightKey key = flight.key;
+
+    FlightStore* const flight_store = get_flight_store();
+    if (flight_store == nullptr) {
+      return key;
+    }
+
+    return flight_store->add_flight(std::move(flight));
+  }
+
+} // namespace rgw::flight
diff --git a/src/rgw/rgw_flight.h b/src/rgw/rgw_flight.h
new file mode 100644 (file)
index 0000000..1f4d372
--- /dev/null
@@ -0,0 +1,101 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <map>
+#include <mutex>
+#include <atomic>
+
+#include "include/common_fwd.h"
+#include "common/ceph_context.h"
+#include "common/Thread.h"
+#include "common/ceph_time.h"
+#include "rgw_frontend.h"
+#include "arrow/type.h"
+#include "arrow/flight/server.h"
+
+#include "rgw_flight_frontend.h"
+
+namespace arw = arrow;
+namespace flt = arrow::flight;
+
+struct req_state;
+
+namespace rgw::flight {
+
+  // Time added to "now" to compute FlightData::expires (one hour).
+  static const coarse_real_clock::duration lifespan = std::chrono::hours(1);
+
+  // Record describing one flight.
+  struct FlightData {
+    // key assigned from the global counter at construction
+    FlightKey key;
+    // construction time plus `lifespan`
+    coarse_real_clock::time_point expires;
+
+    // NOTE(review): the constructor does not assign this pointer while its
+    // bucket-capture code is disabled; confirm it is not read before then.
+    rgw::sal::Bucket* bucket;
+    // fields planned but not enabled yet
+#if 0
+    rgw::sal::Object object;
+    rgw::sal::User user;
+#endif
+
+
+    // Assigns the key and expiry; see the definition in rgw_flight.cc.
+    FlightData(const req_state* state);
+  };
+
+  // stores flights that have been created and helps expire them
+  // Abstract interface; MemoryFlightStore is the implementation in this file.
+  class FlightStore {
+  public:
+    virtual ~FlightStore();
+    // takes the flight and returns the key it was stored under
+    virtual FlightKey add_flight(FlightData&& flight) = 0;
+    // the int results of the operations below are not specified by this
+    // interface; the in-memory implementation currently always returns 0
+    virtual int get_flight(const FlightKey& key) = 0;
+    virtual int remove_flight(const FlightKey& key) = 0;
+    virtual int expire_flights() = 0;
+  };
+
+  // FlightStore implementation that keeps every flight in a std::map keyed
+  // by FlightKey.
+  class MemoryFlightStore : public FlightStore {
+    std::map<FlightKey, FlightData> map;
+
+  public:
+
+    // `override` rather than a repeated `virtual`, matching the other
+    // derived classes in this code (e.g. FlightServer)
+    ~MemoryFlightStore() override;
+    FlightKey add_flight(FlightData&& flight) override;
+    int get_flight(const FlightKey& key) override;
+    int remove_flight(const FlightKey& key) override;
+    int expire_flights() override;
+  };
+
+  // Arrow Flight server for RGW. Derives from flt::FlightServerBase and, so
+  // far, overrides only ListFlights.
+  class FlightServer : public flt::FlightServerBase {
+
+    using Data1 = std::vector<std::shared_ptr<arw::RecordBatch>>;
+
+    boost::intrusive_ptr<ceph::common::CephContext> cct;
+    // log prefix object used by the logging macros in rgw_flight.cc
+    const DoutPrefix dp;
+    // raw pointers as handed to the constructor; the destructor does not
+    // delete them
+    rgw::sal::Store* store;
+    FlightStore* flight_store;
+
+    // NOTE(review): not referenced by any member function visible here
+    std::map<std::string, Data1> data;
+
+  public:
+
+    // port used by FlightFrontend::init() when the configured port is <= 0
+    static constexpr int default_port = 8077;
+
+    FlightServer(boost::intrusive_ptr<ceph::common::CephContext>& _cct,
+                rgw::sal::Store* _store,
+                FlightStore* _flight_store);
+    ~FlightServer() override;
+
+    // returns the flight store passed to the constructor
+    FlightStore* get_flight_store() {
+      return flight_store;
+    }
+
+    // produces the flight listing; see the definition in rgw_flight.cc
+    arw::Status ListFlights(const flt::ServerCallContext& context,
+                           const flt::Criteria* criteria,
+                           std::unique_ptr<flt::FlightListing>* listings) override;
+  }; // class FlightServer
+
+  // GLOBAL
+
+  // Accessors for the single registered FlightServer; the pointer itself
+  // lives in rgw_flight.cc.
+  // registers the server (nullptr clears the registration)
+  void set_flight_server(FlightServer* _server);
+  // registered server, or nullptr
+  FlightServer* get_flight_server();
+  // flight store of the registered server, or nullptr when none is registered
+  FlightStore* get_flight_store();
+  // creates a FlightData for the request and adds it to the flight store
+  FlightKey propose_flight(const req_state* request);
+
+} // namespace rgw::flight
diff --git a/src/rgw/rgw_flight_frontend.cc b/src/rgw/rgw_flight_frontend.cc
new file mode 100644 (file)
index 0000000..1751e25
--- /dev/null
@@ -0,0 +1,103 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+
+#include "arrow/type.h"
+#include "arrow/flight/server.h"
+
+#include "rgw_flight_frontend.h"
+#include "rgw_flight.h"
+
+#define dout_subsys ceph_subsys_rgw_flight
+
+namespace rgw::flight {
+
+  // Creates the frontend together with its FlightServer. A new
+  // MemoryFlightStore is allocated here and handed to the server; `_store`
+  // is only forwarded to the server and is not kept by the frontend.
+  FlightFrontend::FlightFrontend(boost::intrusive_ptr<ceph::common::CephContext>& _cct,
+                                RGWFrontendConfig* _config,
+                                rgw::sal::Store* _store,
+                                int _port) :
+    cct(_cct),
+    dp(cct.get(), dout_subsys, "rgw arrow_flight: "),
+    config(_config),
+    port(_port)
+  {
+    FlightStore* flight_store = new MemoryFlightStore();
+    flight_server = new FlightServer(_cct, _store, flight_store);
+  }
+
+  // Destroys the server and the flight store that the constructor allocated
+  // for it. FlightServer keeps only a raw pointer to the store and never
+  // deletes it, so the store must be released here or it leaks.
+  FlightFrontend::~FlightFrontend() {
+    // fetch the store first: it is reachable only through the server
+    FlightStore* const flight_store = flight_server->get_flight_store();
+    delete flight_server;
+    delete flight_store;
+  }
+
+  int FlightFrontend::init() {
+    if (port <= 0) {
+      port = FlightServer::default_port;
+    }
+    const std::string url =
+      std::string("grpc+tcp://localhost:") + std::to_string(port);
+    flt::Location location;
+    arw::Status s = flt::Location::Parse(url, &location);
+    if (!s.ok()) {
+      return -EINVAL;
+    }
+
+    flt::FlightServerOptions options(location);
+    options.verify_client = false;
+    s = flight_server->Init(options);
+    if (!s.ok()) {
+      return -EINVAL;
+    }
+
+    dout(20) << "STATUS: " << __func__ <<
+      ": FlightServer inited; will use port " << port << dendl;
+    return 0;
+  }
+
+  int FlightFrontend::run() {
+    try {
+      flight_thread = make_named_thread(server_thread_name,
+                                       &FlightServer::Serve,
+                                       flight_server);
+      set_flight_server(flight_server);
+
+      dout(20) << "INFO: " << __func__ <<
+       ": FlightServer thread started, id=" << flight_thread.get_id() <<
+       ", joinable=" << flight_thread.joinable() << dendl;
+      return 0;
+    } catch (std::system_error& e) {
+      derr << "ERROR: " << __func__ <<
+       ": FlightServer thread failed to start" << dendl;
+      return -ENOSPC;
+    }
+  }
+
+  // Clears the server registration, then shuts the server down and waits
+  // for it. Both calls are always made, as before; a failing Arrow status is
+  // now logged instead of being dropped.
+  void FlightFrontend::stop() {
+    set_flight_server(nullptr);
+
+    arw::Status s = flight_server->Shutdown();
+    if (!s.ok()) {
+      derr << "ERROR: " << __func__ <<
+       ": FlightServer shutdown failed: " << s.ToString() << dendl;
+    }
+
+    s = flight_server->Wait();
+    if (!s.ok()) {
+      derr << "ERROR: " << __func__ <<
+       ": FlightServer wait failed: " << s.ToString() << dendl;
+    }
+
+    dout(20) << "INFO: " << __func__ << ": FlightServer shut down" << dendl;
+  }
+
+  // Joins the server thread started by run(). std::thread::join() throws
+  // std::system_error on a thread that is not joinable, which is the case
+  // when run() failed or was never called, so that case is skipped.
+  void FlightFrontend::join() {
+    if (!flight_thread.joinable()) {
+      dout(20) << "INFO: " << __func__ <<
+       ": FlightServer thread not running; nothing to join" << dendl;
+      return;
+    }
+
+    flight_thread.join();
+    dout(20) << "INFO: " << __func__ << ": FlightServer thread joined" << dendl;
+  }
+
+  // Intentionally does nothing.
+  void FlightFrontend::pause_for_new_config() {
+    // ignore since config changes won't alter flight_server
+  }
+
+  // Intentionally does nothing; both arguments are ignored.
+  void FlightFrontend::unpause_with_new_config(rgw::sal::Store* store,
+                              rgw_auth_registry_ptr_t auth_registry) {
+    // ignore since config changes won't alter flight_server
+  }
+
+  // Data hook of the flight filter. For now it only logs the offset and
+  // length it was given (at log level 0) and then passes the same arguments
+  // to the base-class implementation, returning that result.
+  int FlightGetObj_Filter::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) {
+    // flight-specific processing will go here; currently just a trace
+    dout(0) << "ERIC: " << __func__ << ": flight handling data from offset " << bl_ofs << " of size " << bl_len << dendl;
+
+    // delegate to the base class and return its result
+    const int chained_result = RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len);
+    return chained_result;
+  }
+
+} // namespace rgw::flight
diff --git a/src/rgw/rgw_flight_frontend.h b/src/rgw/rgw_flight_frontend.h
new file mode 100644 (file)
index 0000000..c74878a
--- /dev/null
@@ -0,0 +1,69 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include "include/common_fwd.h"
+#include "common/Thread.h"
+#include "rgw_frontend.h"
+#include "rgw_op.h"
+
+
+namespace rgw::flight {
+
+  // Identifier type for a flight (see FlightData::key in rgw_flight.h).
+  using FlightKey = uint32_t;
+
+  // Forward declaration only; the full definition is in rgw_flight.h.
+  class FlightServer;
+
+  // RGWFrontend implementation that runs a FlightServer on a thread of its
+  // own.
+  class FlightFrontend : public RGWFrontend {
+
+    // name given to the thread that run() starts
+    static constexpr std::string_view server_thread_name =
+      "Arrow Flight Server thread";
+
+    // NOTE(review): held by reference, so the intrusive_ptr passed to the
+    // constructor must outlive this object -- confirm against the caller.
+    boost::intrusive_ptr<ceph::common::CephContext>& cct;
+    const DoutPrefix dp;
+    FlightServer* flight_server; // pointer so header file doesn't need to pull in too much
+    // thread running FlightServer::Serve; started by run(), joined by join()
+    std::thread flight_thread;
+    RGWFrontendConfig* config;
+    // listening port; init() replaces a value <= 0 with the server default
+    int port;
+
+  public:
+
+    // port <= 0 -> let server decide; typically 8077
+    FlightFrontend(boost::intrusive_ptr<ceph::common::CephContext>& cct,
+                  RGWFrontendConfig* config,
+                  rgw::sal::Store* store,
+                  int port = -1);
+    ~FlightFrontend() override;
+    // init() and run() return 0 on success and a negative errno on failure
+    int init() override;
+    int run() override;
+    void stop() override;
+    void join() override;
+
+    // both are no-ops in this frontend
+    void pause_for_new_config() override;
+    void unpause_with_new_config(rgw::sal::Store* store,
+                                rgw_auth_registry_ptr_t auth_registry) override;
+
+  }; // class FlightFrontend
+
+  // RGWGetObj_Filter that carries the key of the flight it was created for.
+  class FlightGetObj_Filter : public RGWGetObj_Filter {
+
+    // key of the associated flight, as passed to the constructor
+    FlightKey key;
+
+  public:
+
+    // `next` is handed to the RGWGetObj_Filter base-class constructor
+    FlightGetObj_Filter(const FlightKey& _key, RGWGetObj_Filter* next) :
+      RGWGetObj_Filter(next),
+      key(_key)
+    {
+      // empty
+    }
+
+    // logs the call and then defers to RGWGetObj_Filter::handle_data
+    int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
+#if 0
+    // this would allow the range to be modified if necessary;
+    int fixup_range(off_t& ofs, off_t& end) override;
+#endif
+  };
+
+} // namespace rgw::flight
index c847bfe29578e3828bf1b980c3201b9a7ca5d252..9c2388274454652d66961f299e6e81bd869a53f1 100644 (file)
 
 #include "compressor/Compressor.h"
 
+#ifdef WITH_ARROW_FLIGHT
+#include "rgw_flight.h"
+#include "rgw_flight_frontend.h"
+#endif
+
 #ifdef WITH_LTTNG
 #define TRACEPOINT_DEFINE
 #define TRACEPOINT_PROBE_DYNAMIC_LINKAGE
@@ -2150,6 +2155,9 @@ void RGWGetObj::execute(optional_yield y)
   RGWGetObj_CB cb(this);
   RGWGetObj_Filter* filter = (RGWGetObj_Filter *)&cb;
   boost::optional<RGWGetObj_Decompress> decompress;
+#ifdef WITH_ARROW_FLIGHT
+  boost::optional<rgw::flight::FlightGetObj_Filter> flight_filter;
+#endif
   std::unique_ptr<RGWGetObj_Filter> decrypt;
   std::unique_ptr<RGWGetObj_Filter> run_lua;
   map<string, bufferlist>::iterator attr_iter;
@@ -2228,6 +2236,16 @@ void RGWGetObj::execute(optional_yield y)
     goto done_err;
   }
 
+#ifdef WITH_ARROW_FLIGHT
+  if (ofs == 0) {
+    rgw::flight::FlightKey key = rgw::flight::propose_flight(s);
+    ldpp_dout(this, 0) << "ERIC: added arrow flight with key=" << key << dendl;
+
+    flight_filter.emplace(key, filter);
+    filter = &*flight_filter;
+  }
+#endif
+
   op_ret = rgw_compression_info_from_attrset(attrs, need_decompress, cs_info);
   if (op_ret < 0) {
     ldpp_dout(this, 0) << "ERROR: failed to decode compression info, cannot decompress" << dendl;