// vim: ts=8 sw=2 smarttab ft=cpp
#include <iostream>
+#include <fstream>
#include <mutex>
#include <map>
+#include <algorithm>
#include "arrow/type.h"
+#include "arrow/buffer.h"
+#include "arrow/util/string_view.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/table.h"
+
#include "arrow/flight/server.h"
+#include "parquet/arrow/reader.h"
+
#include "common/dout.h"
#include "rgw_op.h"
#include "rgw_flight_frontend.h"
-#define dout_subsys ceph_subsys_rgw_flight
+namespace rgw::flight {
-#define INFO_F(dp) ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": "
-#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": "
-#define WARN_F(dp) ldpp_dout(&dp, 0) << "WARNING: " << __func__ << ": "
-#define ERROR_F(dp) ldpp_dout(&dp, 0) << "ERROR: " << __func__ << ": "
+// Ticket and FlightKey
-#define INFO INFO_F(dp)
-#define STATUS STATUS_F(dp)
-#define WARN WARN_F(dp)
-#define ERROR ERROR_F(dp)
+std::atomic<FlightKey> next_flight_key = null_flight_key;
+flt::Ticket FlightKeyToTicket(const FlightKey& key) {
+ flt::Ticket result;
+ result.ticket = std::to_string(key);
+ return result;
+}
-namespace rgw::flight {
+arw::Result<FlightKey> TicketToFlightKey(const flt::Ticket& t) {
+ try {
+ return (FlightKey) std::stoul(t.ticket);
+ } catch (std::invalid_argument const& ex) {
+ return arw::Status::Invalid(
+ "could not convert Ticket containing \"%s\" into a Flight Key",
+ t.ticket);
+ } catch (const std::out_of_range& ex) {
+ return arw::Status::Invalid(
+ "could not convert Ticket containing \"%s\" into a Flight Key due to range",
+ t.ticket);
+ }
+}
+
+// FlightData
+
+FlightData::FlightData(const std::string& _uri,
+ const std::string& _tenant_name,
+ const std::string& _bucket_name,
+ const rgw_obj_key& _object_key,
+ uint64_t _num_records,
+ uint64_t _obj_size,
+ std::shared_ptr<arw::Schema>& _schema,
+ std::shared_ptr<const arw::KeyValueMetadata>& _kv_metadata,
+ rgw_user _user_id) :
+ key(++next_flight_key),
+ /* expires(coarse_real_clock::now() + lifespan), */
+ uri(_uri),
+ tenant_name(_tenant_name),
+ bucket_name(_bucket_name),
+ object_key(_object_key),
+ num_records(_num_records),
+ obj_size(_obj_size),
+ schema(_schema),
+ kv_metadata(_kv_metadata),
+ user_id(_user_id)
+{ }
+
+/**** FlightStore ****/
+
+FlightStore::FlightStore(const DoutPrefix& _dp) :
+ dp(_dp)
+{ }
+
+FlightStore::~FlightStore() { }
- std::atomic<FlightKey> next_flight_key = 0;
+/**** MemoryFlightStore ****/
- FlightData::FlightData(const req_state* state) :
- key(next_flight_key++),
- expires(coarse_real_clock::now() + lifespan)
+MemoryFlightStore::MemoryFlightStore(const DoutPrefix& _dp) :
+ FlightStore(_dp)
+{ }
+
+MemoryFlightStore::~MemoryFlightStore() { }
+
+FlightKey MemoryFlightStore::add_flight(FlightData&& flight) {
+ std::pair<decltype(map)::iterator,bool> result;
{
-#if 0
- bucket = new rgw::sal::Bucket(*state->bucket);
-#endif
+ const std::lock_guard lock(mtx);
+ result = map.insert( {flight.key, std::move(flight)} );
}
+ ceph_assertf(result.second,
+ "unable to add FlightData to MemoryFlightStore"); // temporary until error handling
- FlightStore::~FlightStore() {
- // empty
+ return result.first->second.key;
+}
+
+arw::Result<FlightData> MemoryFlightStore::get_flight(const FlightKey& key) const {
+ const std::lock_guard lock(mtx);
+ auto i = map.find(key);
+ if (i == map.cend()) {
+ return arw::Status::KeyError("could not find Flight with Key %" PRIu32,
+ key);
+ } else {
+ return i->second;
}
+}
- MemoryFlightStore::~MemoryFlightStore() {
- // empty
+// returns either the next FilghtData or, if at end, empty optional
+std::optional<FlightData> MemoryFlightStore::after_key(const FlightKey& key) const {
+ std::optional<FlightData> result;
+ {
+ const std::lock_guard lock(mtx);
+ auto i = map.upper_bound(key);
+ if (i != map.end()) {
+ result = i->second;
+ }
}
+ return result;
+}
+
+int MemoryFlightStore::remove_flight(const FlightKey& key) {
+ return 0;
+}
+
+int MemoryFlightStore::expire_flights() {
+ return 0;
+}
+
+/**** FlightServer ****/
+
+FlightServer::FlightServer(RGWProcessEnv& _env,
+ FlightStore* _flight_store,
+ const DoutPrefix& _dp) :
+ env(_env),
+ driver(env.driver),
+ dp(_dp),
+ flight_store(_flight_store)
+{ }
+
+FlightServer::~FlightServer()
+{ }
- FlightKey MemoryFlightStore::add_flight(FlightData&& flight) {
- FlightKey key = flight.key;
- auto p = map.insert( {key, flight} );
- ceph_assertf(p.second,
- "unable to add FlightData to MemoryFlightStore"); // temporary until error handling
+arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
+ const flt::Criteria* criteria,
+ std::unique_ptr<flt::FlightListing>* listings) {
- return key;
+ // function local class to implement FlightListing interface
+ class RGWFlightListing : public flt::FlightListing {
+
+ FlightStore* flight_store;
+ FlightKey previous_key;
+
+ public:
+
+ RGWFlightListing(FlightStore* flight_store) :
+ flight_store(flight_store),
+ previous_key(null_flight_key)
+ { }
+
+ arw::Status Next(std::unique_ptr<flt::FlightInfo>* info) {
+ std::optional<FlightData> fd = flight_store->after_key(previous_key);
+ if (fd) {
+ previous_key = fd->key;
+ auto descriptor =
+ flt::FlightDescriptor::Path(
+ { fd->tenant_name, fd->bucket_name, fd->object_key.name, fd->object_key.instance, fd->object_key.ns });
+ flt::FlightEndpoint endpoint;
+ endpoint.ticket = FlightKeyToTicket(fd->key);
+ std::vector<flt::FlightEndpoint> endpoints { endpoint };
+
+ ARROW_ASSIGN_OR_RAISE(flt::FlightInfo info_obj,
+ flt::FlightInfo::Make(*fd->schema, descriptor, endpoints, fd->num_records, fd->obj_size));
+ *info = std::make_unique<flt::FlightInfo>(std::move(info_obj));
+ return arw::Status::OK();
+ } else {
+ *info = nullptr;
+ return arw::Status::OK();
+ }
+ }
+ }; // class RGWFlightListing
+
+ *listings = std::make_unique<RGWFlightListing>(flight_store);
+ return arw::Status::OK();
+} // FlightServer::ListFlights
+
+
+arw::Status FlightServer::GetFlightInfo(const flt::ServerCallContext &context,
+ const flt::FlightDescriptor &request,
+ std::unique_ptr<flt::FlightInfo> *info) {
+ return arw::Status::OK();
+} // FlightServer::GetFlightInfo
+
+
+arw::Status FlightServer::GetSchema(const flt::ServerCallContext &context,
+ const flt::FlightDescriptor &request,
+ std::unique_ptr<flt::SchemaResult> *schema) {
+ return arw::Status::OK();
+} // FlightServer::GetSchema
+
+ // A Buffer that owns its memory and frees it when the Buffer is
+ // destructed
+class OwnedBuffer : public arw::Buffer {
+
+ uint8_t* buffer;
+
+protected:
+
+ OwnedBuffer(uint8_t* _buffer, int64_t _size) :
+ Buffer(_buffer, _size),
+ buffer(_buffer)
+ { }
+
+public:
+
+ ~OwnedBuffer() override {
+ delete[] buffer;
}
- // int MemoryFlightStore::add_flight(const FlightKey& key) { return 0; }
- int MemoryFlightStore::get_flight(const FlightKey& key) { return 0; }
- int MemoryFlightStore::remove_flight(const FlightKey& key) { return 0; }
- int MemoryFlightStore::expire_flights() { return 0; }
- FlightServer::FlightServer(boost::intrusive_ptr<ceph::common::CephContext>& _cct,
- rgw::sal::Store* _store,
- FlightStore* _flight_store) :
- cct(_cct),
- dp(cct.get(), dout_subsys, "rgw arrow_flight: "),
- store(_store),
- flight_store(_flight_store)
- {
- INFO << "FlightServer constructed" << dendl;
+ static arw::Result<std::shared_ptr<OwnedBuffer>> make(int64_t size) {
+ uint8_t* buffer = new (std::nothrow) uint8_t[size];
+ if (!buffer) {
+ return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size);
+ }
+
+ OwnedBuffer* ptr = new OwnedBuffer(buffer, size);
+ std::shared_ptr<OwnedBuffer> result;
+ result.reset(ptr);
+ return result;
}
- FlightServer::~FlightServer()
- {
- INFO << "FlightServer destructed" << dendl;
+ // if what's read in is less than capacity
+ void set_size(int64_t size) {
+ size_ = size;
}
+ // pointer that can be used to write into buffer
+ uint8_t* writeable_data() {
+ return buffer;
+ }
+}; // class OwnedBuffer
+
+#if 0 // remove classes used for testing and incrementally building
+
+// make local to DoGet eventually
+class LocalInputStream : public arw::io::InputStream {
+
+ std::iostream::pos_type position;
+ std::fstream file;
+ std::shared_ptr<const arw::KeyValueMetadata> kv_metadata;
+ const DoutPrefix dp;
-class RGWFlightListing : public flt::FlightListing {
public:
- RGWFlightListing() {
-#if 0
- const int64_t total_records = 2;
- const int64_t total_bytes = 2048;
- const std::vector<flt::FlightEndpoint> endpoints;
- const auto descriptor = flt::FlightDescriptor::Command("fake-cmd");
- arw::FieldVector fields;
- const arw::Schema schema(fields, nullptr);
- auto info1 = flt::FlightInfo::Make(schema, descriptor, endpoints, total_records, total_bytes);
-#endif
+ LocalInputStream(std::shared_ptr<const arw::KeyValueMetadata> _kv_metadata,
+ const DoutPrefix _dp) :
+ kv_metadata(_kv_metadata),
+ dp(_dp)
+ {}
+
+ arw::Status Open() {
+ file.open("/tmp/green_tripdata_2022-04.parquet", std::ios::in);
+ if (!file.good()) {
+ return arw::Status::IOError("unable to open file");
+ }
+
+ INFO << "file opened successfully" << dendl;
+ position = file.tellg();
+ return arw::Status::OK();
}
- arw::Status Next(std::unique_ptr<flt::FlightInfo>* info) {
- *info = nullptr;
+ arw::Status Close() override {
+ file.close();
+ INFO << "file closed" << dendl;
return arw::Status::OK();
}
-};
+ arw::Result<int64_t> Tell() const override {
+ if (position < 0) {
+ return arw::Status::IOError(
+ "could not query file implementaiton with tellg");
+ } else {
+ return int64_t(position);
+ }
+ }
+
+ bool closed() const override {
+ return file.is_open();
+ }
+
+ arw::Result<int64_t> Read(int64_t nbytes, void* out) override {
+ INFO << "entered: asking for " << nbytes << " bytes" << dendl;
+ if (file.read(reinterpret_cast<char*>(out),
+ reinterpret_cast<std::streamsize>(nbytes))) {
+ const std::streamsize bytes_read = file.gcount();
+ INFO << "Point A: read bytes " << bytes_read << dendl;
+ position = file.tellg();
+ return bytes_read;
+ } else {
+ ERROR << "unable to read from file" << dendl;
+ return arw::Status::IOError("unable to read from offset %" PRId64,
+ int64_t(position));
+ }
+ }
+
+ arw::Result<std::shared_ptr<arw::Buffer>> Read(int64_t nbytes) override {
+ INFO << "entered: " << ": asking for " << nbytes << " bytes" << dendl;
+
+ std::shared_ptr<OwnedBuffer> buffer;
+ ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes));
- arw::Status FlightServer::ListFlights(const flt::ServerCallContext& context,
- const flt::Criteria* criteria,
- std::unique_ptr<flt::FlightListing>* listings) {
- *listings = std::make_unique<RGWFlightListing>();
+ if (file.read(reinterpret_cast<char*>(buffer->writeable_data()),
+ reinterpret_cast<std::streamsize>(nbytes))) {
+ const auto bytes_read = file.gcount();
+ INFO << "Point B: read bytes " << bytes_read << dendl;
+ // buffer->set_size(bytes_read);
+ position = file.tellg();
+ return buffer;
+ } else if (file.rdstate() & std::ifstream::failbit &&
+ file.rdstate() & std::ifstream::eofbit) {
+ const auto bytes_read = file.gcount();
+ INFO << "3 read bytes " << bytes_read << " and reached EOF" << dendl;
+ // buffer->set_size(bytes_read);
+ position = file.tellg();
+ return buffer;
+ } else {
+ ERROR << "unable to read from file" << dendl;
+ return arw::Status::IOError("unable to read from offset %ld", position);
+ }
+ }
+
+ arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+ INFO << "called, not implemented" << dendl;
+ return arw::Status::NotImplemented("peek not currently allowed");
+ }
+
+ bool supports_zero_copy() const override {
+ return false;
+ }
+
+ arw::Result<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadata() override {
+ INFO << "called" << dendl;
+ return kv_metadata;
+ }
+}; // class LocalInputStream
+
+class LocalRandomAccessFile : public arw::io::RandomAccessFile {
+
+ FlightData flight_data;
+ const DoutPrefix dp;
+
+ std::iostream::pos_type position;
+ std::fstream file;
+
+public:
+ LocalRandomAccessFile(const FlightData& _flight_data, const DoutPrefix _dp) :
+ flight_data(_flight_data),
+ dp(_dp)
+ { }
+
+ // implement InputStream
+
+ arw::Status Open() {
+ file.open("/tmp/green_tripdata_2022-04.parquet", std::ios::in);
+ if (!file.good()) {
+ return arw::Status::IOError("unable to open file");
+ }
+
+ INFO << "file opened successfully" << dendl;
+ position = file.tellg();
return arw::Status::OK();
}
- static FlightServer* fs;
+ arw::Status Close() override {
+ file.close();
+ INFO << "file closed" << dendl;
+ return arw::Status::OK();
+ }
- void set_flight_server(FlightServer* _server) {
- fs = _server;
+ arw::Result<int64_t> Tell() const override {
+ if (position < 0) {
+ return arw::Status::IOError(
+ "could not query file implementaiton with tellg");
+ } else {
+ return int64_t(position);
+ }
}
- FlightServer* get_flight_server() {
- return fs;
+ bool closed() const override {
+ return file.is_open();
}
- FlightStore* get_flight_store() {
- return fs ? fs->get_flight_store() : nullptr;
+ arw::Result<int64_t> Read(int64_t nbytes, void* out) override {
+ INFO << "entered: asking for " << nbytes << " bytes" << dendl;
+ if (file.read(reinterpret_cast<char*>(out),
+ reinterpret_cast<std::streamsize>(nbytes))) {
+ const std::streamsize bytes_read = file.gcount();
+ INFO << "Point A: read bytes " << bytes_read << dendl;
+ position = file.tellg();
+ return bytes_read;
+ } else {
+ ERROR << "unable to read from file" << dendl;
+ return arw::Status::IOError("unable to read from offset %" PRId64,
+ int64_t(position));
+ }
}
- FlightKey propose_flight(const req_state* request) {
- FlightKey key = get_flight_store()->add_flight(FlightData(request));
- return key;
+ arw::Result<std::shared_ptr<arw::Buffer>> Read(int64_t nbytes) override {
+ INFO << "entered: asking for " << nbytes << " bytes" << dendl;
+
+ std::shared_ptr<OwnedBuffer> buffer;
+ ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes));
+
+ if (file.read(reinterpret_cast<char*>(buffer->writeable_data()),
+ reinterpret_cast<std::streamsize>(nbytes))) {
+ const auto bytes_read = file.gcount();
+ INFO << "Point B: read bytes " << bytes_read << dendl;
+ // buffer->set_size(bytes_read);
+ position = file.tellg();
+ return buffer;
+ } else if (file.rdstate() & std::ifstream::failbit &&
+ file.rdstate() & std::ifstream::eofbit) {
+ const auto bytes_read = file.gcount();
+ INFO << "3 read bytes " << bytes_read << " and reached EOF" << dendl;
+ // buffer->set_size(bytes_read);
+ position = file.tellg();
+ return buffer;
+ } else {
+ ERROR << "unable to read from file" << dendl;
+ return arw::Status::IOError("unable to read from offset %ld", position);
+ }
+ }
+
+ bool supports_zero_copy() const override {
+ return false;
+ }
+
+ // implement Seekable
+
+ arw::Result<int64_t> GetSize() override {
+ return flight_data.obj_size;
}
+ arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+ std::iostream::pos_type here = file.tellg();
+ if (here == -1) {
+ return arw::Status::IOError(
+ "unable to determine current position ahead of peek");
+ }
+
+ ARROW_ASSIGN_OR_RAISE(OwningStringView result,
+ OwningStringView::make(nbytes));
+
+ // read
+ ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
+ Read(nbytes, (void*) result.writeable_data()));
+ (void) bytes_read; // silence unused variable warnings
+
+ // return offset to original
+ ARROW_RETURN_NOT_OK(Seek(here));
+
+ return result;
+ }
+
+ arw::Result<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadata() {
+ return flight_data.kv_metadata;
+ }
+
+ arw::Future<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadataAsync(
+ const arw::io::IOContext& io_context) override {
+ return arw::Future<std::shared_ptr<const arw::KeyValueMetadata>>::MakeFinished(ReadMetadata());
+ }
+
+ // implement Seekable interface
+
+ arw::Status Seek(int64_t position) {
+ file.seekg(position);
+ if (file.fail()) {
+ return arw::Status::IOError(
+ "error encountered during seek to %" PRId64, position);
+ } else {
+ return arw::Status::OK();
+ }
+ }
+}; // class LocalRandomAccessFile
+#endif
+
+class RandomAccessObject : public arw::io::RandomAccessFile {
+
+ FlightData flight_data;
+ const DoutPrefix dp;
+
+ int64_t position;
+ bool is_closed;
+ std::unique_ptr<rgw::sal::Object::ReadOp> op;
+
+public:
+
+ RandomAccessObject(const FlightData& _flight_data,
+ std::unique_ptr<rgw::sal::Object>& obj,
+ const DoutPrefix _dp) :
+ flight_data(_flight_data),
+ dp(_dp),
+ position(-1),
+ is_closed(false)
+ {
+ op = obj->get_read_op();
+ }
+
+ arw::Status Open() {
+ int ret = op->prepare(null_yield, &dp);
+ if (ret < 0) {
+ return arw::Status::IOError(
+ "unable to prepare object with error %d", ret);
+ }
+ INFO << "file opened successfully" << dendl;
+ position = 0;
+ return arw::Status::OK();
+ }
+
+ // implement InputStream
+
+ arw::Status Close() override {
+ position = -1;
+ is_closed = true;
+ (void) op.reset();
+ INFO << "object closed" << dendl;
+ return arw::Status::OK();
+ }
+
+ arw::Result<int64_t> Tell() const override {
+ if (position < 0) {
+ return arw::Status::IOError("could not determine position");
+ } else {
+ return position;
+ }
+ }
+
+ bool closed() const override {
+ return is_closed;
+ }
+
+ arw::Result<int64_t> Read(int64_t nbytes, void* out) override {
+ INFO << "entered: asking for " << nbytes << " bytes" << dendl;
+
+ if (position < 0) {
+ ERROR << "error, position indicated error" << dendl;
+ return arw::Status::IOError("object read op is in bad state");
+ }
+
+ // note: read function reads through end_position inclusive
+ int64_t end_position = position + nbytes - 1;
+
+ bufferlist bl;
+
+ const int64_t bytes_read =
+ op->read(position, end_position, bl, null_yield, &dp);
+ if (bytes_read < 0) {
+ const int64_t former_position = position;
+ position = -1;
+ ERROR << "read operation returned " << bytes_read << dendl;
+ return arw::Status::IOError(
+ "unable to read object at position %" PRId64 ", error code: %" PRId64,
+ former_position,
+ bytes_read);
+ }
+
+ // TODO: see if there's a way to get rid of this copy, perhaps
+ // updating rgw::sal::read_op
+ bl.cbegin().copy(bytes_read, reinterpret_cast<char*>(out));
+
+ position += bytes_read;
+
+ if (nbytes != bytes_read) {
+ INFO << "partial read: nbytes=" << nbytes <<
+ ", bytes_read=" << bytes_read << dendl;
+ }
+ INFO << bytes_read << " bytes read" << dendl;
+ return bytes_read;
+ }
+
+ arw::Result<std::shared_ptr<arw::Buffer>> Read(int64_t nbytes) override {
+ INFO << "entered: asking for " << nbytes << " bytes" << dendl;
+
+ std::shared_ptr<OwnedBuffer> buffer;
+ ARROW_ASSIGN_OR_RAISE(buffer, OwnedBuffer::make(nbytes));
+
+ ARROW_ASSIGN_OR_RAISE(const int64_t bytes_read,
+ Read(nbytes, buffer->writeable_data()));
+ buffer->set_size(bytes_read);
+
+ return buffer;
+ }
+
+ bool supports_zero_copy() const override {
+ return false;
+ }
+
+ // implement Seekable
+
+ arw::Result<int64_t> GetSize() override {
+ INFO << "entered: " << flight_data.obj_size << " returned" << dendl;
+ return flight_data.obj_size;
+ }
+
+ arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+ INFO << "entered: " << nbytes << " bytes" << dendl;
+
+ int64_t saved_position = position;
+
+ ARROW_ASSIGN_OR_RAISE(OwningStringView buffer,
+ OwningStringView::make(nbytes));
+
+ ARROW_ASSIGN_OR_RAISE(const int64_t bytes_read,
+ Read(nbytes, (void*) buffer.writeable_data()));
+
+ // restore position for a peek
+ position = saved_position;
+
+ if (bytes_read < nbytes) {
+ // create new OwningStringView with moved buffer
+ return OwningStringView::shrink(std::move(buffer), bytes_read);
+ } else {
+ return buffer;
+ }
+ }
+
+ arw::Result<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadata() {
+ return flight_data.kv_metadata;
+ }
+
+ arw::Future<std::shared_ptr<const arw::KeyValueMetadata>> ReadMetadataAsync(
+ const arw::io::IOContext& io_context) override {
+ return arw::Future<std::shared_ptr<const arw::KeyValueMetadata>>::MakeFinished(ReadMetadata());
+ }
+
+ // implement Seekable interface
+
+ arw::Status Seek(int64_t new_position) {
+ INFO << "entered: position: " << new_position << dendl;
+ if (position < 0) {
+ ERROR << "error, position indicated error" << dendl;
+ return arw::Status::IOError("object read op is in bad state");
+ } else {
+ position = new_position;
+ return arw::Status::OK();
+ }
+ }
+}; // class RandomAccessObject
+
+arw::Status FlightServer::DoGet(const flt::ServerCallContext &context,
+ const flt::Ticket &request,
+ std::unique_ptr<flt::FlightDataStream> *stream) {
+ int ret;
+
+ ARROW_ASSIGN_OR_RAISE(FlightKey key, TicketToFlightKey(request));
+ ARROW_ASSIGN_OR_RAISE(FlightData fd, get_flight_store()->get_flight(key));
+
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(fd.user_id);
+ if (user->empty()) {
+ INFO << "user is empty" << dendl;
+ } else {
+ // TODO: test what happens if user is not loaded
+ ret = user->load_user(&dp, null_yield);
+ if (ret < 0) {
+ ERROR << "load_user returned " << ret << dendl;
+ // TODO return something
+ }
+ INFO << "user is " << user->get_display_name() << dendl;
+ }
+
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+
+ ret = driver->get_bucket(&dp, &(*user), fd.tenant_name, fd.bucket_name,
+ &bucket, null_yield);
+ if (ret < 0) {
+ ERROR << "get_bucket returned " << ret << dendl;
+ // TODO return something
+ }
+
+ std::unique_ptr<rgw::sal::Object> object = bucket->get_object(fd.object_key);
+
+ auto input = std::make_shared<RandomAccessObject>(fd, object, dp);
+ ARROW_RETURN_NOT_OK(input->Open());
+
+ std::unique_ptr<parquet::arrow::FileReader> reader;
+ ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input,
+ arw::default_memory_pool(),
+ &reader));
+
+ std::shared_ptr<arrow::Table> table;
+ ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
+
+ std::vector<std::shared_ptr<arw::RecordBatch>> batches;
+ arw::TableBatchReader batch_reader(*table);
+ ARROW_RETURN_NOT_OK(batch_reader.ReadAll(&batches));
+
+ ARROW_ASSIGN_OR_RAISE(auto owning_reader,
+ arw::RecordBatchReader::Make(
+ std::move(batches), table->schema()));
+ *stream = std::unique_ptr<flt::FlightDataStream>(
+ new flt::RecordBatchStream(owning_reader));
+
+ return arw::Status::OK();
+} // flightServer::DoGet
+
} // namespace rgw::flight
#include "rgw_frontend.h"
#include "arrow/type.h"
#include "arrow/flight/server.h"
+#include "arrow/util/string_view.h"
#include "rgw_flight_frontend.h"
-namespace arw = arrow;
-namespace flt = arrow::flight;
-
-struct req_state;
-namespace rgw::flight {
+#define INFO_F(dp) ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": "
+#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": "
+#define WARN_F(dp) ldpp_dout(&dp, 0) << "WARNING: " << __func__ << ": "
+#define ERROR_F(dp) ldpp_dout(&dp, 0) << "ERROR: " << __func__ << ": "
- static const coarse_real_clock::duration lifespan = std::chrono::hours(1);
+#define INFO INFO_F(dp)
+#define STATUS STATUS_F(dp)
+#define WARN WARN_F(dp)
+#define ERROR ERROR_F(dp)
- struct FlightData {
- FlightKey key;
- coarse_real_clock::time_point expires;
- rgw::sal::Bucket* bucket;
-#if 0
- rgw::sal::Object object;
- rgw::sal::User user;
-#endif
+namespace arw = arrow;
+namespace flt = arrow::flight;
- FlightData(const req_state* state);
- };
+struct req_state;
- // stores flights that have been created and helps expire them
- class FlightStore {
- public:
- virtual ~FlightStore();
- virtual FlightKey add_flight(FlightData&& flight) = 0;
- virtual int get_flight(const FlightKey& key) = 0;
- virtual int remove_flight(const FlightKey& key) = 0;
- virtual int expire_flights() = 0;
- };
+namespace rgw::flight {
- class MemoryFlightStore : public FlightStore {
- std::map<FlightKey, FlightData> map;
+static const coarse_real_clock::duration lifespan = std::chrono::hours(1);
- public:
+struct FlightData {
+ FlightKey key;
+ // coarse_real_clock::time_point expires;
+ std::string uri;
+ std::string tenant_name;
+ std::string bucket_name;
+ rgw_obj_key object_key;
+ // NB: what about object's namespace and instance?
+ uint64_t num_records;
+ uint64_t obj_size;
+ std::shared_ptr<arw::Schema> schema;
+ std::shared_ptr<const arw::KeyValueMetadata> kv_metadata;
- virtual ~MemoryFlightStore();
- FlightKey add_flight(FlightData&& flight) override;
- int get_flight(const FlightKey& key) override;
- int remove_flight(const FlightKey& key) override;
- int expire_flights() override;
- };
+ rgw_user user_id; // TODO: this should be removed when we do
+ // proper flight authentication
- class FlightServer : public flt::FlightServerBase {
+ FlightData(const std::string& _uri,
+ const std::string& _tenant_name,
+ const std::string& _bucket_name,
+ const rgw_obj_key& _object_key,
+ uint64_t _num_records,
+ uint64_t _obj_size,
+ std::shared_ptr<arw::Schema>& _schema,
+ std::shared_ptr<const arw::KeyValueMetadata>& _kv_metadata,
+ rgw_user _user_id);
+};
+
+// stores flights that have been created and helps expire them
+class FlightStore {
+
+protected:
+
+ const DoutPrefix& dp;
+
+public:
+
+ FlightStore(const DoutPrefix& dp);
+ virtual ~FlightStore();
+ virtual FlightKey add_flight(FlightData&& flight) = 0;
+
+ // TODO consider returning const shared pointers to FlightData in
+ // the following two functions
+ virtual arw::Result<FlightData> get_flight(const FlightKey& key) const = 0;
+ virtual std::optional<FlightData> after_key(const FlightKey& key) const = 0;
+
+ virtual int remove_flight(const FlightKey& key) = 0;
+ virtual int expire_flights() = 0;
+};
+
+class MemoryFlightStore : public FlightStore {
+ std::map<FlightKey, FlightData> map;
+ mutable std::mutex mtx; // for map
+
+public:
+
+ MemoryFlightStore(const DoutPrefix& dp);
+ virtual ~MemoryFlightStore();
+ FlightKey add_flight(FlightData&& flight) override;
+ arw::Result<FlightData> get_flight(const FlightKey& key) const override;
+ std::optional<FlightData> after_key(const FlightKey& key) const override;
+ int remove_flight(const FlightKey& key) override;
+ int expire_flights() override;
+};
+
+class FlightServer : public flt::FlightServerBase {
+
+ using Data1 = std::vector<std::shared_ptr<arw::RecordBatch>>;
+
+ RGWProcessEnv& env;
+ rgw::sal::Driver* driver;
+ const DoutPrefix& dp;
+ FlightStore* flight_store;
+
+ std::map<std::string, Data1> data;
+
+public:
+
+ static constexpr int default_port = 8077;
+
+ FlightServer(RGWProcessEnv& env,
+ FlightStore* flight_store,
+ const DoutPrefix& dp);
+ ~FlightServer() override;
+
+ FlightStore* get_flight_store() {
+ return flight_store;
+ }
+
+ arw::Status ListFlights(const flt::ServerCallContext& context,
+ const flt::Criteria* criteria,
+ std::unique_ptr<flt::FlightListing>* listings) override;
+
+ arw::Status GetFlightInfo(const flt::ServerCallContext &context,
+ const flt::FlightDescriptor &request,
+ std::unique_ptr<flt::FlightInfo> *info) override;
+
+ arw::Status GetSchema(const flt::ServerCallContext &context,
+ const flt::FlightDescriptor &request,
+ std::unique_ptr<flt::SchemaResult> *schema) override;
+
+ arw::Status DoGet(const flt::ServerCallContext &context,
+ const flt::Ticket &request,
+ std::unique_ptr<flt::FlightDataStream> *stream) override;
+}; // class FlightServer
+
+class OwningStringView : public arw::util::string_view {
+
+ uint8_t* buffer;
+ int64_t capacity;
+ int64_t consumed;
- using Data1 = std::vector<std::shared_ptr<arw::RecordBatch>>;
+ OwningStringView(uint8_t* _buffer, int64_t _size) :
+ arw::util::string_view((const char*) _buffer, _size),
+ buffer(_buffer),
+ capacity(_size),
+ consumed(_size)
+ { }
+
+ OwningStringView(OwningStringView&& from, int64_t new_size) :
+ buffer(nullptr),
+ capacity(from.capacity),
+ consumed(new_size)
+ {
+ // should be impossible due to static function check
+ ceph_assertf(consumed <= capacity, "new size cannot exceed capacity");
- boost::intrusive_ptr<ceph::common::CephContext> cct;
- const DoutPrefix dp;
- rgw::sal::Store* store;
- FlightStore* flight_store;
+ std::swap(buffer, from.buffer);
+ from.capacity = 0;
+ from.consumed = 0;
+ }
- std::map<std::string, Data1> data;
+public:
- public:
+ OwningStringView(OwningStringView&&) = default;
+ OwningStringView& operator=(OwningStringView&&) = default;
- static constexpr int default_port = 8077;
+ uint8_t* writeable_data() {
+ return buffer;
+ }
- FlightServer(boost::intrusive_ptr<ceph::common::CephContext>& _cct,
- rgw::sal::Store* _store,
- FlightStore* _flight_store);
- ~FlightServer() override;
+ ~OwningStringView() {
+ if (buffer) {
+ delete[] buffer;
+ }
+ }
- FlightStore* get_flight_store() {
- return flight_store;
+ static arw::Result<OwningStringView> make(int64_t size) {
+ uint8_t* buffer = new uint8_t[size];
+ if (!buffer) {
+ return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size);
+ }
+ return OwningStringView(buffer, size);
+ }
+
+ static arw::Result<OwningStringView> shrink(OwningStringView&& from,
+ int64_t new_size) {
+ if (new_size > from.capacity) {
+ return arw::Status::Invalid("new size cannot exceed capacity");
+ } else {
+ return OwningStringView(std::move(from), new_size);
}
+ }
- arw::Status ListFlights(const flt::ServerCallContext& context,
- const flt::Criteria* criteria,
- std::unique_ptr<flt::FlightListing>* listings) override;
- }; // class FlightServer
+};
- // GLOBAL
+// GLOBAL
- void set_flight_server(FlightServer* _server);
- FlightServer* get_flight_server();
- FlightStore* get_flight_store();
- FlightKey propose_flight(const req_state* request);
+flt::Ticket FlightKeyToTicket(const FlightKey& key);
+arw::Status TicketToFlightKey(const flt::Ticket& t, FlightKey& key);
} // namespace rgw::flight
// vim: ts=8 sw=2 smarttab ft=cpp
+#include <cstdio>
+#include <filesystem>
+#include <sstream>
+
#include "arrow/type.h"
#include "arrow/flight/server.h"
+#include "arrow/io/file.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/schema.h"
+#include "parquet/stream_reader.h"
#include "rgw_flight_frontend.h"
#include "rgw_flight.h"
-#define dout_subsys ceph_subsys_rgw_flight
+
+// logging
+constexpr unsigned dout_subsys = ceph_subsys_rgw_flight;
+constexpr const char* dout_prefix_str = "rgw arrow_flight: ";
+
namespace rgw::flight {
- FlightFrontend::FlightFrontend(boost::intrusive_ptr<ceph::common::CephContext>& _cct,
- RGWFrontendConfig* _config,
- rgw::sal::Store* _store,
- int _port) :
- cct(_cct),
- dp(cct.get(), dout_subsys, "rgw arrow_flight: "),
- config(_config),
- port(_port)
- {
- FlightStore* flight_store = new MemoryFlightStore();
- flight_server = new FlightServer(_cct, _store, flight_store);
+const FlightKey null_flight_key = 0;
+
+FlightFrontend::FlightFrontend(RGWProcessEnv& _env,
+ RGWFrontendConfig* _config,
+ int _port) :
+ env(_env),
+ config(_config),
+ port(_port),
+ dp(env.driver->ctx(), dout_subsys, dout_prefix_str)
+{
+ env.flight_store = new MemoryFlightStore(dp);
+ env.flight_server = new FlightServer(env, env.flight_store, dp);
+ INFO << "flight server started" << dendl;
+}
+
+FlightFrontend::~FlightFrontend() {
+ delete env.flight_server;
+ delete env.flight_store;
+ INFO << "flight server shut down" << dendl;
+}
+
+int FlightFrontend::init() {
+ if (port <= 0) {
+ port = FlightServer::default_port;
+ }
+ const std::string url =
+ std::string("grpc+tcp://localhost:") + std::to_string(port);
+ flt::Location location;
+ arw::Status s = flt::Location::Parse(url, &location);
+ if (!s.ok()) {
+ ERROR << "couldn't parse url=" << url << ", status=" << s << dendl;
+ return -EINVAL;
}
- FlightFrontend::~FlightFrontend() {
- delete flight_server;
+ flt::FlightServerOptions options(location);
+ options.verify_client = false;
+ s = env.flight_server->Init(options);
+ if (!s.ok()) {
+ ERROR << "couldn't init flight server; status=" << s << dendl;
+ return -EINVAL;
}
- int FlightFrontend::init() {
- if (port <= 0) {
- port = FlightServer::default_port;
- }
- const std::string url =
- std::string("grpc+tcp://localhost:") + std::to_string(port);
- flt::Location location;
- arw::Status s = flt::Location::Parse(url, &location);
- if (!s.ok()) {
- return -EINVAL;
- }
-
- flt::FlightServerOptions options(location);
- options.verify_client = false;
- s = flight_server->Init(options);
- if (!s.ok()) {
- return -EINVAL;
- }
-
- dout(20) << "STATUS: " << __func__ <<
- ": FlightServer inited; will use port " << port << dendl;
+ INFO << "FlightServer inited; will use port " << port << dendl;
+ return 0;
+}
+
+int FlightFrontend::run() {
+ try {
+ flight_thread = make_named_thread(server_thread_name,
+ &FlightServer::Serve,
+ env.flight_server);
+
+ INFO << "FlightServer thread started, id=" <<
+ flight_thread.get_id() <<
+ ", joinable=" << flight_thread.joinable() << dendl;
return 0;
+ } catch (std::system_error& e) {
+ ERROR << "FlightServer thread failed to start" << dendl;
+ return -e.code().value();
}
-
- int FlightFrontend::run() {
- try {
- flight_thread = make_named_thread(server_thread_name,
- &FlightServer::Serve,
- flight_server);
- set_flight_server(flight_server);
-
- dout(20) << "INFO: " << __func__ <<
- ": FlightServer thread started, id=" << flight_thread.get_id() <<
- ", joinable=" << flight_thread.joinable() << dendl;
- return 0;
- } catch (std::system_error& e) {
- derr << "ERROR: " << __func__ <<
- ": FlightServer thread failed to start" << dendl;
- return -ENOSPC;
- }
+}
+
+void FlightFrontend::stop() {
+ env.flight_server->Shutdown();
+ env.flight_server->Wait();
+ INFO << "FlightServer shut down" << dendl;
+}
+
+void FlightFrontend::join() {
+ flight_thread.join();
+ INFO << "FlightServer thread joined" << dendl;
+}
+
+void FlightFrontend::pause_for_new_config() {
+ // ignore since config changes won't alter flight_server
+}
+
+void FlightFrontend::unpause_with_new_config() {
+ // ignore since config changes won't alter flight_server
+}
+
+/* ************************************************************ */
+
+FlightGetObj_Filter::FlightGetObj_Filter(const req_state* request,
+ RGWGetObj_Filter* next) :
+ RGWGetObj_Filter(next),
+ penv(request->penv),
+ dp(request->cct->get(), dout_subsys, dout_prefix_str),
+ current_offset(0),
+ expected_size(request->obj_size),
+ uri(request->decoded_uri),
+ tenant_name(request->bucket->get_tenant()),
+ bucket_name(request->bucket->get_name()),
+ object_key(request->object->get_key()),
+ // note: what about object namespace and instance?
+ schema_status(arrow::StatusCode::Cancelled,
+ "schema determination incomplete"),
+ user_id(request->user->get_id())
+{
+#warning "TODO: fix use of tmpnam"
+ char name[L_tmpnam];
+ const char* namep = std::tmpnam(name);
+ if (!namep) {
+ //
}
+ temp_file_name = namep;
- void FlightFrontend::stop() {
- set_flight_server(nullptr);
- flight_server->Shutdown();
- flight_server->Wait();
- dout(20) << "INFO: " << __func__ << ": FlightServer shut down" << dendl;
- }
+ temp_file.open(temp_file_name);
+}
- void FlightFrontend::join() {
- flight_thread.join();
- dout(20) << "INFO: " << __func__ << ": FlightServer thread joined" << dendl;
+FlightGetObj_Filter::~FlightGetObj_Filter() {
+ if (temp_file.is_open()) {
+ temp_file.close();
}
-
- void FlightFrontend::pause_for_new_config() {
- // ignore since config changes won't alter flight_server
+ std::error_code error;
+ std::filesystem::remove(temp_file_name, error);
+ if (error) {
+ ERROR << "FlightGetObj_Filter got error when removing temp file; "
+ "error=" << error.value() <<
+ ", temp_file_name=" << temp_file_name << dendl;
+ } else {
+ INFO << "parquet/arrow schema determination status: " <<
+ schema_status << dendl;
}
-
- void FlightFrontend::unpause_with_new_config(rgw::sal::Store* store,
- rgw_auth_registry_ptr_t auth_registry) {
- // ignore since config changes won't alter flight_server
+}
+
+int FlightGetObj_Filter::handle_data(bufferlist& bl,
+ off_t bl_ofs, off_t bl_len) {
+ INFO << "flight handling data from offset " <<
+ current_offset << " (" << bl_ofs << ") of size " << bl_len << dendl;
+
+ current_offset += bl_len;
+
+ if (temp_file.is_open()) {
+ bl.write_stream(temp_file);
+
+ if (current_offset >= expected_size) {
+ INFO << "data read is completed, current_offset=" <<
+ current_offset << ", expected_size=" << expected_size << dendl;
+ temp_file.close();
+
+ std::shared_ptr<const arw::KeyValueMetadata> kv_metadata;
+ std::shared_ptr<arw::Schema> aw_schema;
+ int64_t num_rows = 0;
+
+ auto process_metadata = [&aw_schema, &num_rows, &kv_metadata, this]() -> arrow::Status {
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::ReadableFile> file,
+ arrow::io::ReadableFile::Open(temp_file_name));
+ const std::shared_ptr<parquet::FileMetaData> metadata = parquet::ReadMetaData(file);
+
+ file->Close();
+
+ num_rows = metadata->num_rows();
+ kv_metadata = metadata->key_value_metadata();
+ const parquet::SchemaDescriptor* pq_schema = metadata->schema();
+ ARROW_RETURN_NOT_OK(parquet::arrow::FromParquetSchema(pq_schema, &aw_schema));
+
+ return arrow::Status::OK();
+ };
+
+ schema_status = process_metadata();
+ if (!schema_status.ok()) {
+ ERROR << "reading metadata to access schema, error=" << schema_status << dendl;
+ } else {
+ // INFO << "arrow_schema=" << *aw_schema << dendl;
+ FlightStore* store = penv.flight_store;
+ auto key =
+ store->add_flight(FlightData(uri, tenant_name, bucket_name,
+ object_key, num_rows,
+ expected_size, aw_schema,
+ kv_metadata, user_id));
+ (void) key; // suppress unused variable warning
+ }
+ } // if last block
+ } // if file opened
+
+ // chain to next filter in stream
+ int ret = RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len);
+
+ return ret;
+}
+
+#if 0
+void code_snippets() {
+ INFO << "num_columns:" << md->num_columns() <<
+ " num_schema_elements:" << md->num_schema_elements() <<
+ " num_rows:" << md->num_rows() <<
+ " num_row_groups:" << md->num_row_groups() << dendl;
+
+
+ INFO << "file schema: name=" << schema1->name() << ", ToString:" << schema1->ToString() << ", num_columns=" << schema1->num_columns() << dendl;
+ for (int c = 0; c < schema1->num_columns(); ++c) {
+ const parquet::ColumnDescriptor* cd = schema1->Column(c);
+ // const parquet::ConvertedType::type t = cd->converted_type;
+ const std::shared_ptr<const parquet::LogicalType> lt = cd->logical_type();
+ INFO << "column " << c << ": name=" << cd->name() << ", ToString=" << cd->ToString() << ", logical_type=" << lt->ToString() << dendl;
}
- int FlightGetObj_Filter::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) {
- // do work here
- dout(0) << "ERIC: " << __func__ << ": flight handling data from offset " << bl_ofs << " of size " << bl_len << dendl;
-
- // chain upwards
- return RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len);
+ INFO << "There are " << md->num_rows() << " rows and " << md->num_row_groups() << " row groups" << dendl;
+ for (int rg = 0; rg < md->num_row_groups(); ++rg) {
+ INFO << "Row Group " << rg << dendl;
+ auto rg_md = md->RowGroup(rg);
+ auto schema2 = rg_md->schema();
}
+}
+#endif
} // namespace rgw::flight