#include "arrow/type.h"
#include "arrow/buffer.h"
-#include "arrow/util/string_view.h"
#include "arrow/io/interfaces.h"
#include "arrow/ipc/reader.h"
#include "arrow/table.h"
previous_key(null_flight_key)
{ }
- arw::Status Next(std::unique_ptr<flt::FlightInfo>* info) {
+ arrow::Result<std::unique_ptr<flt::FlightInfo>> Next() override {
std::optional<FlightData> fd = flight_store->after_key(previous_key);
if (fd) {
previous_key = fd->key;
ARROW_ASSIGN_OR_RAISE(flt::FlightInfo info_obj,
flt::FlightInfo::Make(*fd->schema, descriptor, endpoints, fd->num_records, fd->obj_size));
- *info = std::make_unique<flt::FlightInfo>(std::move(info_obj));
- return arw::Status::OK();
+ return std::make_unique<flt::FlightInfo>(std::move(info_obj));
} else {
- *info = nullptr;
- return arw::Status::OK();
+ return nullptr;
}
}
}; // class RGWFlightListing
}
}
- arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+ arw::Result<std::string_view> Peek(int64_t nbytes) override {
INFO << "called, not implemented" << dendl;
return arw::Status::NotImplemented("peek not currently allowed");
}
return flight_data.obj_size;
}
- arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+ arw::Result<std::string_view> Peek(int64_t nbytes) override {
std::iostream::pos_type here = file.tellg();
if (here == -1) {
return arw::Status::IOError(
return flight_data.obj_size;
}
- arw::Result<arw::util::string_view> Peek(int64_t nbytes) override {
+ arw::Result<std::string_view> Peek(int64_t nbytes) override {
INFO << "entered: " << nbytes << " bytes" << dendl;
int64_t saved_position = position;
std::vector<std::shared_ptr<arw::RecordBatch>> batches;
arw::TableBatchReader batch_reader(*table);
- ARROW_RETURN_NOT_OK(batch_reader.ReadAll(&batches));
+ while (true) {
+ std::shared_ptr<arw::RecordBatch> p;
+ auto s = batch_reader.ReadNext(&p);
+ if (!s.ok()) {
+ break;
+ }
+ batches.push_back(p);
+ }
ARROW_ASSIGN_OR_RAISE(auto owning_reader,
arw::RecordBatchReader::Make(
#include "rgw_frontend.h"
#include "arrow/type.h"
#include "arrow/flight/server.h"
-#include "arrow/util/string_view.h"
#include "rgw_flight_frontend.h"
FlightStore* flight_store;
std::map<std::string, Data1> data;
+ arw::Status serve_return_value;
public:
const DoutPrefix& dp);
~FlightServer() override;
+ // provides a version of Serve that has no return value, to avoid
+ // warnings when launching in a thread
+ void ServeAlt() {
+ serve_return_value = Serve();
+ }
+
FlightStore* get_flight_store() {
return flight_store;
}
std::unique_ptr<flt::FlightDataStream> *stream) override;
}; // class FlightServer
-class OwningStringView : public arw::util::string_view {
+class OwningStringView : public std::string_view {
uint8_t* buffer;
int64_t capacity;
int64_t consumed;
OwningStringView(uint8_t* _buffer, int64_t _size) :
- arw::util::string_view((const char*) _buffer, _size),
+ std::string_view((const char*) _buffer, _size),
buffer(_buffer),
capacity(_size),
consumed(_size)
}
const std::string url =
std::string("grpc+tcp://localhost:") + std::to_string(port);
- flt::Location location;
- arw::Status s = flt::Location::Parse(url, &location);
- if (!s.ok()) {
- ERROR << "couldn't parse url=" << url << ", status=" << s << dendl;
+ auto r = flt::Location::Parse(url);
+ if (!r.ok()) {
+ ERROR << "could not parse server uri: " << url << dendl;
return -EINVAL;
}
+ flt::Location location = *r;
flt::FlightServerOptions options(location);
options.verify_client = false;
- s = env.flight_server->Init(options);
+ auto s = env.flight_server->Init(options);
if (!s.ok()) {
ERROR << "couldn't init flight server; status=" << s << dendl;
return -EINVAL;
int FlightFrontend::run() {
try {
flight_thread = make_named_thread(server_thread_name,
- &FlightServer::Serve,
+ &FlightServer::ServeAlt,
env.flight_server);
INFO << "FlightServer thread started, id=" <<
}
void FlightFrontend::stop() {
- env.flight_server->Shutdown();
- env.flight_server->Wait();
+ arw::Status s;
+ s = env.flight_server->Shutdown();
+ if (!s.ok()) {
+ ERROR << "call to Shutdown failed; status=" << s << dendl;
+ return;
+ }
+
+ s = env.flight_server->Wait();
+ if (!s.ok()) {
+ ERROR << "call to Wait failed; status=" << s << dendl;
+ return;
+ }
+
INFO << "FlightServer shut down" << dendl;
}
arrow::io::ReadableFile::Open(temp_file_name));
const std::shared_ptr<parquet::FileMetaData> metadata = parquet::ReadMetaData(file);
- file->Close();
+ ARROW_RETURN_NOT_OK(file->Close());
num_rows = metadata->num_rows();
kv_metadata = metadata->key_value_metadata();