]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
Update RGW Arrow Flight code to adjust to API changes in Apache Arrow.
authorJ. Eric Ivancich <ivancich@redhat.com>
Fri, 29 Mar 2024 18:38:04 +0000 (14:38 -0400)
committerJ. Eric Ivancich <ivancich@redhat.com>
Fri, 3 May 2024 18:55:47 +0000 (14:55 -0400)
Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
src/rgw/rgw_flight.cc
src/rgw/rgw_flight.h
src/rgw/rgw_flight_frontend.cc

index 955edcced89f0a8b9a13201eb8cc26e74ba78138..4aaaa4626894e9d7cb55f18fe958bc81cb50d077 100644 (file)
@@ -17,7 +17,6 @@
 
 #include "arrow/type.h"
 #include "arrow/buffer.h"
-#include "arrow/util/string_view.h"
 #include "arrow/io/interfaces.h"
 #include "arrow/ipc/reader.h"
 #include "arrow/table.h"
@@ -175,7 +174,7 @@ arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
       previous_key(null_flight_key)
       { }
 
-    arw::Status Next(std::unique_ptr<flt::FlightInfo>* info) {
+    arrow::Result<std::unique_ptr<flt::FlightInfo>> Next() override {
       std::optional<FlightData> fd = flight_store->after_key(previous_key);
       if (fd) {
        previous_key = fd->key;
@@ -188,11 +187,9 @@ arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
 
        ARROW_ASSIGN_OR_RAISE(flt::FlightInfo info_obj,
                              flt::FlightInfo::Make(*fd->schema, descriptor, endpoints, fd->num_records, fd->obj_size));
-       *info = std::make_unique<flt::FlightInfo>(std::move(info_obj));
-       return arw::Status::OK();
+       return std::make_unique<flt::FlightInfo>(std::move(info_obj));
       } else {
-       *info = nullptr;
-       return arw::Status::OK();
+       return nullptr;
       }
     }
   }; // class RGWFlightListing
@@ -346,7 +343,7 @@ public:
     }
   }
 
-  arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+  arw::Result<std::string_view> Peek(int64_t nbytes) override {
     INFO << "called, not implemented" << dendl;
     return arw::Status::NotImplemented("peek not currently allowed");
   }
@@ -458,7 +455,7 @@ public:
     return flight_data.obj_size;
   }
 
-  arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+  arw::Result<std::string_view> Peek(int64_t nbytes) override {
     std::iostream::pos_type here = file.tellg();
     if (here == -1) {
       return arw::Status::IOError(
@@ -620,7 +617,7 @@ public:
     return flight_data.obj_size;
   }
 
-  arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+  arw::Result<std::string_view> Peek(int64_t nbytes) override {
     INFO << "entered: " << nbytes << " bytes" << dendl;
 
     int64_t saved_position = position;
@@ -716,7 +713,14 @@ arw::Status FlightServer::DoGet(const flt::ServerCallContext &context,
 
   std::vector<std::shared_ptr<arw::RecordBatch>> batches;
   arw::TableBatchReader batch_reader(*table);
-  ARROW_RETURN_NOT_OK(batch_reader.ReadAll(&batches));
+  while (true) {
+    std::shared_ptr<arw::RecordBatch> p;
+    auto s = batch_reader.ReadNext(&p);
+    if (!s.ok()) {
+      break;
+    }
+    batches.push_back(p);
+  }
 
   ARROW_ASSIGN_OR_RAISE(auto owning_reader,
                        arw::RecordBatchReader::Make(
index bb0a987d0a19e7b351f9611cff9c8558a5552b42..d2f65d9a5b5b7d21abd67bd20192c675d46a234e 100644 (file)
@@ -22,7 +22,6 @@
 #include "rgw_frontend.h"
 #include "arrow/type.h"
 #include "arrow/flight/server.h"
-#include "arrow/util/string_view.h"
 
 #include "rgw_flight_frontend.h"
 
@@ -122,6 +121,7 @@ class FlightServer : public flt::FlightServerBase {
   FlightStore* flight_store;
 
   std::map<std::string, Data1> data;
+  arw::Status serve_return_value;
 
 public:
 
@@ -132,6 +132,12 @@ public:
               const DoutPrefix& dp);
   ~FlightServer() override;
 
+  // provides a version of Serve that has no return value, to avoid
+  // warnings when launching in a thread
+  void ServeAlt() {
+    serve_return_value = Serve();
+  }
+
   FlightStore* get_flight_store() {
     return flight_store;
   }
@@ -153,14 +159,14 @@ public:
                    std::unique_ptr<flt::FlightDataStream> *stream) override;
 }; // class FlightServer
 
-class OwningStringView : public arw::util::string_view {
+class OwningStringView : public std::string_view {
 
   uint8_t* buffer;
   int64_t capacity;
   int64_t consumed;
 
   OwningStringView(uint8_t* _buffer, int64_t _size) :
-    arw::util::string_view((const char*) _buffer, _size),
+    std::string_view((const char*) _buffer, _size),
     buffer(_buffer),
     capacity(_size),
     consumed(_size)
index c29703fe5137525ba69c297e4c7ec00ab735dbb0..a673dbe3afbdf588882cf6ebb98cef205fbfe282 100644 (file)
@@ -63,16 +63,16 @@ int FlightFrontend::init() {
   }
   const std::string url =
     std::string("grpc+tcp://localhost:") + std::to_string(port);
-  flt::Location location;
-  arw::Status s = flt::Location::Parse(url, &location);
-  if (!s.ok()) {
-    ERROR << "couldn't parse url=" << url << ", status=" << s << dendl;
+  auto r = flt::Location::Parse(url);
+  if (!r.ok()) {
+    ERROR << "could not parse server uri: " << url << dendl;
     return -EINVAL;
   }
+  flt::Location location = *r;
 
   flt::FlightServerOptions options(location);
   options.verify_client = false;
-  s = env.flight_server->Init(options);
+  auto s = env.flight_server->Init(options);
   if (!s.ok()) {
     ERROR << "couldn't init flight server; status=" << s << dendl;
     return -EINVAL;
@@ -85,7 +85,7 @@ int FlightFrontend::init() {
 int FlightFrontend::run() {
   try {
     flight_thread = make_named_thread(server_thread_name,
-                                     &FlightServer::Serve,
+                                     &FlightServer::ServeAlt,
                                      env.flight_server);
 
     INFO << "FlightServer thread started, id=" <<
@@ -99,8 +99,19 @@ int FlightFrontend::run() {
 }
 
 void FlightFrontend::stop() {
-  env.flight_server->Shutdown();
-  env.flight_server->Wait();
+  arw::Status s;
+  s = env.flight_server->Shutdown();
+  if (!s.ok()) {
+    ERROR << "call to Shutdown failed; status=" << s << dendl;
+    return;
+  }
+
+  s = env.flight_server->Wait();
+  if (!s.ok()) {
+    ERROR << "call to Wait failed; status=" << s << dendl;
+    return;
+  }
+
   INFO << "FlightServer shut down" << dendl;
 }
 
@@ -186,7 +197,7 @@ int FlightGetObj_Filter::handle_data(bufferlist& bl,
                              arrow::io::ReadableFile::Open(temp_file_name));
        const std::shared_ptr<parquet::FileMetaData> metadata = parquet::ReadMetaData(file);
 
-       file->Close();
+       ARROW_RETURN_NOT_OK(file->Close());
 
        num_rows = metadata->num_rows();
        kv_metadata = metadata->key_value_metadata();