Apache Arrow Flight as a Data Catalog

Fabric of data to create knowledge

Wasit Limprasert
6 min read · Aug 20, 2023

We are going to discuss the challenges of accessing large datasets and walk through demo code that shows what is possible.

Accessing large datasets over networks is challenging, which is one reason file-based data-warehousing formats like CSV, Avro, and Parquet remain popular. Existing protocols such as ODBC and JDBC often require data to be deserialized on receipt, leading to uneven performance. This article introduces Apache Arrow’s potential for making data transport more efficient.

Arrow Columnar Format Advantages: Apache Arrow’s columnar format offers an “on-the-wire” tabular data representation that avoids deserialization upon receipt. Large datasets are transported in batches of rows called “record batches,” which keeps transfers efficient.
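
To make the record-batch idea concrete, here is a minimal sketch (my own illustration, not from the original article) that splits a small table into record batches and round-trips them through Arrow’s IPC stream format:

import pyarrow as pa

# build a small table and send it as record batches over the IPC stream format
table = pa.table({"character": ["Alex", "Bob", "Maya"], "age": [7, 8, 9]})
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    for batch in table.to_batches(max_chunksize=2):  # two record batches
        writer.write_batch(batch)

# the receiver maps the same bytes back into Arrow arrays, no row-by-row decoding
received = pa.ipc.open_stream(sink.getvalue()).read_all()
print(received.equals(table))  # True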

Language-Independent: Arrow’s format is language-independent and is supported by a growing number of libraries across 11 programming languages.

Flight Protocol: The Flight protocol uses Arrow’s columnar format for both data representation and its developer APIs. This eliminates serialization costs, making distributed data systems more efficient and letting Arrow-based systems communicate without conversion overhead.

Flight Basics: The Arrow Flight libraries provide a development framework for creating services that can send and receive data streams. It supports various requests like Handshake, ListFlights, GetSchema, GetFlightInfo, DoGet, DoPut, DoAction, and ListActions. gRPC’s bidirectional streaming is leveraged for simultaneous data and metadata exchange.
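
As a rough sketch of how these requests surface in the Python client API (the server address below is hypothetical; it assumes a Flight server is already listening there):

import pyarrow.flight as fl

client = fl.connect("grpc://localhost:8816")      # hypothetical address
for info in client.list_flights():                # ListFlights
    print(info.descriptor)
descriptor = fl.FlightDescriptor.for_path("uploaded.parquet")
info = client.get_flight_info(descriptor)         # GetFlightInfo
reader = client.do_get(info.endpoints[0].ticket)  # DoGet
table = reader.read_all()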

Apache Arrow’s Transformative Impact: Apache Arrow has revolutionized data processing and transport since its 2016 release. It offers an in-memory columnar format, libraries in over 12 programming languages, and tools like Apache Arrow Gandiva for improved performance with SQL user-defined functions.

Apache Arrow Flight: Flight is a protocol for transporting Arrow data over the wire, replacing row-based options like JDBC/ODBC. It improves performance by reducing serialization and deserialization, and the transport is implemented as a gRPC API, enabling cross-language communication.

Arrow ODBC/JDBC Drivers: Arrow provides ODBC/JDBC drivers that maintain familiar patterns while leveraging Arrow’s speed benefits. These drivers simplify connection to Arrow-supporting sources and eliminate the need for multiple driver downloads and configurations.
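
As one hedged example of that familiar pattern, the ADBC Flight SQL driver for Python exposes a DB-API-style interface over Flight; the endpoint and query below are hypothetical and assume a Flight SQL-capable server is available:

import adbc_driver_flightsql.dbapi as flight_sql

# hypothetical endpoint; assumes a server that speaks Arrow Flight SQL
conn = flight_sql.connect("grpc://localhost:32010")
cur = conn.cursor()
cur.execute("SELECT 1 AS answer")  # results return as Arrow data
print(cur.fetch_arrow_table())
cur.close()
conn.close()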

Example code:

Here are server.py and client.py to demo client-to-server data communication.

# server.py
import pathlib

import pyarrow as pa
import pyarrow.flight
import pyarrow.parquet


class FlightServer(pa.flight.FlightServerBase):

    def __init__(self, location="grpc://0.0.0.0:8816", repo="./datasets"):
        super().__init__(location)
        self._location = location
        self._repo = pathlib.Path(repo)

    def _make_flight_info(self, dataset):
        # Build a FlightInfo (schema, endpoint, row count) from a Parquet file.
        dataset_path = self._repo / dataset
        metadata = pa.parquet.read_metadata(dataset_path)
        descriptor = pa.flight.FlightDescriptor.for_path(dataset.encode())
        endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
        return pa.flight.FlightInfo(
            pa.parquet.read_schema(dataset_path),
            descriptor,
            endpoints,
            metadata.num_rows,
            metadata.serialized_size,
        )

    def list_flights(self, context, criteria):
        # Advertise every Parquet file in the repository as a flight.
        for dataset in self._repo.iterdir():
            yield self._make_flight_info(dataset.name)

    def get_flight_info(self, context, descriptor):
        return self._make_flight_info(descriptor.path[0].decode())

    def do_put(self, context, descriptor, reader, writer):
        # Receive an Arrow table from the client and persist it as Parquet.
        dataset = descriptor.path[0].decode()
        dataset_path = self._repo / dataset
        data_table = reader.read_all()
        pa.parquet.write_table(data_table, dataset_path)

    def do_get(self, context, ticket):
        # Stream a stored Parquet file back to the client as record batches.
        dataset = ticket.ticket.decode()
        dataset_path = self._repo / dataset
        return pa.flight.RecordBatchStream(pa.parquet.read_table(dataset_path))

    def list_actions(self, context):
        return [("drop_dataset", "Delete a dataset.")]

    def do_action(self, context, action):
        if action.type == "drop_dataset":
            self._drop_dataset(action.body.to_pybytes().decode())
        else:
            raise NotImplementedError

    def _drop_dataset(self, dataset):
        (self._repo / dataset).unlink()


if __name__ == "__main__":
    server = FlightServer()
    server._repo.mkdir(exist_ok=True)
    server.serve()

The server.py script implements a Flight server using the Apache Arrow framework for efficient data transport. It defines a FlightServer class that inherits from pa.flight.FlightServerBase and is initialized with a listening location and a repository path where datasets are stored. It includes methods to handle the various Flight operations:

  1. _make_flight_info: Constructs a FlightInfo object describing a dataset’s schema, endpoint, row count, and size.
  2. list_flights: Yields a FlightInfo object for every dataset in the repository.
  3. get_flight_info: Retrieves metadata about a specific dataset.
  4. do_put: Stores an incoming Arrow table as a Parquet file.
  5. do_get: Streams a stored dataset back to the client as a RecordBatchStream.
  6. list_actions: Lists the available custom actions.
  7. do_action: Executes a custom action (e.g., dropping a dataset).
  8. _drop_dataset: Deletes a dataset file from the repository.

When run directly, the script creates a FlightServer instance, creates the repository directory if it does not already exist, and starts serving requests.
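
For quick local experiments, one option (my addition, not part of the original demo) is to start the server on a background thread so the same Python process can issue client calls against it:

# sketch: run the Flight server without blocking the main thread
import threading

from server import FlightServer  # assumes server.py is importable

server = FlightServer()
server._repo.mkdir(exist_ok=True)
threading.Thread(target=server.serve, daemon=True).start()  # serve() blocks otherwise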

# client.py
import pyarrow as pa
import pyarrow.flight


class FlightClient2:

    def __init__(self, server_location):
        print(' debug 0:__init__(): initialize client-to-server connection')
        self.client = pa.flight.connect(location=server_location)

    def upload_dataset(self, table, dataset_path):
        # DoPut: stream the table to the server, which saves it as Parquet.
        print(' debug 1:upload_dataset(): upload table to server and save to parquet')
        upload_descriptor = pa.flight.FlightDescriptor.for_path(dataset_path)
        writer, flight = self.client.do_put(upload_descriptor, table.schema)
        writer.write_table(table)
        writer.close()
        return flight

    def show_metadata(self, flight):
        # Print the path, row count, byte size, and schema of one flight.
        print(' debug 2:show_metadata(): show metadata for a flight object')
        descriptor = flight.descriptor
        print("\tpath:", descriptor.path[0].decode(), "rows:", flight.total_records, "size:", flight.total_bytes)
        schema_str = flight.schema.to_string()
        schema_str = '\t' * 2 + schema_str.replace('\n', '\n\t\t')
        print(schema_str)

    def list_datasets(self):
        # ListFlights: enumerate every dataset the server advertises.
        print(' debug 3:list_datasets(): list all datasets on the server')
        for flight in self.client.list_flights():
            self.show_metadata(flight)
        print("")

    def print_metadata(self, dataset_path):
        # GetFlightInfo: fetch metadata for one named dataset.
        flight = self.client.get_flight_info(pa.flight.FlightDescriptor.for_path(dataset_path))
        self.show_metadata(flight)

    def print_dataset(self, reader):
        # DoGet result: read the full table and print the head of the dataframe.
        print(' debug 4:print_dataset(): read dataframe from the server and print head of the table')
        read_table = reader.read_all()
        print(read_table.to_pandas().head())

    def drop_dataset(self, dataset):
        # DoAction: ask the server to delete the named dataset.
        print(' debug 5:drop_dataset(): drop dataset')
        self.client.do_action(pa.flight.Action("drop_dataset", dataset.encode()))


if __name__ == "__main__":
    print('# 1.Create client-to-server connection')
    c = FlightClient2("grpc://0.0.0.0:8816")

    print('# 2.Upload example dataframe to parquet')
    data_table = pa.table([["Alex", "Bob", "Maya"], [7, 8, 9]], names=["character", "age"])
    c.upload_dataset(data_table, "uploaded.parquet")

    print('# 3.Check the result from upload')
    c.print_metadata('uploaded.parquet')
    c.list_datasets()

    print('# 4.Browse datasets on servers')
    read_flights = list(c.client.list_flights())
    read_flight = read_flights[0]
    read_reader = c.client.do_get(read_flight.endpoints[0].ticket)
    c.print_dataset(read_reader)

    print('# 5. delete the dataset')
    c.drop_dataset("uploaded.parquet")
    c.list_datasets()

The client.py script interacts with the Flight server, showcasing the Apache Arrow Flight framework. It defines a FlightClient2 class responsible for managing communication with the server and demonstrates several operations:

  1. upload_dataset: Uploads an Arrow table to the server, where it is saved as a Parquet file.
  2. show_metadata: Displays metadata and schema information for a flight.
  3. list_datasets: Lists the available datasets and their metadata.
  4. print_metadata: Fetches and displays metadata for a named dataset.
  5. print_dataset: Reads a dataset from the server and prints the head of the table.
  6. drop_dataset: Requests that the server drop a specific dataset.

In the main part of the script, an instance of FlightClient2 is created and used to upload, inspect, list, read, and drop a dataset on the Flight server, showcasing the streamlined data transport capabilities of Apache Arrow Flight.
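
If you already know a dataset's name, you can skip the listing step and fetch it directly; here is a small sketch reusing the client object c from the demo above (this variant is my addition):

# fetch a known dataset by name instead of browsing list_flights (sketch)
descriptor = pa.flight.FlightDescriptor.for_path("uploaded.parquet")
flight = c.client.get_flight_info(descriptor)         # GetFlightInfo
reader = c.client.do_get(flight.endpoints[0].ticket)  # DoGet
print(reader.read_all().to_pandas())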

Results:

Here are the results from the demo:

  1. Create client-to-server connection
  2. Upload example dataframe to parquet
  3. Check the result from upload
  4. Browse datasets on servers
  5. Delete the dataset
# 1.Create client-to-server connection
 debug 0:__init__(): initialize client-to-server connection
# 2.Upload example dataframe to parquet
 debug 1:upload_dataset(): upload table to server and save to parquet
# 3.Check the result from upload
 debug 2:show_metadata(): show metadata for a flight object
    path: uploaded.parquet rows: 3 size: 545
        character: string
        age: int64
 debug 3:list_datasets(): list all datasets on the server
 debug 2:show_metadata(): show metadata for a flight object
    path: uploaded.parquet rows: 3 size: 545
        character: string
        age: int64

# 4.Browse datasets on servers
 debug 4:print_dataset(): read dataframe from the server and print head of the table
  character  age
0      Alex    7
1       Bob    8
2      Maya    9
# 5. delete the dataset
 debug 5:drop_dataset(): drop dataset
 debug 3:list_datasets(): list all datasets on the server

Conclusion:

In the realm of modern data management and transport, the scripts server.py and client.py demonstrate the prowess of the Apache Arrow Flight framework. By gracefully handling data transport between a Flight server and client, these scripts emphasize the importance of efficient and standardized methods for sharing and analyzing large datasets. Leveraging Apache Arrow's columnar format and streamlined protocols, data processing and transport are made smoother and more efficient.

The Pythonic design, encapsulation, and modularity of the code highlight readability and maintainability. And because Arrow tables convert directly to Pandas DataFrames, the retrieved data slots straight into the familiar Python data-manipulation ecosystem.

Future Possibilities for Data Fabric:

The landscape of data fabric holds intriguing possibilities that these scripts hint at:

  1. Real-time Streaming: Expanding data fabric to incorporate real-time data streaming can enable live analytics, predictive insights, and rapid decision-making.
  2. Distributed Data Processing: Enhancing data fabric to support distributed computing frameworks, like Apache Spark or Dask, could unlock even more powerful data processing capabilities.
  3. Security and Privacy: Integrating robust security measures within the fabric would ensure safe data transmission, safeguarding sensitive information.
  4. Hybrid Cloud Deployments: Extending data fabric to seamlessly bridge on-premises and cloud data sources can provide unified access and processing across environments.
  5. Advanced Analytics: Enriching data fabric with machine learning and AI capabilities opens doors to sophisticated analytics and automation.
  6. Data Governance and Cataloging: Implementing metadata management and governance features within the fabric ensures data quality, lineage, and compliance.
  7. Interoperability: Further extending data fabric’s compatibility with diverse data sources, formats, and programming languages promotes seamless data integration.

As technology advances, the concept of data fabric is poised to weave even tighter connections between data sources, consumers, and applications. The scripts presented here are but a glimpse into the broader potential of data fabric as a dynamic, agile, and powerful solution for data management and analytics in a connected world.

References:

https://github.com/wasit7/demokg/tree/main/flight


Wasit Limprasert

Wasit Limprasert is at Thammasat University and holds a PhD in Computer Science from Heriot-Watt University. He specializes in Big Data, Deep Learning, and Knowledge Graphs.