]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: implement initial flight server functionality 48729/head
authorJ. Eric Ivancich <ivancich@redhat.com>
Tue, 28 Jun 2022 19:35:18 +0000 (15:35 -0400)
committerJ. Eric Ivancich <ivancich@redhat.com>
Thu, 12 Jan 2023 18:14:50 +0000 (13:14 -0500)
Implements the ability for a flight to be created when the object is
retrieved by an S3 get.

Adds FlightServer abilities ListFlights, GetFlightInfo, GetSchema, and
DoGet. Adds an interface for a store for flight information and adds
an in-memory implemtation of it.

This code is functionality is early-stage and lacks some planned
efficiencies.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
src/rgw/rgw_appmain.cc
src/rgw/rgw_flight.cc
src/rgw/rgw_flight.h
src/rgw/rgw_flight_frontend.cc
src/rgw/rgw_flight_frontend.h
src/rgw/rgw_op.cc
src/rgw/rgw_process_env.h

index 77e29d5595458719f81806273d0b0d9ba5c4a0b1..361f622b992a70428b99a0e130c1c846012b2c8f 100644 (file)
@@ -440,7 +440,7 @@ int rgw::AppMain::init_frontends2(RGWLib* rgwlib)
 #ifdef WITH_ARROW_FLIGHT
       int port;
       config->get_val("port", 8077, &port);
-      fe = new rgw::flight::FlightFrontend(cct, config, store, port);
+      fe = new rgw::flight::FlightFrontend(env, config, port);
 #else
       derr << "WARNING: arrow_flight frontend requested, but not included in build; skipping" << dendl;
       continue;
index be0a17c8f320058eec780deb9b0dca17977e10fd..2299b7412858f4bd59707024fa4d22830e461b89 100644 (file)
@@ -2,12 +2,22 @@
 // vim: ts=8 sw=2 smarttab ft=cpp
 
 #include <iostream>
+#include <fstream>
 #include <mutex>
 #include <map>
+#include <algorithm>
 
 #include "arrow/type.h"
+#include "arrow/buffer.h"
+#include "arrow/util/string_view.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/table.h"
+
 #include "arrow/flight/server.h"
 
+#include "parquet/arrow/reader.h"
+
 #include "common/dout.h"
 #include "rgw_op.h"
 
 #include "rgw_flight_frontend.h"
 
 
-#define dout_subsys ceph_subsys_rgw_flight
+namespace rgw::flight {
 
-#define INFO_F(dp)   ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": "
-#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": "
-#define WARN_F(dp)   ldpp_dout(&dp,  0) << "WARNING: " << __func__ << ": "
-#define ERROR_F(dp)  ldpp_dout(&dp,  0) << "ERROR: " << __func__ << ": "
+// Ticket and FlightKey
 
-#define INFO   INFO_F(dp)
-#define STATUS STATUS_F(dp)
-#define WARN   WARN_F(dp)
-#define ERROR  ERROR_F(dp)
+std::atomic<FlightKey> next_flight_key = null_flight_key;
 
+flt::Ticket FlightKeyToTicket(const FlightKey& key) {
+  flt::Ticket result;
+  result.ticket = std::to_string(key);
+  return result;
+}
 
-namespace rgw::flight {
+arw::Result<FlightKey> TicketToFlightKey(const flt::Ticket& t) {
+  try {
+    return (FlightKey) std::stoul(t.ticket);
+  } catch (std::invalid_argument const& ex) {
+    return arw::Status::Invalid(
+      "could not convert Ticket containing \"%s\" into a Flight Key",
+      t.ticket);
+  } catch (const std::out_of_range& ex) {
+    return arw::Status::Invalid(
+      "could not convert Ticket containing \"%s\" into a Flight Key due to range",
+      t.ticket);
+  }
+}
+
+// FlightData
+
+FlightData::FlightData(const std::string& _uri,
+                      const std::string& _tenant_name,
+                      const std::string& _bucket_name,
+                      const rgw_obj_key& _object_key,
+                      uint64_t _num_records,
+                      uint64_t _obj_size,
+                      std::shared_ptr<arw::Schema>& _schema,
+                      std::shared_ptr<const arw::KeyValueMetadata>& _kv_metadata,
+                      rgw_user _user_id) :
+  key(++next_flight_key),
+  /* expires(coarse_real_clock::now() + lifespan), */
+  uri(_uri),
+  tenant_name(_tenant_name),
+  bucket_name(_bucket_name),
+  object_key(_object_key),
+  num_records(_num_records),
+  obj_size(_obj_size),
+  schema(_schema),
+  kv_metadata(_kv_metadata),
+  user_id(_user_id)
+{ }
+
+/**** FlightStore ****/
+
+FlightStore::FlightStore(const DoutPrefix& _dp) :
+  dp(_dp)
+{ }
+
+FlightStore::~FlightStore() { }
 
-  std::atomic<FlightKey> next_flight_key = 0;
+/**** MemoryFlightStore ****/
 
-  FlightData::FlightData(const req_state* state) :
-    key(next_flight_key++),
-    expires(coarse_real_clock::now() + lifespan)
+MemoryFlightStore::MemoryFlightStore(const DoutPrefix& _dp) :
+  FlightStore(_dp)
+{ }
+
+MemoryFlightStore::~MemoryFlightStore() { }
+
+FlightKey MemoryFlightStore::add_flight(FlightData&& flight) {
+  std::pair<decltype(map)::iterator,bool> result;
   {
-#if 0
-    bucket = new rgw::sal::Bucket(*state->bucket);
-#endif
+    const std::lock_guard lock(mtx);
+    result = map.insert( {flight.key, std::move(flight)} );
   }
+  ceph_assertf(result.second,
+              "unable to add FlightData to MemoryFlightStore"); // temporary until error handling
 
-  FlightStore::~FlightStore() {
-    // empty
+  return result.first->second.key;
+}
+
+arw::Result<FlightData> MemoryFlightStore::get_flight(const FlightKey& key) const {
+  const std::lock_guard lock(mtx);
+  auto i = map.find(key);
+  if (i == map.cend()) {
+    return arw::Status::KeyError("could not find Flight with Key %" PRIu32,
+                                key);
+  } else {
+    return i->second;
   }
+}
 
-  MemoryFlightStore::~MemoryFlightStore() {
-    // empty
+// returns either the next FilghtData or, if at end, empty optional
+std::optional<FlightData> MemoryFlightStore::after_key(const FlightKey& key) const {
+  std::optional<FlightData> result;
+  {
+    const std::lock_guard lock(mtx);
+    auto i = map.upper_bound(key);
+    if (i != map.end()) {
+      result = i->second;
+    }
   }
+  return result;
+}
+
+int MemoryFlightStore::remove_flight(const FlightKey& key) {
+  return 0;
+}
+
+int MemoryFlightStore::expire_flights() {
+  return 0;
+}
+
+/**** FlightServer ****/
+
+FlightServer::FlightServer(RGWProcessEnv& _env,
+                          FlightStore* _flight_store,
+                          const DoutPrefix& _dp) :
+  env(_env),
+  driver(env.driver),
+  dp(_dp),
+  flight_store(_flight_store)
+{ }
+
+FlightServer::~FlightServer()
+{ }
 
-  FlightKey MemoryFlightStore::add_flight(FlightData&& flight) {
-    FlightKey key = flight.key;
 
-    auto p = map.insert( {key, flight} );
-    ceph_assertf(p.second,
-                "unable to add FlightData to MemoryFlightStore"); // temporary until error handling
+arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
+                                     const flt::Criteria* criteria,
+                                     std::unique_ptr<flt::FlightListing>* listings) {
 
-    return key;
+  // function local class to implement FlightListing interface
+  class RGWFlightListing : public flt::FlightListing {
+
+    FlightStore* flight_store;
+    FlightKey previous_key;
+
+  public:
+
+    RGWFlightListing(FlightStore* flight_store) :
+      flight_store(flight_store),
+      previous_key(null_flight_key)
+      { }
+
+    arw::Status Next(std::unique_ptr<flt::FlightInfo>* info) {
+      std::optional<FlightData> fd = flight_store->after_key(previous_key);
+      if (fd) {
+       previous_key = fd->key;
+       auto descriptor =
+         flt::FlightDescriptor::Path(
+           { fd->tenant_name, fd->bucket_name, fd->object_key.name, fd->object_key.instance, fd->object_key.ns });
+       flt::FlightEndpoint endpoint;
+       endpoint.ticket = FlightKeyToTicket(fd->key);
+       std::vector<flt::FlightEndpoint> endpoints { endpoint };
+
+       ARROW_ASSIGN_OR_RAISE(flt::FlightInfo info_obj,
+                             flt::FlightInfo::Make(*fd->schema, descriptor, endpoints, fd->num_records, fd->obj_size));
+       *info = std::make_unique<flt::FlightInfo>(std::move(info_obj));
+       return arw::Status::OK();
+      } else {
+       *info = nullptr;
+       return arw::Status::OK();
+      }
+    }
+  }; // class RGWFlightListing
+
+  *listings = std::make_unique<RGWFlightListing>(flight_store);
+  return arw::Status::OK();
+} // FlightServer::ListFlights
+
+
+arw::Status FlightServer::GetFlightInfo(const flt::ServerCallContext &context,
+                                       const flt::FlightDescriptor &request,
+                                       std::unique_ptr<flt::FlightInfo> *info) {
+  return arw::Status::OK();
+} // FlightServer::GetFlightInfo
+
+
+arw::Status FlightServer::GetSchema(const flt::ServerCallContext &context,
+                                   const flt::FlightDescriptor &request,
+                                   std::unique_ptr<flt::SchemaResult> *schema) {
+  return arw::Status::OK();
+} // FlightServer::GetSchema
+
+  // A Buffer that owns its memory and frees it when the Buffer is
+  // destructed
+class OwnedBuffer : public arw::Buffer {
+
+  uint8_t* buffer;
+
+protected:
+
+  OwnedBuffer(uint8_t* _buffer, int64_t _size) :
+    Buffer(_buffer, _size),
+    buffer(_buffer)
+    { }
+
+public:
+
+  ~OwnedBuffer() override {
+    delete[] buffer;
   }
-  // int MemoryFlightStore::add_flight(const FlightKey& key) { return 0; }
-  int MemoryFlightStore::get_flight(const FlightKey& key) { return 0; }
-  int MemoryFlightStore::remove_flight(const FlightKey& key) { return 0; }
-  int MemoryFlightStore::expire_flights() { return 0; }
 
-  FlightServer::FlightServer(boost::intrusive_ptr<ceph::common::CephContext>& _cct,
-                            rgw::sal::Store* _store,
-                            FlightStore* _flight_store) :
-    cct(_cct),
-    dp(cct.get(), dout_subsys, "rgw arrow_flight: "),
-    store(_store),
-    flight_store(_flight_store)
-  {
-    INFO << "FlightServer constructed" << dendl;
+  static arw::Result<std::shared_ptr<OwnedBuffer>> make(int64_t size) {
+    uint8_t* buffer = new (std::nothrow) uint8_t[size];
+    if (!buffer) {
+      return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size);
+    }
+
+    OwnedBuffer* ptr = new OwnedBuffer(buffer, size);
+    std::shared_ptr<OwnedBuffer> result;
+    result.reset(ptr);
+    return result;
   }
 
-  FlightServer::~FlightServer()
-  {
-    INFO << "FlightServer destructed" << dendl;
+  // if what's read in is less than capacity
+  void set_size(int64_t size) {
+    size_ = size;
   }
 
+  // pointer that can be used to write into buffer
+  uint8_t* writeable_data() {
+    return buffer;
+  }
+}; // class OwnedBuffer
+
+#if 0 // remove classes used for testing and incrementally building
+
+// make local to DoGet eventually
+class LocalInputStream : public arw::io::InputStream {
+
+  std::iostream::pos_type position;
+  std::fstream file;
+  std::shared_ptr<const arw::KeyValueMetadata> kv_metadata;
+  const DoutPrefix dp;
 
-class RGWFlightListing : public flt::FlightListing {
 public:
 
-  RGWFlightListing() {
-#if 0
-    const int64_t total_records = 2;
-    const int64_t total_bytes = 2048;
-    const std::vector<flt::FlightEndpoint> endpoints;
-    const auto descriptor = flt::FlightDescriptor::Command("fake-cmd");
-    arw::FieldVector fields;
-    const arw::Schema schema(fields, nullptr);
-    auto info1 = flt::FlightInfo::Make(schema, descriptor, endpoints, total_records, total_bytes);
-#endif
+  LocalInputStream(std::shared_ptr<const arw::KeyValueMetadata> _kv_metadata,
+                  const DoutPrefix _dp) :
+    kv_metadata(_kv_metadata),
+    dp(_dp)
+    {}
+
+  arw::Status Open() {
+    file.open("/tmp/green_tripdata_2022-04.parquet", std::ios::in);
+    if (!file.good()) {
+      return arw::Status::IOError("unable to open file");
+    }
+
+    INFO << "file opened successfully" << dendl;
+    position = file.tellg();
+    return arw::Status::OK();
   }
 
-  arw::Status Next(std::unique_ptr<flt::FlightInfo>* info) {
-    *info = nullptr;
+  arw::Status Close() override {
+    file.close();
+    INFO << "file closed" << dendl;
     return arw::Status::OK();
   }
-};
 
+  arw::Result<int64_t> Tell() const override {
+    if (position < 0) {
+      return arw::Status::IOError(
+       "could not query file implementaiton with tellg");
+    } else {
+      return int64_t(position);
+    }
+  }
+
+  bool closed() const override {
+    return file.is_open();
+  }
+
+  arw::Result<int64_t> Read(int64_t nbytes, void* out) override {
+    INFO << "entered: asking for " << nbytes << " bytes" << dendl;
+    if (file.read(reinterpret_cast<char*>(out),
+                 reinterpret_cast<std::streamsize>(nbytes))) {
+      const std::streamsize bytes_read = file.gcount();
+      INFO << "Point A: read bytes " << bytes_read << dendl;
+      position = file.tellg();
+      return bytes_read;
+    } else {
+      ERROR << "unable to read from file" << dendl;
+      return arw::Status::IOError("unable to read from offset %" PRId64,
+                                 int64_t(position));
+    }
+  }
+
+  arw::Result<std::shared_ptr<arw::Buffer>> Read(int64_t nbytes) override {
+    INFO << "entered: " << ": asking for " << nbytes << " bytes" << dendl;
+
+    std::shared_ptr<OwnedBuffer> buffer;
+    ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes));
 
-  arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
-                                       const flt::Criteria* criteria,
-                                       std::unique_ptr<flt::FlightListing>* listings) {
-    *listings = std::make_unique<RGWFlightListing>();
+    if (file.read(reinterpret_cast<char*>(buffer->writeable_data()),
+                 reinterpret_cast<std::streamsize>(nbytes))) {
+      const auto bytes_read = file.gcount();
+      INFO << "Point B: read bytes " << bytes_read << dendl;
+      // buffer->set_size(bytes_read);
+      position = file.tellg();
+      return buffer;
+    } else if (file.rdstate() & std::ifstream::failbit &&
+              file.rdstate() & std::ifstream::eofbit) {
+      const auto bytes_read = file.gcount();
+      INFO << "3 read bytes " << bytes_read << " and reached EOF" << dendl;
+      // buffer->set_size(bytes_read);
+      position = file.tellg();
+      return buffer;
+    } else {
+      ERROR << "unable to read from file" << dendl;
+      return arw::Status::IOError("unable to read from offset %ld", position);
+    }
+  }
+
+  arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+    INFO << "called, not implemented" << dendl;
+    return arw::Status::NotImplemented("peek not currently allowed");
+  }
+
+  bool supports_zero_copy() const override {
+    return false;
+  }
+
+  arw::Result<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadata() override {
+    INFO << "called" << dendl;
+    return kv_metadata;
+  }
+}; // class LocalInputStream
+
+class LocalRandomAccessFile : public arw::io::RandomAccessFile {
+
+  FlightData flight_data;
+  const DoutPrefix dp;
+
+  std::iostream::pos_type position;
+  std::fstream file;
+
+public:
+  LocalRandomAccessFile(const FlightData& _flight_data, const DoutPrefix _dp) :
+    flight_data(_flight_data),
+    dp(_dp)
+    { }
+
+  // implement InputStream
+
+  arw::Status Open() {
+    file.open("/tmp/green_tripdata_2022-04.parquet", std::ios::in);
+    if (!file.good()) {
+      return arw::Status::IOError("unable to open file");
+    }
+
+    INFO << "file opened successfully" << dendl;
+    position = file.tellg();
     return arw::Status::OK();
   }
 
-  static FlightServer* fs;
+  arw::Status Close() override {
+    file.close();
+    INFO << "file closed" << dendl;
+    return arw::Status::OK();
+  }
 
-  void set_flight_server(FlightServer* _server) {
-    fs = _server;
+  arw::Result<int64_t> Tell() const override {
+    if (position < 0) {
+      return arw::Status::IOError(
+       "could not query file implementaiton with tellg");
+    } else {
+      return int64_t(position);
+    }
   }
 
-  FlightServer* get_flight_server() {
-    return fs;
+  bool closed() const override {
+    return file.is_open();
   }
 
-  FlightStore* get_flight_store() {
-    return fs ? fs->get_flight_store() : nullptr;
+  arw::Result<int64_t> Read(int64_t nbytes, void* out) override {
+    INFO << "entered: asking for " << nbytes << " bytes" << dendl;
+    if (file.read(reinterpret_cast<char*>(out),
+                 reinterpret_cast<std::streamsize>(nbytes))) {
+      const std::streamsize bytes_read = file.gcount();
+      INFO << "Point A: read bytes " << bytes_read << dendl;
+      position = file.tellg();
+      return bytes_read;
+    } else {
+      ERROR << "unable to read from file" << dendl;
+      return arw::Status::IOError("unable to read from offset %" PRId64,
+                                 int64_t(position));
+    }
   }
 
-  FlightKey propose_flight(const req_state* request) {
-    FlightKey key = get_flight_store()->add_flight(FlightData(request));
-    return key;
+  arw::Result<std::shared_ptr<arw::Buffer>> Read(int64_t nbytes) override {
+    INFO << "entered: asking for " << nbytes << " bytes" << dendl;
+
+    std::shared_ptr<OwnedBuffer> buffer;
+    ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes));
+
+    if (file.read(reinterpret_cast<char*>(buffer->writeable_data()),
+                 reinterpret_cast<std::streamsize>(nbytes))) {
+      const auto bytes_read = file.gcount();
+      INFO << "Point B: read bytes " << bytes_read << dendl;
+      // buffer->set_size(bytes_read);
+      position = file.tellg();
+      return buffer;
+    } else if (file.rdstate() & std::ifstream::failbit &&
+              file.rdstate() & std::ifstream::eofbit) {
+      const auto bytes_read = file.gcount();
+      INFO << "3 read bytes " << bytes_read << " and reached EOF" << dendl;
+      // buffer->set_size(bytes_read);
+      position = file.tellg();
+      return buffer;
+    } else {
+      ERROR << "unable to read from file" << dendl;
+      return arw::Status::IOError("unable to read from offset %ld", position);
+    }
+  }
+
+  bool supports_zero_copy() const override {
+    return false;
+  }
+
+  // implement Seekable
+
+  arw::Result<int64_t> GetSize() override {
+    return flight_data.obj_size;
   }
 
+  arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+    std::iostream::pos_type here = file.tellg();
+    if (here == -1) {
+      return arw::Status::IOError(
+       "unable to determine current position ahead of peek");
+    }
+
+    ARROW_ASSIGN_OR_RAISE(OwningStringView result,
+                         OwningStringView::make(nbytes));
+
+    // read
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
+                         Read(nbytes, (void*) result.writeable_data()));
+    (void) bytes_read; // silence unused variable warnings
+
+    // return offset to original
+    ARROW_RETURN_NOT_OK(Seek(here));
+
+    return result;
+  }
+
+  arw::Result<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadata() {
+    return flight_data.kv_metadata;
+  }
+
+  arw::Future<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadataAsync(
+    const arw::io::IOContext& io_context) override {
+    return arw::Future<std::shared_ptr<const arw::KeyValueMetadata>>::MakeFinished(ReadMetadata());
+  }
+
+  // implement Seekable interface
+
+  arw::Status Seek(int64_t position) {
+    file.seekg(position);
+    if (file.fail()) {
+      return arw::Status::IOError(
+       "error encountered during seek to %" PRId64, position);
+    } else {
+      return arw::Status::OK();
+    }
+  }
+}; // class LocalRandomAccessFile
+#endif
+
+class RandomAccessObject : public arw::io::RandomAccessFile {
+
+  FlightData flight_data;
+  const DoutPrefix dp;
+
+  int64_t position;
+  bool is_closed;
+  std::unique_ptr<rgw::sal::Object::ReadOp> op;
+
+public:
+
+  RandomAccessObject(const FlightData& _flight_data,
+                    std::unique_ptr<rgw::sal::Object>& obj,
+                    const DoutPrefix _dp) :
+    flight_data(_flight_data),
+    dp(_dp),
+    position(-1),
+    is_closed(false)
+    {
+      op = obj->get_read_op();
+    }
+
+  arw::Status Open() {
+    int ret = op->prepare(null_yield, &dp);
+    if (ret < 0) {
+      return arw::Status::IOError(
+       "unable to prepare object with error %d", ret);
+    }
+    INFO << "file opened successfully" << dendl;
+    position = 0;
+    return arw::Status::OK();
+  }
+
+  // implement InputStream
+
+  arw::Status Close() override {
+    position = -1;
+    is_closed = true;
+    (void) op.reset();
+    INFO << "object closed" << dendl;
+    return arw::Status::OK();
+  }
+
+  arw::Result<int64_t> Tell() const override {
+    if (position < 0) {
+      return arw::Status::IOError("could not determine position");
+    } else {
+      return position;
+    }
+  }
+
+  bool closed() const override {
+    return is_closed;
+  }
+
+  arw::Result<int64_t> Read(int64_t nbytes, void* out) override {
+    INFO << "entered: asking for " << nbytes << " bytes" << dendl;
+
+    if (position < 0) {
+      ERROR << "error, position indicated error" << dendl;
+      return arw::Status::IOError("object read op is in bad state");
+    }
+
+    // note: read function reads through end_position inclusive
+    int64_t end_position = position + nbytes - 1;
+
+    bufferlist bl;
+
+    const int64_t bytes_read =
+      op->read(position, end_position, bl, null_yield, &dp);
+    if (bytes_read < 0) {
+      const int64_t former_position = position;
+      position = -1;
+      ERROR << "read operation returned " << bytes_read << dendl;
+      return arw::Status::IOError(
+       "unable to read object at position %" PRId64 ", error code: %" PRId64,
+       former_position,
+       bytes_read);
+    }
+
+    // TODO: see if there's a way to get rid of this copy, perhaps
+    // updating rgw::sal::read_op
+    bl.cbegin().copy(bytes_read, reinterpret_cast<char*>(out));
+
+    position += bytes_read;
+
+    if (nbytes != bytes_read) {
+      INFO << "partial read: nbytes=" << nbytes <<
+       ", bytes_read=" << bytes_read << dendl;
+    }
+    INFO << bytes_read << " bytes read" << dendl;
+    return bytes_read;
+  }
+
+  arw::Result<std::shared_ptr<arw::Buffer>> Read(int64_t nbytes) override {
+    INFO << "entered: asking for " << nbytes << " bytes" << dendl;
+
+    std::shared_ptr<OwnedBuffer> buffer;
+    ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes));
+
+    ARROW_ASSIGN_OR_RAISE(const int64_t bytes_read,
+                         Read(nbytes, buffer->writeable_data()));
+    buffer->set_size(bytes_read);
+
+    return buffer;
+  }
+
+  bool supports_zero_copy() const override {
+    return false;
+  }
+
+  // implement Seekable
+
+  arw::Result<int64_t> GetSize() override {
+    INFO << "entered: " << flight_data.obj_size << " returned" << dendl;
+    return flight_data.obj_size;
+  }
+
+  arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+    INFO << "entered: " << nbytes << " bytes" << dendl;
+
+    int64_t saved_position = position;
+
+    ARROW_ASSIGN_OR_RAISE(OwningStringView buffer,
+                         OwningStringView::make(nbytes));
+
+    ARROW_ASSIGN_OR_RAISE(const int64_t bytes_read,
+                         Read(nbytes, (void*) buffer.writeable_data()));
+
+    // restore position for a peek
+    position = saved_position;
+
+    if (bytes_read < nbytes) {
+      // create new OwningStringView with moved buffer
+      return OwningStringView::shrink(std::move(buffer), bytes_read);
+    } else {
+      return buffer;
+    }
+  }
+
+  arw::Result<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadata() {
+    return flight_data.kv_metadata;
+  }
+
+  arw::Future<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadataAsync(
+    const arw::io::IOContext& io_context) override {
+    return arw::Future<std::shared_ptr<const arw::KeyValueMetadata>>::MakeFinished(ReadMetadata());
+  }
+
+  // implement Seekable interface
+
+  arw::Status Seek(int64_t new_position) {
+    INFO << "entered: position: " << new_position << dendl;
+    if (position < 0) {
+      ERROR << "error, position indicated error" << dendl;
+      return arw::Status::IOError("object read op is in bad state");
+    } else {
+      position = new_position;
+      return arw::Status::OK();
+    }
+  }
+}; // class RandomAccessObject
+
+arw::Status FlightServer::DoGet(const flt::ServerCallContext &context,
+                               const flt::Ticket &request,
+                               std::unique_ptr<flt::FlightDataStream> *stream) {
+  int ret;
+
+  ARROW_ASSIGN_OR_RAISE(FlightKey key, TicketToFlightKey(request));
+  ARROW_ASSIGN_OR_RAISE(FlightData fd, get_flight_store()->get_flight(key));
+
+  std::unique_ptr<rgw::sal::User> user = driver->get_user(fd.user_id);
+  if (user->empty()) {
+    INFO << "user is empty" << dendl;
+  } else {
+    // TODO: test what happens if user is not loaded
+    ret = user->load_user(&dp, null_yield);
+    if (ret < 0) {
+      ERROR << "load_user returned " << ret << dendl;
+      // TODO return something
+    }
+    INFO << "user is " << user->get_display_name() << dendl;
+  }
+
+  std::unique_ptr<rgw::sal::Bucket> bucket;
+
+  ret = driver->get_bucket(&dp, &(*user), fd.tenant_name, fd.bucket_name,
+                          &bucket, null_yield);
+  if (ret < 0) {
+    ERROR << "get_bucket returned " << ret << dendl;
+    // TODO return something
+  }
+
+  std::unique_ptr<rgw::sal::Object> object = bucket->get_object(fd.object_key);
+
+  auto input = std::make_shared<RandomAccessObject>(fd, object, dp);
+  ARROW_RETURN_NOT_OK(input->Open());
+
+  std::unique_ptr<parquet::arrow::FileReader> reader;
+  ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input,
+                                              arw::default_memory_pool(),
+                                              &reader));
+
+  std::shared_ptr<arrow::Table> table;
+  ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
+
+  std::vector<std::shared_ptr<arw::RecordBatch>> batches;
+  arw::TableBatchReader batch_reader(*table);
+  ARROW_RETURN_NOT_OK(batch_reader.ReadAll(&batches));
+
+  ARROW_ASSIGN_OR_RAISE(auto owning_reader,
+                       arw::RecordBatchReader::Make(
+                         std::move(batches), table->schema()));
+  *stream = std::unique_ptr<flt::FlightDataStream>(
+    new flt::RecordBatchStream(owning_reader));
+
+  return arw::Status::OK();
+} // flightServer::DoGet
+
 } // namespace rgw::flight
index 1f4d37213762ba152387e3758d64c32b9e98f521..8f6c4ade7b7837647a4d1592d02fe380836328fc 100644 (file)
 #include "rgw_frontend.h"
 #include "arrow/type.h"
 #include "arrow/flight/server.h"
+#include "arrow/util/string_view.h"
 
 #include "rgw_flight_frontend.h"
 
-namespace arw = arrow;
-namespace flt = arrow::flight;
-
-struct req_state;
 
-namespace rgw::flight {
+#define INFO_F(dp)   ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": "
+#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": "
+#define WARN_F(dp)   ldpp_dout(&dp,  0) << "WARNING: " << __func__ << ": "
+#define ERROR_F(dp)  ldpp_dout(&dp,  0) << "ERROR: " << __func__ << ": "
 
-  static const coarse_real_clock::duration lifespan = std::chrono::hours(1);
+#define INFO   INFO_F(dp)
+#define STATUS STATUS_F(dp)
+#define WARN   WARN_F(dp)
+#define ERROR  ERROR_F(dp)
 
-  struct FlightData {
-    FlightKey key;
-    coarse_real_clock::time_point expires;
 
-    rgw::sal::Bucket* bucket;
-#if 0
-    rgw::sal::Object object;
-    rgw::sal::User user;
-#endif
+namespace arw = arrow;
+namespace flt = arrow::flight;
 
 
-    FlightData(const req_state* state);
-  };
+struct req_state;
 
-  // stores flights that have been created and helps expire them
-  class FlightStore {
-  public:
-    virtual ~FlightStore();
-    virtual FlightKey add_flight(FlightData&& flight) = 0;
-    virtual int get_flight(const FlightKey& key) = 0;
-    virtual int remove_flight(const FlightKey& key) = 0;
-    virtual int expire_flights() = 0;
-  };
+namespace rgw::flight {
 
-  class MemoryFlightStore : public FlightStore {
-    std::map<FlightKey, FlightData> map;
+static const coarse_real_clock::duration lifespan = std::chrono::hours(1);
 
-  public:
+struct FlightData {
+  FlightKey key;
+  // coarse_real_clock::time_point expires;
+  std::string uri;
+  std::string tenant_name;
+  std::string bucket_name;
+  rgw_obj_key object_key;
+  // NB: what about object's namespace and instance?
+  uint64_t num_records;
+  uint64_t obj_size;
+  std::shared_ptr<arw::Schema> schema;
+  std::shared_ptr<const arw::KeyValueMetadata> kv_metadata;
 
-    virtual ~MemoryFlightStore();
-    FlightKey add_flight(FlightData&& flight) override;
-    int get_flight(const FlightKey& key) override;
-    int remove_flight(const FlightKey& key) override;
-    int expire_flights() override;
-  };
+  rgw_user user_id; // TODO: this should be removed when we do
+  // proper flight authentication
 
-  class FlightServer : public flt::FlightServerBase {
+  FlightData(const std::string& _uri,
+            const std::string& _tenant_name,
+            const std::string& _bucket_name,
+            const rgw_obj_key& _object_key,
+            uint64_t _num_records,
+            uint64_t _obj_size,
+            std::shared_ptr<arw::Schema>& _schema,
+            std::shared_ptr<const arw::KeyValueMetadata>& _kv_metadata,
+            rgw_user _user_id);
+};
+
+// stores flights that have been created and helps expire them
+class FlightStore {
+
+protected:
+
+  const DoutPrefix& dp;
+
+public:
+
+  FlightStore(const DoutPrefix& dp);
+  virtual ~FlightStore();
+  virtual FlightKey add_flight(FlightData&& flight) = 0;
+
+  // TODO consider returning const shared pointers to FlightData in
+  // the following two functions
+  virtual arw::Result<FlightData> get_flight(const FlightKey& key) const = 0;
+  virtual std::optional<FlightData> after_key(const FlightKey& key) const = 0;
+
+  virtual int remove_flight(const FlightKey& key) = 0;
+  virtual int expire_flights() = 0;
+};
+
+class MemoryFlightStore : public FlightStore {
+  std::map<FlightKey, FlightData> map;
+  mutable std::mutex mtx; // for map
+
+public:
+
+  MemoryFlightStore(const DoutPrefix& dp);
+  virtual ~MemoryFlightStore();
+  FlightKey add_flight(FlightData&& flight) override;
+  arw::Result<FlightData> get_flight(const FlightKey& key) const override;
+  std::optional<FlightData> after_key(const FlightKey& key) const override;
+  int remove_flight(const FlightKey& key) override;
+  int expire_flights() override;
+};
+
+class FlightServer : public flt::FlightServerBase {
+
+  using Data1 = std::vector<std::shared_ptr<arw::RecordBatch>>;
+
+  RGWProcessEnv& env;
+  rgw::sal::Driver* driver;
+  const DoutPrefix& dp;
+  FlightStore* flight_store;
+
+  std::map<std::string, Data1> data;
+
+public:
+
+  static constexpr int default_port = 8077;
+
+  FlightServer(RGWProcessEnv& env,
+              FlightStore* flight_store,
+              const DoutPrefix& dp);
+  ~FlightServer() override;
+
+  FlightStore* get_flight_store() {
+    return flight_store;
+  }
+
+  arw::Status ListFlights(const flt::ServerCallContext& context,
+                         const flt::Criteria* criteria,
+                         std::unique_ptr<flt::FlightListing>* listings) override;
+
+  arw::Status GetFlightInfo(const flt::ServerCallContext &context,
+                           const flt::FlightDescriptor &request,
+                           std::unique_ptr<flt::FlightInfo> *info) override;
+
+  arw::Status GetSchema(const flt::ServerCallContext &context,
+                       const flt::FlightDescriptor &request,
+                       std::unique_ptr<flt::SchemaResult> *schema) override;
+
+  arw::Status DoGet(const flt::ServerCallContext &context,
+                   const flt::Ticket &request,
+                   std::unique_ptr<flt::FlightDataStream> *stream) override;
+}; // class FlightServer
+
+class OwningStringView : public arw::util::string_view {
+
+  uint8_t* buffer;
+  int64_t capacity;
+  int64_t consumed;
 
-    using Data1 = std::vector<std::shared_ptr<arw::RecordBatch>>;
+  OwningStringView(uint8_t* _buffer, int64_t _size) :
+    arw::util::string_view((const char*) _buffer, _size),
+    buffer(_buffer),
+    capacity(_size),
+    consumed(_size)
+    { }
+
+  OwningStringView(OwningStringView&& from, int64_t new_size) :
+    buffer(nullptr),
+    capacity(from.capacity),
+    consumed(new_size)
+    {
+      // should be impossible due to static function check
+      ceph_assertf(consumed <= capacity, "new size cannot exceed capacity");
 
-    boost::intrusive_ptr<ceph::common::CephContext> cct;
-    const DoutPrefix dp;
-    rgw::sal::Store* store;
-    FlightStore* flight_store;
+      std::swap(buffer, from.buffer);
+      from.capacity = 0;
+      from.consumed = 0;
+    }
 
-    std::map<std::string, Data1> data;
+public:
 
-  public:
+  OwningStringView(OwningStringView&&) = default;
+  OwningStringView& operator=(OwningStringView&&) = default;
 
-    static constexpr int default_port = 8077;
+  uint8_t* writeable_data() {
+    return buffer;
+  }
 
-    FlightServer(boost::intrusive_ptr<ceph::common::CephContext>& _cct,
-                rgw::sal::Store* _store,
-                FlightStore* _flight_store);
-    ~FlightServer() override;
+  ~OwningStringView() {
+    if (buffer) {
+      delete[] buffer;
+    }
+  }
 
-    FlightStore* get_flight_store() {
-      return flight_store;
+  static arw::Result<OwningStringView> make(int64_t size) {
+    uint8_t* buffer = new uint8_t[size];
+    if (!buffer) {
+      return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size);
+    }
+    return OwningStringView(buffer, size);
+  }
+
+  static arw::Result<OwningStringView> shrink(OwningStringView&& from,
+                                             int64_t new_size) {
+    if (new_size > from.capacity) {
+      return arw::Status::Invalid("new size cannot exceed capacity");
+    } else {
+      return OwningStringView(std::move(from), new_size);
     }
+  }
 
-    arw::Status ListFlights(const flt::ServerCallContext& context,
-                           const flt::Criteria* criteria,
-                           std::unique_ptr<flt::FlightListing>* listings) override;
-  }; // class FlightServer
+};
 
-  // GLOBAL
+// GLOBAL
 
-  void set_flight_server(FlightServer* _server);
-  FlightServer* get_flight_server();
-  FlightStore* get_flight_store();
-  FlightKey propose_flight(const req_state* request);
+flt::Ticket FlightKeyToTicket(const FlightKey& key);
+arw::Status TicketToFlightKey(const flt::Ticket& t, FlightKey& key);
 
 } // namespace rgw::flight
index 1751e2515d33fe22981730e34617811e7655acae..3da0f5d15e72a31a36f4cf5ca2a6fd46e3ad012b 100644 (file)
 // vim: ts=8 sw=2 smarttab ft=cpp
 
 
+#include <cstdio>
+#include <filesystem>
+#include <sstream>
+
 #include "arrow/type.h"
 #include "arrow/flight/server.h"
+#include "arrow/io/file.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/schema.h"
+#include "parquet/stream_reader.h"
 
 #include "rgw_flight_frontend.h"
 #include "rgw_flight.h"
 
-#define dout_subsys ceph_subsys_rgw_flight
+
+// logging
+constexpr unsigned dout_subsys = ceph_subsys_rgw_flight;
+constexpr const char* dout_prefix_str = "rgw arrow_flight: ";
+
 
 namespace rgw::flight {
 
-  FlightFrontend::FlightFrontend(boost::intrusive_ptr<ceph::common::CephContext>& _cct,
-                                RGWFrontendConfig* _config,
-                                rgw::sal::Store* _store,
-                                int _port) :
-    cct(_cct),
-    dp(cct.get(), dout_subsys, "rgw arrow_flight: "),
-    config(_config),
-    port(_port)
-  {
-    FlightStore* flight_store = new MemoryFlightStore();
-    flight_server = new FlightServer(_cct, _store, flight_store);
+const FlightKey null_flight_key = 0;
+
+FlightFrontend::FlightFrontend(RGWProcessEnv& _env,
+                              RGWFrontendConfig* _config,
+                              int _port) :
+  env(_env),
+  config(_config),
+  port(_port),
+  dp(env.driver->ctx(), dout_subsys, dout_prefix_str)
+{
+  env.flight_store = new MemoryFlightStore(dp);
+  env.flight_server = new FlightServer(env, env.flight_store, dp);
+  INFO << "flight server started" << dendl;
+}
+
+FlightFrontend::~FlightFrontend() {
+  delete env.flight_server;
+  delete env.flight_store;
+  INFO << "flight server shut down" << dendl;
+}
+
+int FlightFrontend::init() {
+  if (port <= 0) {
+    port = FlightServer::default_port;
+  }
+  const std::string url =
+    std::string("grpc+tcp://localhost:") + std::to_string(port);
+  flt::Location location;
+  arw::Status s = flt::Location::Parse(url, &location);
+  if (!s.ok()) {
+    ERROR << "couldn't parse url=" << url << ", status=" << s << dendl;
+    return -EINVAL;
   }
 
-  FlightFrontend::~FlightFrontend() {
-    delete flight_server;
+  flt::FlightServerOptions options(location);
+  options.verify_client = false;
+  s = env.flight_server->Init(options);
+  if (!s.ok()) {
+    ERROR << "couldn't init flight server; status=" << s << dendl;
+    return -EINVAL;
   }
 
-  int FlightFrontend::init() {
-    if (port <= 0) {
-      port = FlightServer::default_port;
-    }
-    const std::string url =
-      std::string("grpc+tcp://localhost:") + std::to_string(port);
-    flt::Location location;
-    arw::Status s = flt::Location::Parse(url, &location);
-    if (!s.ok()) {
-      return -EINVAL;
-    }
-
-    flt::FlightServerOptions options(location);
-    options.verify_client = false;
-    s = flight_server->Init(options);
-    if (!s.ok()) {
-      return -EINVAL;
-    }
-
-    dout(20) << "STATUS: " << __func__ <<
-      ": FlightServer inited; will use port " << port << dendl;
+  INFO << "FlightServer inited; will use port " << port << dendl;
+  return 0;
+}
+
+int FlightFrontend::run() {
+  try {
+    flight_thread = make_named_thread(server_thread_name,
+                                     &FlightServer::Serve,
+                                     env.flight_server);
+
+    INFO << "FlightServer thread started, id=" <<
+      flight_thread.get_id() <<
+      ", joinable=" << flight_thread.joinable() << dendl;
     return 0;
+  } catch (std::system_error& e) {
+    ERROR << "FlightServer thread failed to start" << dendl;
+    return -e.code().value();
   }
-
-  int FlightFrontend::run() {
-    try {
-      flight_thread = make_named_thread(server_thread_name,
-                                       &FlightServer::Serve,
-                                       flight_server);
-      set_flight_server(flight_server);
-
-      dout(20) << "INFO: " << __func__ <<
-       ": FlightServer thread started, id=" << flight_thread.get_id() <<
-       ", joinable=" << flight_thread.joinable() << dendl;
-      return 0;
-    } catch (std::system_error& e) {
-      derr << "ERROR: " << __func__ <<
-       ": FlightServer thread failed to start" << dendl;
-      return -ENOSPC;
-    }
+}
+
+void FlightFrontend::stop() {
+  env.flight_server->Shutdown();
+  env.flight_server->Wait();
+  INFO << "FlightServer shut down" << dendl;
+}
+
+void FlightFrontend::join() {
+  flight_thread.join();
+  INFO << "FlightServer thread joined" << dendl;
+}
+
+void FlightFrontend::pause_for_new_config() {
+  // ignore since config changes won't alter flight_server
+}
+
+void FlightFrontend::unpause_with_new_config() {
+  // ignore since config changes won't alter flight_server
+}
+
+/* ************************************************************ */
+
+FlightGetObj_Filter::FlightGetObj_Filter(const req_state* request,
+                                        RGWGetObj_Filter* next) :
+  RGWGetObj_Filter(next),
+  penv(request->penv),
+  dp(request->cct->get(), dout_subsys, dout_prefix_str),
+  current_offset(0),
+  expected_size(request->obj_size),
+  uri(request->decoded_uri),
+  tenant_name(request->bucket->get_tenant()),
+  bucket_name(request->bucket->get_name()),
+  object_key(request->object->get_key()),
+  // note: what about object namespace and instance?
+  schema_status(arrow::StatusCode::Cancelled,
+               "schema determination incomplete"),
+  user_id(request->user->get_id())
+{
+#warning "TODO: fix use of tmpnam"
+  char name[L_tmpnam];
+  const char* namep = std::tmpnam(name);
+  if (!namep) {
+    //
   }
+  temp_file_name = namep;
 
-  void FlightFrontend::stop() {
-    set_flight_server(nullptr);
-    flight_server->Shutdown();
-    flight_server->Wait();
-    dout(20) << "INFO: " << __func__ << ": FlightServer shut down" << dendl;
-  }
+  temp_file.open(temp_file_name);
+}
 
-  void FlightFrontend::join() {
-    flight_thread.join();
-    dout(20) << "INFO: " << __func__ << ": FlightServer thread joined" << dendl;
+FlightGetObj_Filter::~FlightGetObj_Filter() {
+  if (temp_file.is_open()) {
+    temp_file.close();
   }
-
-  void FlightFrontend::pause_for_new_config() {
-    // ignore since config changes won't alter flight_server
+  std::error_code error;
+  std::filesystem::remove(temp_file_name, error);
+  if (error) {
+    ERROR << "FlightGetObj_Filter got error when removing temp file; "
+      "error=" << error.value() <<
+      ", temp_file_name=" << temp_file_name << dendl;
+  } else {
+    INFO << "parquet/arrow schema determination status: " <<
+      schema_status << dendl;
   }
-
-  void FlightFrontend::unpause_with_new_config(rgw::sal::Store* store,
-                              rgw_auth_registry_ptr_t auth_registry) {
-    // ignore since config changes won't alter flight_server
+}
+
+int FlightGetObj_Filter::handle_data(bufferlist& bl,
+                                    off_t bl_ofs, off_t bl_len) {
+  INFO << "flight handling data from offset " <<
+    current_offset << " (" << bl_ofs << ") of size " << bl_len << dendl;
+
+  current_offset += bl_len;
+
+  if (temp_file.is_open()) {
+    bl.write_stream(temp_file);
+
+    if (current_offset >= expected_size) {
+      INFO << "data read is completed, current_offset=" <<
+       current_offset << ", expected_size=" << expected_size << dendl;
+      temp_file.close();
+
+      std::shared_ptr<const arw::KeyValueMetadata> kv_metadata;
+      std::shared_ptr<arw::Schema> aw_schema;
+      int64_t num_rows = 0;
+
+      auto process_metadata = [&aw_schema, &num_rows, &kv_metadata, this]() -> arrow::Status {
+       ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::ReadableFile> file,
+                             arrow::io::ReadableFile::Open(temp_file_name));
+       const std::shared_ptr<parquet::FileMetaData> metadata = parquet::ReadMetaData(file);
+
+       file->Close();
+
+       num_rows = metadata->num_rows();
+       kv_metadata = metadata->key_value_metadata();
+       const parquet::SchemaDescriptor* pq_schema = metadata->schema();
+       ARROW_RETURN_NOT_OK(parquet::arrow::FromParquetSchema(pq_schema, &aw_schema));
+
+       return arrow::Status::OK();
+      };
+
+      schema_status = process_metadata();
+      if (!schema_status.ok()) {
+       ERROR << "reading metadata to access schema, error=" << schema_status << dendl;
+      } else {
+       // INFO << "arrow_schema=" << *aw_schema << dendl;
+       FlightStore* store = penv.flight_store;
+       auto key =
+         store->add_flight(FlightData(uri, tenant_name, bucket_name,
+                                      object_key, num_rows,
+                                      expected_size, aw_schema,
+                                      kv_metadata, user_id));
+       (void) key; // suppress unused variable warning
+      }
+    } // if last block
+  } // if file opened
+
+    // chain to next filter in stream
+  int ret = RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len);
+
+  return ret;
+}
+
+#if 0
+void code_snippets() {
+  INFO << "num_columns:" << md->num_columns() <<
+    " num_schema_elements:" << md->num_schema_elements() <<
+    " num_rows:" << md->num_rows() <<
+    " num_row_groups:" << md->num_row_groups() << dendl;
+
+
+  INFO << "file schema: name=" << schema1->name() << ", ToString:" << schema1->ToString() << ", num_columns=" << schema1->num_columns() << dendl;
+  for (int c = 0; c < schema1->num_columns(); ++c) {
+    const parquet::ColumnDescriptor* cd = schema1->Column(c);
+    // const parquet::ConvertedType::type t = cd->converted_type;
+    const std::shared_ptr<const parquet::LogicalType> lt = cd->logical_type();
+    INFO << "column " << c << ": name=" << cd->name() << ", ToString=" << cd->ToString() << ", logical_type=" << lt->ToString() << dendl;
   }
 
-  int FlightGetObj_Filter::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) {
-    // do work here
-    dout(0) << "ERIC: " << __func__ << ": flight handling data from offset " << bl_ofs << " of size " << bl_len << dendl;
-
-    // chain upwards
-    return RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len);
+  INFO << "There are " << md->num_rows() << " rows and " << md->num_row_groups() << " row groups" << dendl;
+  for (int rg = 0; rg < md->num_row_groups(); ++rg) {
+    INFO << "Row Group " << rg << dendl;
+    auto rg_md = md->RowGroup(rg);
+    auto schema2 = rg_md->schema();
   }
+}
+#endif
 
 } // namespace rgw::flight
index c74878a884b4f32b9b312b828cafc7696484a81d..b820ca22b0699811c910a36abd591f5586e25a13 100644 (file)
@@ -8,62 +8,71 @@
 #include "rgw_frontend.h"
 #include "rgw_op.h"
 
+#include "arrow/status.h"
+
 
 namespace rgw::flight {
 
-  using FlightKey = uint32_t;
+using FlightKey = uint32_t;
+extern const FlightKey null_flight_key;
 
-  class FlightServer;
+class FlightServer;
 
-  class FlightFrontend : public RGWFrontend {
+class FlightFrontend : public RGWFrontend {
 
-    static constexpr std::string_view server_thread_name =
-      "Arrow Flight Server thread";
+  static constexpr std::string_view server_thread_name =
+    "Arrow Flight Server thread";
 
-    boost::intrusive_ptr<ceph::common::CephContext>& cct;
-    const DoutPrefix dp;
-    FlightServer* flight_server; // pointer so header file doesn't need to pull in too much
-    std::thread flight_thread;
-    RGWFrontendConfig* config;
-    int port;
+  RGWProcessEnv& env;
+  std::thread flight_thread;
+  RGWFrontendConfig* config;
+  int port;
 
-  public:
+  const DoutPrefix dp;
 
-    // port <= 0 -> let server decide; typically 8077
-    FlightFrontend(boost::intrusive_ptr<ceph::common::CephContext>& cct,
-                  RGWFrontendConfig* config,
-                  rgw::sal::Store* store,
-                  int port = -1);
-    ~FlightFrontend() override;
-    int init() override;
-    int run() override;
-    void stop() override;
-    void join() override;
+public:
 
-    void pause_for_new_config() override;
-    void unpause_with_new_config(rgw::sal::Store* store,
-                                rgw_auth_registry_ptr_t auth_registry) override;
+  // port <= 0 means let server decide; typically 8077
+  FlightFrontend(RGWProcessEnv& env,
+                RGWFrontendConfig* config,
+                int port = -1);
+  ~FlightFrontend() override;
+  int init() override;
+  int run() override;
+  void stop() override;
+  void join() override;
 
-  }; // class FlightFrontend
+  void pause_for_new_config() override;
+  void unpause_with_new_config() override;
+}; // class FlightFrontend
 
-  class FlightGetObj_Filter : public RGWGetObj_Filter {
+class FlightGetObj_Filter : public RGWGetObj_Filter {
 
-    FlightKey key;
+  const RGWProcessEnv& penv;
+  const DoutPrefix dp;
+  FlightKey key;
+  uint64_t current_offset;
+  uint64_t expected_size;
+  std::string uri;
+  std::string tenant_name;
+  std::string bucket_name;
+  rgw_obj_key object_key;
+  std::string temp_file_name;
+  std::ofstream temp_file;
+  arrow::Status schema_status;
+  rgw_user user_id; // TODO: this should be removed when we do
+  // proper flight authentication
 
-  public:
+public:
 
-    FlightGetObj_Filter(const FlightKey& _key, RGWGetObj_Filter* next) :
-      RGWGetObj_Filter(next),
-      key(_key)
-    {
-      // empty
-    }
+  FlightGetObj_Filter(const req_state* request, RGWGetObj_Filter* next);
+  ~FlightGetObj_Filter();
 
-    int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
+  int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override;
 #if 0
-    // this would allow the range to be modified if necessary;
-    int fixup_range(off_t& ofs, off_t& end) override;
+  // this would allow the range to be modified if necessary;
+  int fixup_range(off_t& ofs, off_t& end) override;
 #endif
-  };
+};
 
 } // namespace rgw::flight
index 9c2388274454652d66961f299e6e81bd869a53f1..e5589ae15089b431c439f08e89828e059c81645a 100644 (file)
@@ -2238,10 +2238,8 @@ void RGWGetObj::execute(optional_yield y)
 
 #ifdef WITH_ARROW_FLIGHT
   if (ofs == 0) {
-    rgw::flight::FlightKey key = rgw::flight::propose_flight(s);
-    ldpp_dout(this, 0) << "ERIC: added arrow flight with key=" << key << dendl;
-
-    flight_filter.emplace(key, filter);
+    // insert a GetObj_Filter to monitor and create flight
+    flight_filter.emplace(s, filter);
     filter = &*flight_filter;
   }
 #endif
index 7e62b6afcd491b48a6108bb5e3ea3fe9cd2b15ce..3193ff15121f5fad1b1977f9b3f8430b8160ec5e 100644 (file)
@@ -20,6 +20,13 @@ namespace rgw::sal {
   class LuaManager;
 }
 
+#ifdef WITH_ARROW_FLIGHT
+namespace rgw::flight {
+  class FlightServer;
+  class FlightStore;
+}
+#endif
+
 struct RGWLuaProcessEnv {
   std::string luarocks_path;
   rgw::lua::Background* background = nullptr;
@@ -33,4 +40,11 @@ struct RGWProcessEnv {
   OpsLogSink *olog = nullptr;
   std::unique_ptr<rgw::auth::StrategyRegistry> auth_registry;
   ActiveRateLimiter* ratelimiting = nullptr;
+
+#ifdef WITH_ARROW_FLIGHT
+  // managed by rgw:flight::FlightFrontend in rgw_flight_frontend.cc
+  rgw::flight::FlightServer* flight_server;
+  rgw::flight::FlightStore* flight_store;
+#endif
 };