By Qiji, Xiaojiang, and Jiang Jizhong from Alibaba’s Cainiao logistics unit
Cainiao’s intelligent logistics analysis engine (hereinafter referred to as the engine) is a logistics query platform built on a search architecture. It handles billions of parcel-related events per day and processes most of Cainiao’s logistics data.
The engine adopts a unified technical architecture to serve the various application scenarios of the delivery and distribution network and to provide high throughput and computing power. In the original architecture, data was processed in the following procedure: DataHub collected data from data sources in real time, including warehousing, distribution, delivery, and order data. Then Flink, a real-time computing engine that supports both stream and batch processing, preprocessed the data, created an order-based wide table that contained order tracking events, and wrote the table data to HBase for external queries.
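The preprocessing step can be illustrated with a minimal sketch that merges per-parcel tracking events into one order-keyed wide row. The field names (order_id, event_type, ts) and the merge rule are illustrative assumptions, not Cainiao's actual schema or the real Flink job.

```python
from collections import defaultdict

def build_wide_table(events):
    """Merge per-parcel tracking events into one order-keyed wide row,
    mimicking what the Flink preprocessing job does before writing out."""
    rows = defaultdict(dict)
    for e in events:
        # Each event type (warehousing, distribution, delivery, order)
        # becomes a column; keep the latest timestamp per event type.
        row = rows[e["order_id"]]
        col = e["event_type"] + "_ts"
        if col not in row or e["ts"] > row[col]:
            row[col] = e["ts"]
    return dict(rows)

events = [
    {"order_id": "LP001", "event_type": "warehousing", "ts": 100},
    {"order_id": "LP001", "event_type": "delivery", "ts": 300},
    {"order_id": "LP002", "event_type": "warehousing", "ts": 120},
]
wide = build_wide_table(events)
```

In the real system this merge runs continuously over the event stream; the sketch only shows the shape of the resulting wide table.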
As the volume of data to be processed increased, fully importing dimension table data to HBase took longer and consumed more resources. In addition, the single-node throughput was low, which caused a high unit cost. When the data volume was small, cost was not a key concern; as the volume grew, cost became significant. The engine must process huge amounts of data every day, so its resource consumption was high.
In addition, in Cainiao’s business scenarios, some tables are used as Flink dimension tables for primary key-based point queries, and others are used for online analytical processing (OLAP) queries. HBase cannot meet both query requirements, so data had to be synchronized to a batch processing system for OLAP queries and to KVStore for key-value queries. Importing and exporting data among multiple systems increased the amount of data to synchronize, led to redundant storage and data inconsistency, and raised the development and O&M costs of maintaining multiple systems.
Against this background, the most urgent task was to reduce the overall resource consumption. Cainiao therefore needed a service that provided both storage and high-performance writes and, ideally, supported both key-value queries and complex OLAP queries. Such a service would eliminate the data silos among multiple systems and meet diversified data requirements.
We conducted surveys on multiple services within Alibaba and finally decided to replace HBase with Hologres.
The engine processes large numbers of tables and records. It runs full data processing tasks to process snapshots of daily partitions of MaxCompute tables, including express delivery, warehousing, and distribution data, and runs incremental data processing tasks to process event streams.
A full data processing task generates the objective fulfillment and historical attribute information of a parcel based on the parcel’s historical fulfillment progress. A Flink job then synchronizes this information to Hologres in real time, where offline data processing tasks can join against it. After receiving an event message, a real-time data processing task looks up the parcel’s historical fulfillment information based on the message. It then predicts the fulfillment progress by calling a chain of algorithm services for order combination or splitting, end-node prediction, routing, and timeliness prediction, and writes the predicted progress to TT and Hologres for offline data processing tasks to join against. TT is message-oriented middleware similar to Kafka.
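The incremental path described above can be sketched as follows. All names here are placeholders: the history store, the sinks, and the one-line "prediction" stand in for the historical fulfillment lookup, the TT/Hologres writes, and the algorithm service chain, which are not public APIs.

```python
def handle_event(event, history_store, sinks):
    """On each event: fetch the parcel's historical fulfillment state,
    run a (stubbed) prediction step, and emit the result downstream."""
    history = history_store.get(event["order_id"], [])
    # Stub for the algorithm chain (order combination/splitting,
    # end-node prediction, routing, timeliness prediction).
    prediction = {
        "order_id": event["order_id"],
        "known_events": len(history) + 1,
        "predicted_stage": "delivery" if history else "warehousing",
    }
    for sink in sinks:  # e.g. TT and Hologres in the real system
        sink.append(prediction)
    # Record the event so later predictions see more history.
    history_store.setdefault(event["order_id"], []).append(event)
    return prediction

store, tt_sink, holo_sink = {}, [], []
handle_event({"order_id": "LP001", "type": "pickup"}, store, [tt_sink, holo_sink])
p = handle_event({"order_id": "LP001", "type": "transit"}, store, [tt_sink, holo_sink])
```

The key point the sketch preserves is the feedback loop: each prediction is written to the same stores that later lookups read from.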
We sorted out the data relationships and minimized dependencies between data by coordinating data processing tasks. The following figure shows the final business processing architecture.
Data driver layer:
Provides a primary table driver for full data processing tasks, a primary table driver for incremental data processing tasks, and a secondary table driver for business.
Data association layer:
Provides SQL operators for various Blink versions (Blink is Alibaba’s internal version of Flink). This layer associates data and distributes it to different data partitions based on optimized storage and computing, which increases the throughput and improves the performance of full and incremental data processing tasks.
Data exchange layer:
Writes index data to the indexing service by using the Swift Sink mechanism, and stores internal data to the storage service through a write API.
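The partition-by-key distribution performed in the data association layer can be illustrated with a small sketch: records with the same order ID land in the same partition, so association happens locally and throughput scales with the partition count. The hashing scheme here is an assumption for demonstration, not the engine's actual implementation.

```python
import zlib

def partition_of(order_id, num_partitions):
    # crc32 gives a stable hash across runs (unlike Python's hash() for str),
    # so the same order ID always maps to the same partition.
    return zlib.crc32(order_id.encode()) % num_partitions

records = ["LP001", "LP002", "LP001", "LP003"]
partitions = [[] for _ in range(4)]
for oid in records:
    partitions[partition_of(oid, 4)].append(oid)
```

Because both "LP001" records hash to the same partition, the join for that order never crosses partition boundaries.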
Replacing HBase with Hologres produces the following benefits:
Overall costs of hardware resources reduced by more than 60%
Compared with HBase, Hologres with the same configuration provides superior write performance and higher throughput. This means that Hologres can process the same amount of data with fewer resources. In actual business scenarios, the overall costs of hardware resources are reduced by more than 60%.
Faster end-to-end data processing (end-to-end processing of 200 million records within 3 minutes)
The time required for full data processing is an important indicator of service availability. Assume that data processing code published on a given day contains a bug that makes newly generated data unavailable. After the bug is fixed, the erroneous data must be reprocessed: a full data processing task is run to overwrite it with correct data. The running duration of that task determines the duration of the failure, so the faster the full data processing task runs, the faster the service recovers.
When the engine runs a full data processing task, the task first verifies the data in all dimension tables, which is time-consuming. Hologres can synchronize a table that contains more than 200 million records in about 3 minutes. This reduces the time required for full data processing and makes emergencies easier to handle.
One system that supports both key-value queries and OLAP without redundant data
Hologres supports two storage modes: row store and column store. Column store is suitable for interactive analysis of huge amounts of data, and row store is suitable for full-row reading based on primary keys. Therefore, Cainiao can store all data in Hologres, using row store for data queried by primary key and column store for data used in complex OLAP queries. Hologres meets both OLAP and key-value query requirements without the assistance of other systems. It eliminates redundant data storage as well as data import and export among various systems, and it simplifies O&M.
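The trade-off between the two storage modes can be shown with a toy illustration. The layouts below are conceptual, not Hologres internals; in Hologres the orientation is chosen per table (as a table property) rather than in application code, and the data here is made up.

```python
# Row store: primary key -> whole row. One lookup returns every column,
# which is ideal for key-value point queries.
row_store = {
    "LP001": {"status": "delivered", "weight_kg": 1.2},
    "LP002": {"status": "in_transit", "weight_kg": 0.8},
}

# Column store: column name -> all values. An aggregation scans only the
# column it needs, which is ideal for OLAP queries over many rows.
column_store = {
    "status": ["delivered", "in_transit"],
    "weight_kg": [1.2, 0.8],
}

# Point query on the row store: one key lookup, full row back.
row = row_store["LP001"]

# OLAP-style aggregation on the column store: touch one column only.
total_weight = sum(column_store["weight_kg"])
```

Storing both layouts in one system is what removes the need to synchronize data between a key-value store and a separate analytical system.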
Real-time SQL queries for large dimension tables
The original architecture of the engine supported data queries through key-value APIs, which was inconvenient. Hologres is compatible with PostgreSQL and can be accessed by using a PostgreSQL client. Users can query data in Hologres tables by using the standard PostgreSQL syntax and various filter criteria to check data in real time.
In the original architecture of the engine, dimension tables were stored with a weak schema. When Flink tasks accessed fields that did not exist, they did not report errors but returned empty field values. As a result, incorrect field names in code could go unnoticed until data was output or even used, and they were difficult to locate during debugging. Hologres immediately reports an error when it detects an incorrect field name, which avoids such latent errors and saves debugging time.
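The debugging hazard above can be sketched in a few lines: a weak-schema store silently returns an empty value for a misspelled field, while a strict-schema store fails fast. The field names and the `strict_get` helper are illustrative, not part of any real API.

```python
weak_row = {"order_id": "LP001", "status": "delivered"}

# Weak schema: a typo ("statsu") yields an empty value, not an error,
# so the bug surfaces only when downstream data looks wrong.
silently_wrong = weak_row.get("statsu", "")

# Strict schema (Hologres-like behavior): the same typo raises immediately,
# at the point of the mistake.
def strict_get(row, field):
    if field not in row:
        raise KeyError(f"column {field!r} does not exist")
    return row[field]

try:
    strict_get(weak_row, "statsu")
    caught = False
except KeyError:
    caught = True
```

Failing at the misspelled access, rather than at data output, is what shortens the debugging loop.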