跳到主要内容
版本:latest

Arrow Flight SQL

Arrow Flight SQL 简介

Arrow Flight SQL 是一种使用 Arrow 内存格式和 Flight RPC 框架与 SQL 数据库交互的协议。

目前我们支持Arrow Flight SQL 客户端的环境有:

Arrow Flight SQL 优势

  1. 功能强大。功能与JDBC和ODBC等API类似,包括执行查询,创建准备好的语句。
  2. 安全。Flight,支持开箱即用的加密和身份验证等功能。
  3. 性能。与实现Arrow Flight 的客户端服务端通信,无需进行数据转化,同时允许进一步优化,如并行数据访问。

虽然它可以直接用于数据库访问,但它不能直接替代 JDBC/ODBC。 但是,Flight SQL 可以用作具体的有线协议/驱动程序实现,支持 JDBC/ODBC 驱动程序,并减少数据库的实现负担。

Arrow Flight SQL 查询流程

客户端使用arrow flight sql 客户端与数据库连接,查询数据,执行SQL的流程大致如下。

  1. 创建FlightSql客户端
  2. 验证用户名,密码
  3. 执行SQL,获取FlightInfo结构体
  4. 通过FlightInfo结构体中的FlightEndPoint获取到FlightData数据流

FlightInfo中包含有关数据所在位置的详细信息, 客户端可以从适当的服务器获取数据。 服务器信息被编码为 FlightInfo 中的一系列 FlightEndpoint 消息。 每个Endpoint代表包含响应数据子集的某个位置。

一个FlightEndpoint包含一个服务器地址列表, 一个Ticket, 一个服务器用来识别请求数据的二进制Token。 FlightEndPoint 没有定义顺序,如果数据集是排序的, 只会在一个FlightEndPoint中返回数据。

流程图如下

流程图

不同客户端的使用方式

提示

本章节分别介绍不同客户端的使用方式。

  • 安装Apache Arrow

    你可以去官方文档找到详细的安装教程 在Mac系统下,使用brew命令就可以简单安装了。

    brew install apache-arrow
    brew install apache-arrow-glib
  • 配置CMakeLists.txt

    cmake_minimum_required(VERSION 3.24)
    project(arrow_flight_cpp)

    set(CMAKE_CXX_STANDARD 20)

    find_package(Arrow REQUIRED)
    find_package(ArrowFlight REQUIRED)
    find_package(ArrowFlightSql REQUIRED)

    include_directories(${ARROW_INCLUDE_DIR})
    add_executable(arrow_flight_cpp main.cpp)
    target_link_libraries(arrow_flight_cpp PRIVATE Arrow::arrow_shared)
    target_link_libraries(arrow_flight_cpp PRIVATE ArrowFlight::arrow_flight_shared)
    target_link_libraries(arrow_flight_cpp PRIVATE ArrowFlightSql::arrow_flight_sql_shared)
  • C++ Arrow库的用法

    arrow的函数大多数是返回 arrow::Result<T> 类型,因此需要把代码写在返回值为 arrow::Result<T> 的类型的函数中,如下:

     arrow::Result <std::unique_ptr<FlightClient>> get_location() {
    ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 8904));
    ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location))
    }

    ARROW_ASSIGN_OR_RAISE宏的效果是,先对右边返回值为 arrow::Result<T> 类型的表达式求值,如果出现异常,则提前return,赋上相应的Status值。

    为了方便展示,我们把代码写在lambda函数中。

    int main() {
    auto fun = []() {
    // code
    }
    fun();
    return 0;
    }
  • 验证身份获取令牌,并创建一个FlightSqlClient

    ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 8904))
    ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location))
    auto user = "root";
    auto password = "";
    //Base64加密认证
    auto auth = client->AuthenticateBasicToken({}, user, password);
    ARROW_RETURN_NOT_OK(auth); // 如果result出现异常,直接return
    FlightCallOptions call_options;
    call_options.headers.push_back(auth.ValueOrDie()); //把认证放到调用选项中
    auto sql_client = std::make_unique<FlightSqlClient>(std::move(client));
  • 执行sql取得FlightInfo

    ARROW_ASSIGN_OR_RAISE(auto info, sql_client->Execute(call_options, "select now();"));
    const auto endpoints = info->endpoints();
  • 通过FlightEndPoint取回数据

    for (auto i = 0; i < endpoints.size(); i++) {
    auto &ticket = endpoints[i].ticket;
    // stream中包含数据
    ARROW_ASSIGN_OR_RAISE(auto stream, sql_client->DoGet(call_options, ticket));
    // 获取数据的Schema
    auto schema = stream->GetSchema();
    ARROW_RETURN_NOT_OK(schema);
    std::cout << "Schema:" << schema->get()->ToString() << std::endl;
    // 取得并打印数据
    while(true) {
    ARROW_ASSIGN_OR_RAISE(FlightStreamChunk chunk, stream->Next());
    if (chunk.data == nullptr) {
    break;
    }
    std::cout << chunk.data->ToString();
    }
    }

整体代码

#include <iostream>
#include <arrow/flight/api.h>
#include <arrow/flight/sql/api.h>
using namespace arrow::flight;
using namespace arrow::flight::sql;
using namespace arrow;

int main() {

auto fun = []() {
ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 8904))
ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location))

auto user = "root";
auto password = "";
auto auth = client->AuthenticateBasicToken({}, user, password);
auto sql_client = std::make_unique<FlightSqlClient>(std::move(client));
ARROW_RETURN_NOT_OK(auth);
FlightCallOptions call_options;
call_options.headers.push_back(auth.ValueOrDie());

ARROW_ASSIGN_OR_RAISE(auto info, sql_client->Execute(call_options, "select now();"));
const auto endpoints = info->endpoints();
for (auto i = 0; i < endpoints.size(); i++) {
auto &ticket = endpoints[i].ticket;

ARROW_ASSIGN_OR_RAISE(auto stream, sql_client->DoGet(call_options, ticket));

auto schema = stream->GetSchema();
ARROW_RETURN_NOT_OK(schema);

std::cout << "Schema:" << schema->get()->ToString() << std::endl;
while(true) {
ARROW_ASSIGN_OR_RAISE(FlightStreamChunk chunk, stream->Next());
if (chunk.data == nullptr) {
break;
}
std::cout << chunk.data->ToString();
}
}
return Status::OK();
};

auto status = fun();

return 0;
}